1111package com .linkedin .xinfra .monitor .services ;
1212
1313import com .linkedin .xinfra .monitor .common .Utils ;
14- import com .linkedin .xinfra .monitor .services .configs .MultiClusterTopicManagementServiceConfig ;
15- import com .linkedin .xinfra .monitor .topicfactory .TopicFactory ;
1614import com .linkedin .xinfra .monitor .services .configs .CommonServiceConfig ;
15+ import com .linkedin .xinfra .monitor .services .configs .MultiClusterTopicManagementServiceConfig ;
1716import com .linkedin .xinfra .monitor .services .configs .TopicManagementServiceConfig ;
17+ import com .linkedin .xinfra .monitor .topicfactory .TopicFactory ;
1818import java .io .IOException ;
1919import java .util .ArrayList ;
2020import java .util .Collection ;
2323import java .util .HashSet ;
2424import java .util .List ;
2525import java .util .Map ;
26+ import java .util .Optional ;
2627import java .util .Properties ;
2728import java .util .Random ;
2829import java .util .Set ;
3132import java .util .concurrent .Executors ;
3233import java .util .concurrent .ScheduledExecutorService ;
3334import java .util .concurrent .TimeUnit ;
35+ import java .util .concurrent .TimeoutException ;
3436import java .util .concurrent .atomic .AtomicBoolean ;
3537import kafka .admin .AdminUtils ;
3638import kafka .admin .BrokerMetadata ;
3739import kafka .server .ConfigType ;
3840import kafka .zk .KafkaZkClient ;
3941import org .apache .kafka .clients .admin .AdminClient ;
4042import org .apache .kafka .clients .admin .AdminClientConfig ;
43+ import org .apache .kafka .clients .admin .AlterPartitionReassignmentsResult ;
4144import org .apache .kafka .clients .admin .CreatePartitionsResult ;
4245import org .apache .kafka .clients .admin .ElectPreferredLeadersResult ;
46+ import org .apache .kafka .clients .admin .NewPartitionReassignment ;
4347import org .apache .kafka .clients .admin .NewPartitions ;
4448import org .apache .kafka .clients .admin .NewTopic ;
4549import org .apache .kafka .clients .admin .TopicDescription ;
5458import org .slf4j .Logger ;
5559import org .slf4j .LoggerFactory ;
5660import scala .Option$ ;
61+ import scala .collection .JavaConverters ;
5762import scala .collection .Seq ;
5863
5964
@@ -184,7 +189,7 @@ public void run() {
184189 TopicManagementHelper helper = entry .getValue ();
185190 try {
186191 helper .maybeReassignPartitionAndElectLeader ();
187- } catch (IOException | KafkaException e ) {
192+ } catch (KafkaException e ) {
188193 LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
189194 }
190195 }
@@ -405,7 +410,7 @@ private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedEx
405410 return brokers ;
406411 }
407412
408- void maybeReassignPartitionAndElectLeader () throws Exception {
413+ void maybeReassignPartitionAndElectLeader () throws ExecutionException , InterruptedException , TimeoutException {
409414 try (KafkaZkClient zkClient = KafkaZkClient .apply (_zkConnect , JaasUtils .isZkSecurityEnabled (),
410415 Utils .ZK_SESSION_TIMEOUT_MS , Utils .ZK_CONNECTION_TIMEOUT_MS ,
411416 Integer .MAX_VALUE , Time .SYSTEM , METRIC_GROUP_NAME , "SessionExpireListener" , null )) {
@@ -426,13 +431,12 @@ void maybeReassignPartitionAndElectLeader() throws Exception {
426431 "Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster." ,
427432 _replicationFactor , currentReplicationFactor , _topic );
428433
429- if (expectedReplicationFactor > currentReplicationFactor && ! zkClient
430- . reassignPartitionsInProgress ()) {
434+ if (expectedReplicationFactor > currentReplicationFactor
435+ && Utils . ongoingPartitionReassignments ( _adminClient ). isEmpty ()) {
431436 LOGGER .info (
432437 "MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
433438 + "from {} to {}" , _topic , currentReplicationFactor , expectedReplicationFactor );
434- reassignPartitions (zkClient , brokers , _topic , partitionInfoList .size (),
435- expectedReplicationFactor );
439+ reassignPartitions (_adminClient , brokers , _topic , partitionInfoList .size (), expectedReplicationFactor );
436440 partitionReassigned = true ;
437441 }
438442
@@ -450,19 +454,16 @@ void maybeReassignPartitionAndElectLeader() throws Exception {
450454 zkClient .setOrCreateEntityConfigs (ConfigType .Topic (), _topic , expectedProperties );
451455 }
452456
453- if (partitionInfoList .size () >= brokers .size () &&
454- someBrokerNotPreferredLeader (partitionInfoList , brokers ) && !zkClient
455- .reassignPartitionsInProgress ()) {
456- LOGGER .info ("{} will reassign partitions of the topic {} in cluster." ,
457- this .getClass ().toString (), _topic );
458- reassignPartitions (zkClient , brokers , _topic , partitionInfoList .size (),
459- expectedReplicationFactor );
457+ if (partitionInfoList .size () >= brokers .size () && someBrokerNotPreferredLeader (partitionInfoList , brokers )
458+ && Utils .ongoingPartitionReassignments (_adminClient ).isEmpty ()) {
459+ LOGGER .info ("{} will reassign partitions of the topic {} in cluster." , this .getClass ().toString (), _topic );
460+ reassignPartitions (_adminClient , brokers , _topic , partitionInfoList .size (), expectedReplicationFactor );
460461 partitionReassigned = true ;
461462 }
462463
463464 if (partitionInfoList .size () >= brokers .size () &&
464465 someBrokerNotElectedLeader (partitionInfoList , brokers )) {
465- if (!partitionReassigned || ! zkClient . reassignPartitionsInProgress ()) {
466+ if (!partitionReassigned || Utils . ongoingPartitionReassignments ( _adminClient ). isEmpty ()) {
466467 LOGGER .info (
467468 "MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
468469 + "cluster." , _topic
@@ -483,7 +484,7 @@ void maybeElectLeader() throws Exception {
483484
484485 try (KafkaZkClient zkClient = KafkaZkClient .apply (_zkConnect , JaasUtils .isZkSecurityEnabled (), Utils .ZK_SESSION_TIMEOUT_MS ,
485486 Utils .ZK_CONNECTION_TIMEOUT_MS , Integer .MAX_VALUE , Time .SYSTEM , METRIC_GROUP_NAME , "SessionExpireListener" , null )) {
486- if (! zkClient . reassignPartitionsInProgress ()) {
487+ if (Utils . ongoingPartitionReassignments ( _adminClient ). isEmpty ()) {
487488 List <TopicPartitionInfo > partitionInfoList = _adminClient
488489 .describeTopics (Collections .singleton (_topic )).all ().get ().get (_topic ).partitions ();
489490 LOGGER .info (
@@ -495,8 +496,7 @@ void maybeElectLeader() throws Exception {
495496 }
496497 }
497498
498- private void triggerPreferredLeaderElection (List <TopicPartitionInfo > partitionInfoList , String partitionTopic )
499- throws ExecutionException , InterruptedException {
499+ private void triggerPreferredLeaderElection (List <TopicPartitionInfo > partitionInfoList , String partitionTopic ) {
500500 Collection <TopicPartition > partitions = new HashSet <>();
501501 for (TopicPartitionInfo javaPartitionInfo : partitionInfoList ) {
502502 partitions .add (new TopicPartition (partitionTopic , javaPartitionInfo .partition ()));
@@ -506,37 +506,54 @@ private void triggerPreferredLeaderElection(List<TopicPartitionInfo> partitionIn
506506 LOGGER .info ("{}: triggerPreferredLeaderElection - {}" , this .getClass ().toString (), electPreferredLeadersResult .all ());
507507 }
508508
509- private static void reassignPartitions (KafkaZkClient zkClient , Collection <Node > brokers , String topic , int partitionCount , int replicationFactor ) {
510- scala .collection .mutable .ArrayBuffer <BrokerMetadata > brokersMetadata = new scala .collection .mutable .ArrayBuffer <>(brokers .size ());
509+ private static void reassignPartitions (AdminClient adminClient , Collection <Node > brokers , String topic ,
510+ int partitionCount , int replicationFactor ) {
511+
512+ scala .collection .mutable .ArrayBuffer <BrokerMetadata > brokersMetadata =
513+ new scala .collection .mutable .ArrayBuffer <>(brokers .size ());
511514 for (Node broker : brokers ) {
512515 brokersMetadata .$plus$eq (new BrokerMetadata (broker .id (), Option$ .MODULE$ .apply (broker .rack ())));
513516 }
514517 scala .collection .Map <Object , Seq <Object >> assignedReplicas =
515518 AdminUtils .assignReplicasToBrokers (brokersMetadata , partitionCount , replicationFactor , 0 , 0 );
516-
517- scala . collection . immutable . Map < TopicPartition , Seq < Object >> newAssignment = new scala .collection .immutable .HashMap <>();
519+ scala . collection . immutable . Map < TopicPartition , Seq < Object >> newAssignment =
520+ new scala .collection .immutable .HashMap <>();
518521 scala .collection .Iterator <scala .Tuple2 <Object , scala .collection .Seq <Object >>> it = assignedReplicas .iterator ();
519522 while (it .hasNext ()) {
520523 scala .Tuple2 <Object , scala .collection .Seq <Object >> scalaTuple = it .next ();
521524 TopicPartition tp = new TopicPartition (topic , (Integer ) scalaTuple ._1 );
522525 newAssignment = newAssignment .$plus (new scala .Tuple2 <>(tp , scalaTuple ._2 ));
523526 }
524527
525- scala .collection .immutable .Set <String > topicList = new scala .collection .immutable .Set .Set1 <>(topic );
526- scala .collection .Map <Object , scala .collection .Seq <Object >> currentAssignment =
527- zkClient .getPartitionAssignmentForTopics (topicList ).apply (topic );
528- String currentAssignmentJson = formatAsNewReassignmentJson (topic , currentAssignment );
529528 String newAssignmentJson = formatAsNewReassignmentJson (topic , assignedReplicas );
530-
531529 LOGGER .info ("Reassign partitions for topic " + topic );
532- LOGGER .info ("Current partition replica assignment " + currentAssignmentJson );
533- LOGGER .info ("New partition replica assignment " + newAssignmentJson );
534- zkClient .createPartitionReassignment (newAssignment );
530+ LOGGER .info ("New topic partition replica assignments: {}" , newAssignmentJson );
531+
532+ Set <Map .Entry <TopicPartition , Seq <Object >>> newAssignmentMap =
533+ scala .collection .JavaConverters .mapAsJavaMap (newAssignment ).entrySet ();
534+ Map <TopicPartition , Optional <NewPartitionReassignment >> reassignments = new HashMap <>();
535+ for (Map .Entry <TopicPartition , Seq <Object >> topicPartitionSeqEntry : newAssignmentMap ) {
536+ List <Integer > targetReplicas = new ArrayList <>();
537+ List <Object > replicas = JavaConverters .seqAsJavaList (topicPartitionSeqEntry .getValue ());
538+ for (Object replica : replicas ) {
539+ targetReplicas .add ((int ) replica );
540+ }
541+ NewPartitionReassignment newPartitionReassignment = new NewPartitionReassignment (targetReplicas );
542+ reassignments .put (topicPartitionSeqEntry .getKey (), Optional .of (newPartitionReassignment ));
543+ }
544+ AlterPartitionReassignmentsResult alterPartitionReassignmentsResult =
545+ adminClient .alterPartitionReassignments (reassignments );
546+ try {
547+ alterPartitionReassignmentsResult .all ().get ();
548+ } catch (InterruptedException | ExecutionException e ) {
549+ LOGGER .error ("An exception occurred while altering the partition reassignments for {}" , topic , e );
550+ }
535551 }
536552
537553 static int getReplicationFactor (List <TopicPartitionInfo > partitionInfoList ) {
538- if (partitionInfoList .isEmpty ())
554+ if (partitionInfoList .isEmpty ()) {
539555 throw new RuntimeException ("Partition list is empty." );
556+ }
540557
541558 int replicationFactor = partitionInfoList .get (0 ).replicas ().size ();
542559 for (TopicPartitionInfo partitionInfo : partitionInfoList ) {
0 commit comments