1919import java .util .ArrayList ;
2020import java .util .Collection ;
2121import java .util .Collections ;
22+ import kafka .admin .AdminUtils ;
2223import java .util .HashMap ;
2324import java .util .HashSet ;
2425import java .util .List ;
3435import java .util .concurrent .TimeUnit ;
3536import java .util .concurrent .TimeoutException ;
3637import java .util .concurrent .atomic .AtomicBoolean ;
37- import kafka .admin .AdminUtils ;
3838import kafka .admin .BrokerMetadata ;
3939import kafka .server .ConfigType ;
4040import kafka .zk .KafkaZkClient ;
@@ -88,17 +88,18 @@ public class MultiClusterTopicManagementService implements Service {
8888 private final long _preferredLeaderElectionIntervalMs ;
8989 private final ScheduledExecutorService _executor ;
9090
91-
9291 @ SuppressWarnings ("unchecked" )
9392 public MultiClusterTopicManagementService (Map <String , Object > props , String serviceName ) throws Exception {
9493 _serviceName = serviceName ;
9594 MultiClusterTopicManagementServiceConfig config = new MultiClusterTopicManagementServiceConfig (props );
9695 String topic = config .getString (CommonServiceConfig .TOPIC_CONFIG );
97- Map <String , Map > propsByCluster = props .containsKey (MultiClusterTopicManagementServiceConfig .PROPS_PER_CLUSTER_CONFIG )
98- ? (Map ) props .get (MultiClusterTopicManagementServiceConfig .PROPS_PER_CLUSTER_CONFIG ) : new HashMap <>();
96+ Map <String , Map > propsByCluster =
97+ props .containsKey (MultiClusterTopicManagementServiceConfig .PROPS_PER_CLUSTER_CONFIG ) ? (Map ) props .get (
98+ MultiClusterTopicManagementServiceConfig .PROPS_PER_CLUSTER_CONFIG ) : new HashMap <>();
9999 _topicManagementByCluster = initializeTopicManagementHelper (propsByCluster , topic );
100100 _scheduleIntervalMs = config .getInt (MultiClusterTopicManagementServiceConfig .REBALANCE_INTERVAL_MS_CONFIG );
101- _preferredLeaderElectionIntervalMs = config .getLong (MultiClusterTopicManagementServiceConfig .PREFERRED_LEADER_ELECTION_CHECK_INTERVAL_MS_CONFIG );
101+ _preferredLeaderElectionIntervalMs =
102+ config .getLong (MultiClusterTopicManagementServiceConfig .PREFERRED_LEADER_ELECTION_CHECK_INTERVAL_MS_CONFIG );
102103 _executor = Executors .newSingleThreadScheduledExecutor (
103104 r -> new Thread (r , _serviceName + "-multi-cluster-topic-management-service" ));
104105 _topicPartitionResult .complete (null );
@@ -108,14 +109,16 @@ public CompletableFuture<Void> topicPartitionResult() {
108109 return _topicPartitionResult ;
109110 }
110111
111- private Map <String , TopicManagementHelper > initializeTopicManagementHelper (Map <String , Map > propsByCluster , String topic ) throws Exception {
112+ private Map <String , TopicManagementHelper > initializeTopicManagementHelper (Map <String , Map > propsByCluster ,
113+ String topic ) throws Exception {
112114 Map <String , TopicManagementHelper > topicManagementByCluster = new HashMap <>();
113- for (Map .Entry <String , Map > entry : propsByCluster .entrySet ()) {
115+ for (Map .Entry <String , Map > entry : propsByCluster .entrySet ()) {
114116 String clusterName = entry .getKey ();
115117 Map serviceProps = entry .getValue ();
116- if (serviceProps .containsKey (MultiClusterTopicManagementServiceConfig .TOPIC_CONFIG ))
117- throw new ConfigException ("The raw per-cluster config for MultiClusterTopicManagementService must not contain " +
118- MultiClusterTopicManagementServiceConfig .TOPIC_CONFIG );
118+ if (serviceProps .containsKey (MultiClusterTopicManagementServiceConfig .TOPIC_CONFIG )) {
119+ throw new ConfigException ("The raw per-cluster config for MultiClusterTopicManagementService must not contain "
120+ + MultiClusterTopicManagementServiceConfig .TOPIC_CONFIG );
121+ }
119122 serviceProps .put (MultiClusterTopicManagementServiceConfig .TOPIC_CONFIG , topic );
120123 topicManagementByCluster .put (clusterName , new TopicManagementHelper (serviceProps ));
121124 }
@@ -129,8 +132,8 @@ public synchronized void start() {
129132 _executor .scheduleWithFixedDelay (tmRunnable , 0 , _scheduleIntervalMs , TimeUnit .MILLISECONDS );
130133
131134 Runnable pleRunnable = new PreferredLeaderElectionRunnable ();
132- _executor .scheduleWithFixedDelay (pleRunnable , _preferredLeaderElectionIntervalMs , _preferredLeaderElectionIntervalMs ,
133- TimeUnit .MILLISECONDS );
135+ _executor .scheduleWithFixedDelay (pleRunnable , _preferredLeaderElectionIntervalMs ,
136+ _preferredLeaderElectionIntervalMs , TimeUnit .MILLISECONDS );
134137 LOGGER .info ("{}/MultiClusterTopicManagementService started." , _serviceName );
135138 }
136139 }
@@ -153,13 +156,12 @@ public void awaitShutdown(long timeout, TimeUnit unit) {
153156 try {
154157 _executor .awaitTermination (Integer .MAX_VALUE , TimeUnit .MILLISECONDS );
155158 } catch (InterruptedException e ) {
156- LOGGER .info ("Thread interrupted when waiting for {}/MultiClusterTopicManagementService to shutdown" , _serviceName );
159+ LOGGER .info ("Thread interrupted when waiting for {}/MultiClusterTopicManagementService to shutdown" ,
160+ _serviceName );
157161 }
158162 LOGGER .info ("{}/MultiClusterTopicManagementService shutdown completed" , _serviceName );
159163 }
160164
161-
162-
163165 private class TopicManagementRunnable implements Runnable {
164166
165167 @ Override
@@ -190,7 +192,8 @@ public void run() {
190192 try {
191193 helper .maybeReassignPartitionAndElectLeader ();
192194 } catch (KafkaException e ) {
193- LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
195+ LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName ,
196+ e );
194197 }
195198 }
196199 } catch (Throwable t ) {
@@ -216,7 +219,8 @@ public void run() {
216219 try {
217220 helper .maybeElectLeader ();
218221 } catch (IOException | KafkaException e ) {
219- LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
222+ LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName ,
223+ e );
220224 }
221225 }
222226 } catch (Throwable t ) {
@@ -246,7 +250,6 @@ static class TopicManagementHelper {
246250 String _topic ;
247251 TopicFactory _topicFactory ;
248252
249-
250253 @ SuppressWarnings ("unchecked" )
251254 TopicManagementHelper (Map <String , Object > props ) throws Exception {
252255 TopicManagementServiceConfig config = new TopicManagementServiceConfig (props );
@@ -263,13 +266,17 @@ static class TopicManagementHelper {
263266 _bootstrapServers = adminClientConfig .getList (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG );
264267 _topicProperties = new Properties ();
265268 if (props .containsKey (TopicManagementServiceConfig .TOPIC_PROPS_CONFIG )) {
266- for (Map .Entry <String , Object > entry : ((Map <String , Object >) props .get (TopicManagementServiceConfig .TOPIC_PROPS_CONFIG )).entrySet ())
269+ for (Map .Entry <String , Object > entry : ((Map <String , Object >) props .get (
270+ TopicManagementServiceConfig .TOPIC_PROPS_CONFIG )).entrySet ()) {
267271 _topicProperties .put (entry .getKey (), entry .getValue ().toString ());
272+ }
268273 }
269274
270- Map topicFactoryConfig = props .containsKey (TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) ?
271- (Map ) props .get (TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) : new HashMap ();
272- _topicFactory = (TopicFactory ) Class .forName (topicFactoryClassName ).getConstructor (Map .class ).newInstance (topicFactoryConfig );
275+ Map topicFactoryConfig =
276+ props .containsKey (TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) ? (Map ) props .get (
277+ TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) : new HashMap ();
278+ _topicFactory =
279+ (TopicFactory ) Class .forName (topicFactoryClassName ).getConstructor (Map .class ).newInstance (topicFactoryConfig );
273280
274281 _adminClient = constructAdminClient (props );
275282 LOGGER .info ("{} configs: {}" , _adminClient .getClass ().getSimpleName (), props );
@@ -304,29 +311,27 @@ void maybeAddPartitions(int minPartitionNum) throws ExecutionException, Interrup
304311
305312 int partitionNum = partitions .size ();
306313 if (partitionNum < minPartitionNum ) {
307- LOGGER .info ("{} will increase partition of the topic {} in the cluster from {}"
308- + " to {}." , this .getClass ().toString (), _topic , partitionNum , minPartitionNum );
314+ LOGGER .info ("{} will increase partition of the topic {} in the cluster from {}" + " to {}." ,
315+ this .getClass ().toString (), _topic , partitionNum , minPartitionNum );
309316 Set <Integer > blackListedBrokers = _topicFactory .getBlackListedBrokers (_zkConnect );
310317 Set <BrokerMetadata > brokers = new HashSet <>();
311318 for (Node broker : _adminClient .describeCluster ().nodes ().get ()) {
312- BrokerMetadata brokerMetadata = new BrokerMetadata (
313- broker .id (), null
314- );
319+ BrokerMetadata brokerMetadata = new BrokerMetadata (broker .id (), null );
315320 brokers .add (brokerMetadata );
316321 }
317322
318323 if (!blackListedBrokers .isEmpty ()) {
319324 brokers .removeIf (broker -> blackListedBrokers .contains (broker .id ()));
320325 }
321326
322- List <List <Integer >> newPartitionAssignments = newPartitionAssignments (minPartitionNum , partitionNum , brokers , _replicationFactor );
327+ List <List <Integer >> newPartitionAssignments =
328+ newPartitionAssignments (minPartitionNum , partitionNum , brokers , _replicationFactor );
323329
324330 NewPartitions newPartitions = NewPartitions .increaseTo (minPartitionNum , newPartitionAssignments );
325331
326332 Map <String , NewPartitions > newPartitionsMap = new HashMap <>();
327333 newPartitionsMap .put (_topic , newPartitions );
328334 CreatePartitionsResult createPartitionsResult = _adminClient .createPartitions (newPartitionsMap );
329-
330335 }
331336 }
332337
@@ -402,7 +407,6 @@ int numPartitions() throws InterruptedException, ExecutionException {
402407 return _adminClient .describeTopics (Collections .singleton (_topic )).values ().get (_topic ).get ().partitions ().size ();
403408 }
404409
405-
406410 private Set <Node > getAvailableBrokers () throws ExecutionException , InterruptedException {
407411 Set <Node > brokers = new HashSet <>(_adminClient .describeCluster ().nodes ().get ());
408412 Set <Integer > blackListedBrokers = _topicFactory .getBlackListedBrokers (_zkConnect );
@@ -412,11 +416,11 @@ private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedEx
412416
413417 void maybeReassignPartitionAndElectLeader () throws ExecutionException , InterruptedException , TimeoutException {
414418 try (KafkaZkClient zkClient = KafkaZkClient .apply (_zkConnect , JaasUtils .isZkSecurityEnabled (),
415- Utils .ZK_SESSION_TIMEOUT_MS , Utils .ZK_CONNECTION_TIMEOUT_MS ,
416- Integer . MAX_VALUE , Time . SYSTEM , METRIC_GROUP_NAME , "SessionExpireListener" , null )) {
419+ Utils .ZK_SESSION_TIMEOUT_MS , Utils .ZK_CONNECTION_TIMEOUT_MS , Integer . MAX_VALUE , Time . SYSTEM ,
420+ METRIC_GROUP_NAME , "SessionExpireListener" , null )) {
417421
418- List <TopicPartitionInfo > partitionInfoList = _adminClient
419- .describeTopics (Collections .singleton (_topic )).all ().get ().get (_topic ).partitions ();
422+ List <TopicPartitionInfo > partitionInfoList =
423+ _adminClient .describeTopics (Collections .singleton (_topic )).all ().get ().get (_topic ).partitions ();
420424 Collection <Node > brokers = this .getAvailableBrokers ();
421425 boolean partitionReassigned = false ;
422426 if (partitionInfoList .size () == 0 ) {
@@ -426,27 +430,31 @@ void maybeReassignPartitionAndElectLeader() throws ExecutionException, Interrupt
426430 int currentReplicationFactor = getReplicationFactor (partitionInfoList );
427431 int expectedReplicationFactor = Math .max (currentReplicationFactor , _replicationFactor );
428432
429- if (_replicationFactor < currentReplicationFactor )
433+ if (_replicationFactor < currentReplicationFactor ) {
430434 LOGGER .debug (
431435 "Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster." ,
432436 _replicationFactor , currentReplicationFactor , _topic );
437+ }
433438
434- if (expectedReplicationFactor > currentReplicationFactor
435- && Utils . ongoingPartitionReassignments ( _adminClient ) .isEmpty ()) {
439+ if (expectedReplicationFactor > currentReplicationFactor && Utils . ongoingPartitionReassignments ( _adminClient )
440+ .isEmpty ()) {
436441 LOGGER .info (
437442 "MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
438443 + "from {} to {}" , _topic , currentReplicationFactor , expectedReplicationFactor );
439444 reassignPartitions (_adminClient , brokers , _topic , partitionInfoList .size (), expectedReplicationFactor );
445+
440446 partitionReassigned = true ;
441447 }
442448
443449 // Update the properties of the monitor topic if any config is different from the user-specified config
444450 Properties currentProperties = zkClient .getEntityConfigs (ConfigType .Topic (), _topic );
445451 Properties expectedProperties = new Properties ();
446- for (Object key : currentProperties .keySet ())
452+ for (Object key : currentProperties .keySet ()) {
447453 expectedProperties .put (key , currentProperties .get (key ));
448- for (Object key : _topicProperties .keySet ())
454+ }
455+ for (Object key : _topicProperties .keySet ()) {
449456 expectedProperties .put (key , _topicProperties .get (key ));
457+ }
450458
451459 if (!currentProperties .equals (expectedProperties )) {
452460 LOGGER .info ("MultiClusterTopicManagementService will overwrite properties of the topic {} "
@@ -458,16 +466,14 @@ void maybeReassignPartitionAndElectLeader() throws ExecutionException, Interrupt
458466 && Utils .ongoingPartitionReassignments (_adminClient ).isEmpty ()) {
459467 LOGGER .info ("{} will reassign partitions of the topic {} in cluster." , this .getClass ().toString (), _topic );
460468 reassignPartitions (_adminClient , brokers , _topic , partitionInfoList .size (), expectedReplicationFactor );
469+
461470 partitionReassigned = true ;
462471 }
463472
464- if (partitionInfoList .size () >= brokers .size () &&
465- someBrokerNotElectedLeader (partitionInfoList , brokers )) {
473+ if (partitionInfoList .size () >= brokers .size () && someBrokerNotElectedLeader (partitionInfoList , brokers )) {
466474 if (!partitionReassigned || Utils .ongoingPartitionReassignments (_adminClient ).isEmpty ()) {
467- LOGGER .info (
468- "MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
469- + "cluster." , _topic
470- );
475+ LOGGER .info ("MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
476+ + "cluster." , _topic );
471477 triggerPreferredLeaderElection (partitionInfoList , _topic );
472478 _preferredLeaderElectionRequested = false ;
473479 } else {
@@ -482,14 +488,14 @@ void maybeElectLeader() throws Exception {
482488 return ;
483489 }
484490
485- try (KafkaZkClient zkClient = KafkaZkClient .apply (_zkConnect , JaasUtils .isZkSecurityEnabled (), Utils .ZK_SESSION_TIMEOUT_MS ,
486- Utils .ZK_CONNECTION_TIMEOUT_MS , Integer .MAX_VALUE , Time .SYSTEM , METRIC_GROUP_NAME , "SessionExpireListener" , null )) {
491+ try (KafkaZkClient zkClient = KafkaZkClient .apply (_zkConnect , JaasUtils .isZkSecurityEnabled (),
492+ Utils .ZK_SESSION_TIMEOUT_MS , Utils .ZK_CONNECTION_TIMEOUT_MS , Integer .MAX_VALUE , Time .SYSTEM ,
493+ METRIC_GROUP_NAME , "SessionExpireListener" , null )) {
487494 if (Utils .ongoingPartitionReassignments (_adminClient ).isEmpty ()) {
488- List <TopicPartitionInfo > partitionInfoList = _adminClient
489- .describeTopics (Collections .singleton (_topic )).all ().get ().get (_topic ).partitions ();
490- LOGGER .info (
491- "MultiClusterTopicManagementService will trigger requested preferred leader election for the"
492- + " topic {} in cluster." , _topic );
495+ List <TopicPartitionInfo > partitionInfoList =
496+ _adminClient .describeTopics (Collections .singleton (_topic )).all ().get ().get (_topic ).partitions ();
497+ LOGGER .info ("MultiClusterTopicManagementService will trigger requested preferred leader election for the"
498+ + " topic {} in cluster." , _topic );
493499 triggerPreferredLeaderElection (partitionInfoList , _topic );
494500 _preferredLeaderElectionRequested = false ;
495501 }
@@ -503,7 +509,8 @@ private void triggerPreferredLeaderElection(List<TopicPartitionInfo> partitionIn
503509 }
504510 ElectPreferredLeadersResult electPreferredLeadersResult = _adminClient .electPreferredLeaders (partitions );
505511
506- LOGGER .info ("{}: triggerPreferredLeaderElection - {}" , this .getClass ().toString (), electPreferredLeadersResult .all ());
512+ LOGGER .info ("{}: triggerPreferredLeaderElection - {}" , this .getClass ().toString (),
513+ electPreferredLeadersResult .all ());
507514 }
508515
509516 private static void reassignPartitions (AdminClient adminClient , Collection <Node > brokers , String topic ,
@@ -541,11 +548,13 @@ private static void reassignPartitions(AdminClient adminClient, Collection<Node>
541548 NewPartitionReassignment newPartitionReassignment = new NewPartitionReassignment (targetReplicas );
542549 reassignments .put (topicPartitionSeqEntry .getKey (), Optional .of (newPartitionReassignment ));
543550 }
551+
544552 AlterPartitionReassignmentsResult alterPartitionReassignmentsResult =
545553 adminClient .alterPartitionReassignments (reassignments );
546554 try {
547555 alterPartitionReassignmentsResult .all ().get ();
548556 } catch (InterruptedException | ExecutionException e ) {
557+
549558 LOGGER .error ("An exception occurred while altering the partition reassignments for {}" , topic , e );
550559 }
551560 }
@@ -567,21 +576,25 @@ static int getReplicationFactor(List<TopicPartitionInfo> partitionInfoList) {
567576
568577 static boolean someBrokerNotPreferredLeader (List <TopicPartitionInfo > partitionInfoList , Collection <Node > brokers ) {
569578 Set <Integer > brokersNotPreferredLeader = new HashSet <>(brokers .size ());
570- for (Node broker : brokers )
579+ for (Node broker : brokers ) {
571580 brokersNotPreferredLeader .add (broker .id ());
572- for (TopicPartitionInfo partitionInfo : partitionInfoList )
581+ }
582+ for (TopicPartitionInfo partitionInfo : partitionInfoList ) {
573583 brokersNotPreferredLeader .remove (partitionInfo .replicas ().get (0 ).id ());
584+ }
574585
575586 return !brokersNotPreferredLeader .isEmpty ();
576587 }
577588
578589 static boolean someBrokerNotElectedLeader (List <TopicPartitionInfo > partitionInfoList , Collection <Node > brokers ) {
579590 Set <Integer > brokersNotElectedLeader = new HashSet <>(brokers .size ());
580- for (Node broker : brokers )
591+ for (Node broker : brokers ) {
581592 brokersNotElectedLeader .add (broker .id ());
593+ }
582594 for (TopicPartitionInfo partitionInfo : partitionInfoList ) {
583- if (partitionInfo .leader () != null )
595+ if (partitionInfo .leader () != null ) {
584596 brokersNotElectedLeader .remove (partitionInfo .leader ().id ());
597+ }
585598 }
586599 return !brokersNotElectedLeader .isEmpty ();
587600 }
@@ -634,11 +647,16 @@ static boolean someBrokerNotElectedLeader(List<TopicPartitionInfo> partitionInfo
634647 * {"topic":"kmf-topic","partition":0,"replicas":[2,0]}]}
635648 * </pre>
636649 */
637- private static String formatAsNewReassignmentJson (String topic , scala .collection .Map <Object , Seq <Object >> partitionsToReassign ) {
650+ private static String formatAsNewReassignmentJson (String topic ,
651+ scala .collection .Map <Object , Seq <Object >> partitionsToReassign ) {
638652 StringBuilder builder = new StringBuilder ();
639653 builder .append ("{\" version\" :1,\" partitions\" :[\n " );
640654 for (int partition = 0 ; partition < partitionsToReassign .size (); partition ++) {
641- builder .append (" {\" topic\" :\" " ).append (topic ).append ("\" ,\" partition\" :" ).append (partition ).append (",\" replicas\" :[" );
655+ builder .append (" {\" topic\" :\" " )
656+ .append (topic )
657+ .append ("\" ,\" partition\" :" )
658+ .append (partition )
659+ .append (",\" replicas\" :[" );
642660 Seq <Object > replicas = partitionsToReassign .apply (partition );
643661 for (int replicaIndex = 0 ; replicaIndex < replicas .size (); replicaIndex ++) {
644662 Object replica = replicas .apply (replicaIndex );
@@ -651,6 +669,5 @@ private static String formatAsNewReassignmentJson(String topic, scala.collection
651669 builder .append ("]}" );
652670 return builder .toString ();
653671 }
654-
655672 }
656673}
0 commit comments