Skip to content

Commit 12f9d3e

Browse files
authored
Treating threads as daemons (#10)
* treating threads as daemons, set threads names * set thread number * setup gradle release * adjusting id * consumer stop can't hang application * [Gradle Release Plugin] - new version commit: '2.0.6-snapshot'. * version adjusting
1 parent 6860267 commit 12f9d3e

File tree

14 files changed

+129
-41
lines changed

14 files changed

+129
-41
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Kafka Client is a vanilla java library that makes it easy to consume data from k
2020
# Getting Started
2121

2222
```groovy
23-
compile("com.mageddo.rapids-kafka-client:rapids-kafka-client:2.0.4")
23+
compile("com.mageddo.rapids-kafka-client:rapids-kafka-client:2.0.5-snapshot")
2424
```
2525

2626
```java

build.gradle

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
plugins {
22
id "java-library"
33
id "maven"
4+
id 'net.researchgate.release' version '2.8.1'
45
}
56

67
apply from: "nexus.gradle"
@@ -37,3 +38,19 @@ compileTestJava {
3738
sourceCompatibility = JavaVersion.VERSION_11
3839
targetCompatibility = JavaVersion.VERSION_11
3940
}
41+
42+
release {
43+
project.ext.set("release.useAutomaticVersion", true)
44+
git {
45+
requireBranch = ''
46+
}
47+
failOnCommitNeeded = false
48+
failOnPublishNeeded = false
49+
failOnUnversionedFiles = false
50+
buildTasks = []
51+
}
52+
53+
confirmReleaseVersion.doLast {
54+
def f = file("${project.projectDir}/README.md")
55+
f.text = f.text.replaceAll("(:\\ )*\\d+\\.\\d+\\.\\d+", "\$1${version}")
56+
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
projectUrl=https://github.com/mageddo-projects/kafka-client
22
group=com.mageddo.rapids-kafka-client
3-
version=2.0.4
3+
version=2.0.6
44
org.gradle.daemon=false

src/main/java/com/mageddo/kafka/client/BatchConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class BatchConsumer<K, V> extends DefaultConsumer<K, V> {
2020

2121
private final Consumer<K, V> consumer;
2222
private final ConsumerConfig<K, V> consumerConfig;
23+
private final int number;
2324

2425
@Override
2526
protected void consume(ConsumerRecords<K, V> records) {
@@ -78,6 +79,11 @@ protected ConsumerConfig<K, V> consumerConfig() {
7879
return this.consumerConfig;
7980
}
8081

82+
@Override
83+
protected int getNumber() {
84+
return this.number;
85+
}
86+
8187
private void commitFirstRecord(Consumer<K, V> consumer, ConsumerRecords<K, V> records, TopicPartition partition) {
8288
final ConsumerRecord<K, V> firstRecord = getFirstRecord(records, partition);
8389
if (firstRecord != null) {

src/main/java/com/mageddo/kafka/client/ConsumerController.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void consume(ConsumerConfig<K, V> consumerConfig) {
7575
this.checkReasonablePollInterval(consumerConfig);
7676

7777
for (int i = 0; i < threads; i++) {
78-
final ThreadConsumer<K, V> consumer = this.getInstance(this.create(consumerConfig), consumerConfig);
78+
final ThreadConsumer<K, V> consumer = this.getInstance(this.create(consumerConfig), consumerConfig, i);
7979
consumer.start();
8080
}
8181
log.info(
@@ -84,11 +84,11 @@ public void consume(ConsumerConfig<K, V> consumerConfig) {
8484
);
8585
}
8686

87-
ThreadConsumer<K, V> getInstance(Consumer<K, V> consumer, ConsumerConfig<K, V> consumerConfig) {
87+
ThreadConsumer<K, V> getInstance(Consumer<K, V> consumer, ConsumerConfig<K, V> consumerConfig, int number) {
8888
if (consumerConfig.batchCallback() != null) {
89-
return this.bindInstance(new BatchConsumer<>(consumer, consumerConfig));
89+
return this.bindInstance(new BatchConsumer<>(consumer, consumerConfig, number));
9090
}
91-
return this.bindInstance(new RecordConsumer<>(consumer, consumerConfig));
91+
return this.bindInstance(new RecordConsumer<>(consumer, consumerConfig, number));
9292
}
9393

9494
private ThreadConsumer<K, V> bindInstance(ThreadConsumer<K, V> consumer) {

src/main/java/com/mageddo/kafka/client/ConsumerStarter.java

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5-
import java.util.concurrent.ExecutionException;
65
import java.util.concurrent.ExecutorService;
7-
import java.util.concurrent.Executors;
8-
import java.util.concurrent.Future;
6+
import java.util.concurrent.atomic.AtomicInteger;
97
import java.util.stream.Collectors;
108

9+
import com.mageddo.kafka.client.internal.Threads;
10+
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

@@ -102,33 +102,23 @@ public void stop() {
102102
}
103103
this.stopped = true;
104104
log.info("status=stopping-consumers, toStop={}", this.factories.size());
105-
final ExecutorService executorService = Executors.newScheduledThreadPool(5);
106-
try {
107-
final List<Future<String>> futures = new ArrayList<>();
108-
for (ConsumerController<?, ?> factory : this.factories) {
109-
futures.add(executorService.submit(() -> {
110-
try {
111-
factory.close();
112-
return factory.toString();
113-
} catch (Exception e) {
114-
log.warn("status=failed-to-stop-consumer, consumer={}", factory);
115-
return String.format("failed to stop: %s, %s", factory.toString(), e.getMessage());
116-
}
117-
}));
118-
}
119-
int stopped = 0;
120-
for (Future<String> future : futures) {
105+
final ExecutorService executorService = Threads.createPool(5);
106+
final AtomicInteger stopped = new AtomicInteger(1);
107+
for (int i = 0; i < this.factories.size(); i++) {
108+
final ConsumerController<?, ?> factory = this.factories.get(i);
109+
executorService.submit(() -> {
121110
try {
122-
final String id = future.get();
123-
log.info("status=stopped, {} of {}, factory={}", ++stopped, this.factories.size(), id);
124-
} catch (InterruptedException | ExecutionException e) {
125-
throw new RuntimeException(e);
111+
factory.close();
112+
log.info(
113+
"status=stopped, {} of {}, factory={}",
114+
stopped.getAndIncrement(), this.factories.size(), factory
115+
);
116+
return factory.toString();
117+
} catch (Exception e) {
118+
log.warn("status=failed-to-stop-consumer, consumer={}", factory);
119+
return String.format("failed to stop: %s, %s", factory.toString(), e.getMessage());
126120
}
127-
}
128-
} finally {
129-
executorService.shutdown();
130-
log.info("status=consumers-stopped, count={}", this.factories.size());
131-
121+
});
132122
}
133123
}
134124

src/main/java/com/mageddo/kafka/client/DefaultConsumer.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.atomic.AtomicBoolean;
77

88
import com.mageddo.kafka.client.internal.ObjectsUtils;
9+
import com.mageddo.kafka.client.internal.Threads;
910

1011
import org.apache.kafka.clients.consumer.Consumer;
1112
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -18,6 +19,7 @@
1819
import lombok.extern.slf4j.Slf4j;
1920

2021
import static com.mageddo.kafka.client.DefaultConsumingConfig.DEFAULT_POLL_TIMEOUT;
22+
import static com.mageddo.kafka.client.internal.StringUtils.clearNonAlpha;
2123

2224
@Slf4j
2325
public abstract class DefaultConsumer<K, V> implements ThreadConsumer {
@@ -34,14 +36,17 @@ public abstract class DefaultConsumer<K, V> implements ThreadConsumer {
3436

3537
protected abstract ConsumerConfig<K, V> consumerConfig();
3638

39+
protected abstract int getNumber();
40+
3741
@Override
3842
public void start() {
3943
if (started.get()) {
4044
log.warn("status=already-started, thread={}, config={}", this.id(), this.consumerConfig());
4145
return;
4246
}
4347
final Consumer<K, V> consumer = consumer();
44-
this.executor = new Thread(() -> {
48+
49+
this.executor = newThread(() -> {
4550
this.poll(consumer, consumerConfig());
4651
});
4752
this.executor.start();
@@ -100,7 +105,7 @@ public void close() {
100105

101106
@Override
102107
public String id() {
103-
return String.format("%d-%s_%s", this.executor.getId(), this.executor.getName(), this.consumerConfig());
108+
return String.format("%d-%s", this.executor.getId(), this.executor.getName());
104109
}
105110

106111
public boolean isClosed() {
@@ -158,6 +163,19 @@ void doRecoverWhenAvailable(RecoverContext<K, V> ctx, RecoverCallback<K, V> reco
158163
}
159164

160165
Exception getConsumerError() {
161-
return consumerError;
166+
return this.consumerError;
167+
}
168+
169+
private Thread newThread(Runnable r) {
170+
return Threads.newThread(r, this.createThreadId());
162171
}
172+
173+
private String createThreadId() {
174+
return String.format(
175+
"%s-%d",
176+
clearNonAlpha(consumerConfig().groupId()),
177+
this.getNumber()
178+
);
179+
}
180+
163181
}

src/main/java/com/mageddo/kafka/client/RecordConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class RecordConsumer<K, V> extends DefaultConsumer<K, V> {
1919

2020
private final Consumer<K, V> consumer;
2121
private final ConsumerConfig<K, V> consumerConfig;
22+
private final int number;
2223

2324
@Override
2425
protected void consume(ConsumerRecords<K, V> records) {
@@ -89,6 +90,11 @@ protected ConsumerConfig<K, V> consumerConfig() {
8990
return this.consumerConfig;
9091
}
9192

93+
@Override
94+
protected int getNumber() {
95+
return this.number;
96+
}
97+
9298
RetryPolicy getRetryPolicy() {
9399
return ObjectsUtils.firstNonNull(this.consumerConfig.retryPolicy(), DEFAULT_RETRY_STRATEGY);
94100
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.mageddo.kafka.client.internal;
2+
3+
import lombok.AccessLevel;
4+
import lombok.NoArgsConstructor;
5+
6+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
7+
public class StringUtils {
8+
public static String clearNonAlpha(String text) {
9+
if (text == null) {
10+
return null;
11+
}
12+
return text.replaceAll("^[a-zA-Z_\\-.]", "");
13+
}
14+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.mageddo.kafka.client.internal;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
6+
import lombok.AccessLevel;
7+
import lombok.NoArgsConstructor;
8+
9+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
10+
public class Threads {
11+
public static ExecutorService createPool(int size) {
12+
return Executors.newFixedThreadPool(size, r -> {
13+
Thread t = Executors.defaultThreadFactory()
14+
.newThread(r);
15+
t.setDaemon(true);
16+
return t;
17+
});
18+
}
19+
20+
public static Thread newThread(Runnable r) {
21+
return newThread(r, null);
22+
}
23+
24+
public static Thread newThread(Runnable r, String id) {
25+
final Thread t = new Thread(r);
26+
if (id != null) {
27+
t.setName(id);
28+
}
29+
t.setDaemon(true);
30+
return t;
31+
}
32+
}

0 commit comments

Comments
 (0)