44import logging
55import socket
66from kafka .client_async import KafkaClient , selectors
7+ import kafka .errors as Errors
78from kafka .errors import (
8- IncompatibleBrokerVersion , KafkaConfigurationError , KafkaConnectionError ,
9- NodeNotReadyError , NotControllerError )
9+ IncompatibleBrokerVersion , KafkaConfigurationError , NotControllerError ,
10+ UnrecognizedBrokerVersion )
1011from kafka .metrics import MetricConfig , Metrics
1112from kafka .protocol .admin import (
1213 CreateTopicsRequest , DeleteTopicsRequest , DescribeConfigsRequest , AlterConfigsRequest , CreatePartitionsRequest ,
@@ -240,17 +241,22 @@ def _validate_timeout(self, timeout_ms):
240241 return timeout_ms or self .config ['request_timeout_ms' ]
241242
242243 def _refresh_controller_id (self ):
243- """Determine the kafka cluster controller
244- """
245- response = self ._send_request_to_node (
246- self ._client .least_loaded_node (),
247- MetadataRequest [1 ]([])
248- )
249- self ._controller_id = response .controller_id
250- version = self ._client .check_version (self ._controller_id )
251- if version < (0 , 10 , 0 ):
252- raise IncompatibleBrokerVersion (
253- "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
244+ """Determine the kafka cluster controller."""
245+ version = self ._matching_api_version (MetadataRequest )
246+ if 1 <= version <= 6 :
247+ request = MetadataRequest [version ]()
248+ response = self ._send_request_to_node (self ._client .least_loaded_node (), request )
249+ controller_id = response .controller_id
250+ # verify the controller is new enough to support our requests
251+ controller_version = self ._client .check_version (controller_id )
252+ if controller_version < (0 , 10 , 0 ):
253+ raise IncompatibleBrokerVersion (
254+ "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
255+ .format (controller_version ))
256+ self ._controller_id = controller_id
257+ else :
258+ raise UnrecognizedBrokerVersion (
259+ "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
254260 .format (version ))
255261
256262 def _send_request_to_node (self , node , request ):
@@ -271,22 +277,34 @@ def _send_request_to_node(self, node, request):
271277 else :
272278 raise future .exception # pylint: disable-msg=raising-bad-type
273279
274- def _send (self , request ):
275- """Send a kafka protocol message to the cluster controller. Will block until the message result is received.
280+ def _send_request_to_controller (self , request ):
281+ """Send a kafka protocol message to the cluster controller.
282+
283+ Will block until the message result is received.
276284
277285 :param request: The message to send
278- :return The kafka protocol response for the message
279- :exception NodeNotReadyError: If the controller connection can't be established
286+ :return: The kafka protocol response for the message
280287 """
281- remaining_tries = 2
282- while remaining_tries > 0 :
283- remaining_tries = remaining_tries - 1
284- try :
285- return self ._send_request_to_node (self ._controller_id , request )
286- except (NotControllerError , KafkaConnectionError ) as e :
287- # controller changed? refresh it
288- self ._refresh_controller_id ()
289- raise NodeNotReadyError (self ._controller_id )
288+ tries = 2 # in case our cached self._controller_id is outdated
289+ while tries :
290+ tries -= 1
291+ response = self ._send_request_to_node (self ._controller_id , request )
292+ # DeleteTopicsResponse returns topic_error_codes rather than topic_errors
293+ for topic , error_code in getattr (response , "topic_errors" , response .topic_error_codes ):
294+ error_type = Errors .for_code (error_code )
295+ if tries and isinstance (error_type , NotControllerError ):
296+ # No need to inspect the rest of the errors for
297+ # non-retriable errors because NotControllerError should
298+ # either be thrown for all errors or no errors.
299+ self ._refresh_controller_id ()
300+ break
301+ elif error_type is not Errors .NoError :
302+ raise error_type (
303+ "Request '{}' failed with response '{}'."
304+ .format (request , response ))
305+ else :
306+ return response
307+ raise RuntimeError ("This should never happen, please file a bug with full stacktrace if encountered" )
290308
291309 @staticmethod
292310 def _convert_new_topic_request (new_topic ):
@@ -332,7 +350,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
332350 raise NotImplementedError (
333351 "Support for CreateTopics v{} has not yet been added to KafkaAdmin."
334352 .format (version ))
335- return self ._send (request )
353+ return self ._send_request_to_controller (request )
336354
337355 def delete_topics (self , topics , timeout_ms = None ):
338356 """Delete topics from the cluster
@@ -352,19 +370,25 @@ def delete_topics(self, topics, timeout_ms=None):
352370 raise NotImplementedError (
353371 "Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
354372 .format (version ))
355- return self ._send (request )
373+ return self ._send_request_to_controller (request )
356374
357375 # list topics functionality is in ClusterMetadata
376+ # Note: if implemented here, send the request to the least_loaded_node()
358377
359378 # describe topics functionality is in ClusterMetadata
379+ # Note: if implemented here, send the request to the controller
360380
361381 # describe cluster functionality is in ClusterMetadata
382+ # Note: if implemented here, send the request to the least_loaded_node()
362383
363- # describe_acls protocol not implemented
384+ # describe_acls protocol not yet implemented
385+ # Note: send the request to the least_loaded_node()
364386
365- # create_acls protocol not implemented
387+ # create_acls protocol not yet implemented
388+ # Note: send the request to the least_loaded_node()
366389
367- # delete_acls protocol not implemented
390+ # delete_acls protocol not yet implemented
391+ # Note: send the request to the least_loaded_node()
368392
369393 @staticmethod
370394 def _convert_describe_config_resource_request (config_resource ):
@@ -404,7 +428,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
404428 raise NotImplementedError (
405429 "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
406430 .format (version ))
407- return self ._send ( request )
431+ return self ._send_request_to_node ( self . _client . least_loaded_node (), request )
408432
409433 @staticmethod
410434 def _convert_alter_config_resource_request (config_resource ):
@@ -419,6 +443,12 @@ def _convert_alter_config_resource_request(config_resource):
419443 def alter_configs (self , config_resources ):
420444 """Alter configuration parameters of one or more kafka resources.
421445
446+ Warning:
447+ This is currently broken for BROKER resources because those must be
448+ sent to that specific broker, versus this always picks the
449+ least-loaded node. See the comment in the source code for details.
450+ We would happily accept a PR fixing this.
451+
422452 :param config_resources: An array of ConfigResource objects.
423453 :return: Appropriate version of AlterConfigsResponse class
424454 """
@@ -431,11 +461,19 @@ def alter_configs(self, config_resources):
431461 raise NotImplementedError (
432462 "Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
433463 .format (version ))
434- return self ._send (request )
464+ # TODO the Java client has the note:
465+ # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
466+ # // and send the request to that specific broker. Other resources are grouped together into
467+ # // a single request that may be sent to any broker.
468+ #
469+ # So this is currently broken as it always sends to the least_loaded_node()
470+ return self ._send_request_to_node (self ._client .least_loaded_node (), request )
435471
436- # alter replica logs dir protocol not implemented
472+ # alter replica logs dir protocol not yet implemented
473+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
437474
438- # describe log dirs protocol not implemented
475+ # describe log dirs protocol not yet implemented
476+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
439477
440478 @staticmethod
441479 def _convert_create_partitions_request (topic_name , new_partitions ):
@@ -468,17 +506,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
468506 raise NotImplementedError (
469507 "Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
470508 .format (version ))
471- return self ._send (request )
509+ return self ._send_request_to_controller (request )
472510
473- # delete records protocol not implemented
511+ # delete records protocol not yet implemented
512+ # Note: send the request to the partition leaders
474513
475- # create delegation token protocol not implemented
514+ # create delegation token protocol not yet implemented
515+ # Note: send the request to the least_loaded_node()
476516
477- # renew delegation token protocol not implemented
517+ # renew delegation token protocol not yet implemented
518+ # Note: send the request to the least_loaded_node()
478519
479- # expire delegation_token protocol not implemented
520+ # expire delegation_token protocol not yet implemented
521+ # Note: send the request to the least_loaded_node()
480522
481- # describe delegation_token protocol not implemented
523+ # describe delegation_token protocol not yet implemented
524+ # Note: send the request to the least_loaded_node()
482525
483526 def describe_consumer_groups (self , group_ids ):
484527 """Describe a set of consumer groups.
@@ -495,7 +538,8 @@ def describe_consumer_groups(self, group_ids):
495538 raise NotImplementedError (
496539 "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
497540 .format (version ))
498- return self ._send (request )
541+ # TODO this is completely broken, as it needs to send to the group coordinator
542+ # return self._send(request)
499543
500544 def list_consumer_groups (self ):
501545 """List all consumer groups known to the cluster.
@@ -509,6 +553,8 @@ def list_consumer_groups(self):
509553 raise NotImplementedError (
510554 "Support for ListGroups v{} has not yet been added to KafkaAdmin."
511555 .format (version ))
512- return self ._send (request )
556+ # TODO this is completely broken, as it needs to send to the group coordinator
557+ # return self._send(request)
513558
514- # delete groups protocol not implemented
559+ # delete groups protocol not yet implemented
560+ # Note: send the request to the group's coordinator.
0 commit comments