Skip to content

Commit 2fa6df8

Browse files
author
Lincong Li
authored
Add retry logic to the produce and consume services (#342)
1 parent 7f99c09 commit 2fa6df8

File tree

4 files changed

+104
-29
lines changed

4 files changed

+104
-29
lines changed

src/main/java/com/linkedin/xinfra/monitor/common/Utils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.ByteArrayOutputStream;
1717
import java.io.IOException;
1818
import java.lang.management.ManagementFactory;
19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Collections;
2122
import java.util.List;
@@ -276,4 +277,12 @@ public static List<MbeanAttributeValue> getMBeanAttributeValues(String mbeanExpr
276277
}
277278
return values;
278279
}
280+
281+
public static void delay(Duration duration) {
282+
try {
283+
Thread.sleep(duration.toMillis());
284+
} catch (InterruptedException e) {
285+
LOG.warn("While trying to sleep for {} millis. Got:", duration.toMillis(), e);
286+
}
287+
}
279288
}

src/main/java/com/linkedin/xinfra/monitor/services/AbstractService.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
3+
* file except in compliance with the License. You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
8+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
*/
10+
11+
package com.linkedin.xinfra.monitor.services;
12+
13+
import com.linkedin.xinfra.monitor.common.Utils;
14+
import java.time.Duration;
15+
import java.util.Collections;
16+
import java.util.Map;
17+
import java.util.concurrent.ExecutionException;
18+
import org.apache.kafka.clients.admin.AdminClient;
19+
import org.apache.kafka.clients.admin.DescribeTopicsResult;
20+
import org.apache.kafka.clients.admin.TopicDescription;
21+
import org.apache.kafka.common.KafkaFuture;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
26+
public abstract class AbstractService implements Service {
27+
28+
private static final Logger LOG = LoggerFactory.getLogger(AbstractService.class);
29+
// Below fields are used for the topic description retry logic since sometimes it takes a while for the admin clint
30+
// to discover a topic due to the fact that Kafka's metadata is eventually consistent. The retry logic is particularly
31+
// helpful to avoid exceptions when a new topic gets created since it takes even longer for the admin client to discover
32+
// the newly created topic
33+
private final int _describeTopicRetries;
34+
private final Duration _describeTopicRetryInterval;
35+
36+
AbstractService(int describeTopicRetries, Duration describeTopicRetryInterval) {
37+
if (describeTopicRetries < 1) {
38+
throw new IllegalArgumentException("Expect retry greater 0. Got: " + describeTopicRetries);
39+
}
40+
_describeTopicRetries = describeTopicRetries;
41+
_describeTopicRetryInterval = describeTopicRetryInterval;
42+
}
43+
44+
TopicDescription getTopicDescription(AdminClient adminClient, String topic) {
45+
int attemptCount = 0;
46+
TopicDescription topicDescription = null;
47+
Exception exception = null;
48+
49+
while (attemptCount < _describeTopicRetries) {
50+
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
51+
Map<String, KafkaFuture<TopicDescription>> topicResultValues = describeTopicsResult.values();
52+
KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = topicResultValues.get(topic);
53+
topicDescription = null;
54+
exception = null;
55+
try {
56+
topicDescription = topicDescriptionKafkaFuture.get();
57+
} catch (InterruptedException | ExecutionException e) {
58+
exception = e;
59+
}
60+
if (exception != null) {
61+
LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {} at attempt {}", topic,
62+
attemptCount, exception);
63+
} else if (topicDescription == null) {
64+
LOG.warn("Got null description for topic {} at attempt {}", topic, attemptCount);
65+
} else {
66+
return topicDescription;
67+
}
68+
attemptCount++;
69+
if (attemptCount < _describeTopicRetries) {
70+
Utils.delay(_describeTopicRetryInterval);
71+
}
72+
}
73+
74+
if (exception != null) {
75+
throw new IllegalStateException(exception);
76+
} else {
77+
throw new IllegalStateException(String.format("Got null description for topic %s after %d retry(s)", topic, _describeTopicRetries));
78+
}
79+
}
80+
}

src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import com.linkedin.xinfra.monitor.services.metrics.CommitAvailabilityMetrics;
1818
import com.linkedin.xinfra.monitor.services.metrics.CommitLatencyMetrics;
1919
import com.linkedin.xinfra.monitor.services.metrics.ConsumeMetrics;
20+
import java.time.Duration;
2021
import java.util.ArrayList;
21-
import java.util.Collections;
2222
import java.util.HashMap;
2323
import java.util.List;
2424
import java.util.Map;
@@ -28,24 +28,21 @@
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import org.apache.avro.generic.GenericRecord;
3030
import org.apache.kafka.clients.admin.AdminClient;
31-
import org.apache.kafka.clients.admin.DescribeTopicsResult;
3231
import org.apache.kafka.clients.admin.TopicDescription;
3332
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3433
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
35-
import org.apache.kafka.common.KafkaFuture;
3634
import org.apache.kafka.common.MetricName;
3735
import org.apache.kafka.common.TopicPartition;
3836
import org.apache.kafka.common.metrics.JmxReporter;
3937
import org.apache.kafka.common.metrics.MetricConfig;
4038
import org.apache.kafka.common.metrics.Metrics;
4139
import org.apache.kafka.common.metrics.MetricsReporter;
42-
import org.apache.kafka.common.metrics.Sensor;
4340
import org.apache.kafka.common.metrics.stats.CumulativeSum;
4441
import org.apache.kafka.common.utils.SystemTime;
4542
import org.slf4j.Logger;
4643
import org.slf4j.LoggerFactory;
4744

48-
public class ConsumeService implements Service {
45+
public class ConsumeService extends AbstractService {
4946
private static final Logger LOG = LoggerFactory.getLogger(ConsumeService.class);
5047
private static final String TAGS_NAME = "name";
5148
private static final long COMMIT_TIME_INTERVAL = 4;
@@ -83,6 +80,8 @@ public ConsumeService(String name,
8380
CompletableFuture<Void> topicPartitionResult,
8481
ConsumerFactory consumerFactory)
8582
throws ExecutionException, InterruptedException {
83+
// TODO: Make values of below fields come from configs
84+
super(10, Duration.ofMinutes(1));
8685
_baseConsumer = consumerFactory.baseConsumer();
8786
_latencySlaMs = consumerFactory.latencySlaMs();
8887
_name = name;
@@ -231,19 +230,10 @@ public synchronized void start() {
231230
_consumeThread.start();
232231
LOG.info("{}/ConsumeService started.", _name);
233232

234-
Sensor topicPartitionCount = metrics.sensor("topic-partitions");
235-
DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singleton(_topic));
236-
Map<String, KafkaFuture<TopicDescription>> topicResultValues = describeTopicsResult.values();
237-
KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = topicResultValues.get(_topic);
238-
TopicDescription topicDescription = null;
239-
try {
240-
topicDescription = topicDescriptionKafkaFuture.get();
241-
} catch (InterruptedException | ExecutionException e) {
242-
LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {}", _topic, e);
243-
}
233+
TopicDescription topicDescription = getTopicDescription(_adminClient, _topic);
244234
@SuppressWarnings("ConstantConditions")
245235
double partitionCount = topicDescription.partitions().size();
246-
topicPartitionCount.add(
236+
metrics.sensor("topic-partitions").add(
247237
new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.",
248238
tags), new CumulativeSum(partitionCount));
249239
}

src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.linkedin.xinfra.monitor.producer.NewProducer;
1717
import com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig;
1818
import com.linkedin.xinfra.monitor.services.metrics.ProduceMetrics;
19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Collections;
2122
import java.util.HashMap;
@@ -36,9 +37,7 @@
3637
import org.apache.kafka.clients.admin.TopicDescription;
3738
import org.apache.kafka.clients.producer.ProducerConfig;
3839
import org.apache.kafka.clients.producer.RecordMetadata;
39-
import org.apache.kafka.common.KafkaFuture;
4040
import org.apache.kafka.common.config.ConfigException;
41-
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4241
import org.apache.kafka.common.metrics.JmxReporter;
4342
import org.apache.kafka.common.metrics.MetricConfig;
4443
import org.apache.kafka.common.metrics.Metrics;
@@ -48,7 +47,7 @@
4847
import org.slf4j.LoggerFactory;
4948

5049
@SuppressWarnings("rawtypes")
51-
public class ProduceService implements Service {
50+
public class ProduceService extends AbstractService {
5251
private static final Logger LOG = LoggerFactory.getLogger(ProduceService.class);
5352
private static final String[] NON_OVERRIDABLE_PROPERTIES = new String[]{
5453
ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -78,6 +77,8 @@ public class ProduceService implements Service {
7877
private static final String KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";
7978

8079
public ProduceService(Map<String, Object> props, String name) throws Exception {
80+
// TODO: Make values of below fields come from configs
81+
super(10, Duration.ofMinutes(1));
8182
_name = name;
8283
ProduceServiceConfig config = new ProduceServiceConfig(props);
8384
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
@@ -156,16 +157,11 @@ private void initializeProducer(Map<String, Object> props) throws Exception {
156157
@Override
157158
public synchronized void start() {
158159
if (_running.compareAndSet(false, true)) {
159-
try {
160-
KafkaFuture<Map<String, TopicDescription>> topicDescriptionsFuture = _adminClient.describeTopics(Collections.singleton(_topic)).all();
161-
Map<String, TopicDescription> topicDescriptions = topicDescriptionsFuture.get();
162-
int partitionNum = topicDescriptions.get(_topic).partitions().size();
163-
initializeStateForPartitions(partitionNum);
164-
_handleNewPartitionsExecutor.scheduleWithFixedDelay(new NewPartitionHandler(), 1, 30, TimeUnit.SECONDS);
165-
LOG.info("{}/ProduceService started", _name);
166-
} catch (InterruptedException | UnknownTopicOrPartitionException | ExecutionException e) {
167-
LOG.error("Exception occurred while starting produce service for topic: {}", _topic, e);
168-
}
160+
TopicDescription topicDescription = getTopicDescription(_adminClient, _topic);
161+
int partitionNum = topicDescription.partitions().size();
162+
initializeStateForPartitions(partitionNum);
163+
_handleNewPartitionsExecutor.scheduleWithFixedDelay(new NewPartitionHandler(), 1, 30, TimeUnit.SECONDS);
164+
LOG.info("{}/ProduceService started", _name);
169165
}
170166
}
171167

0 commit comments

Comments
 (0)