Skip to content

Commit d39d91e

Browse files
authored
Builder and default threads fix (#7)
* fixing generics * honoring disabled consumer * respect default consumer * fixing version
1 parent 568ff30 commit d39d91e

File tree

8 files changed

+110
-36
lines changed

8 files changed

+110
-36
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Kafka Client is a vanilla java library that makes it easy to consume data from k
2020
# Getting Started
2121

2222
```groovy
23-
compile("com.mageddo.rapids-kafka-client:rapids-kafka-client:2.0.1")
23+
compile("com.mageddo.rapids-kafka-client:rapids-kafka-client:2.0.2")
2424
```
2525

2626
```java

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
projectUrl=https://github.com/mageddo-projects/kafka-client
22
group=com.mageddo.rapids-kafka-client
3-
version=2.0.1
3+
version=2.0.2
44
org.gradle.daemon=false
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.mageddo.kafka.client;
22

3-
import com.mageddo.kafka.client.ConsumerConfig;
4-
53
public interface Consumer {
6-
ConsumerConfig<?, ?> config();
4+
<K, V> ConsumerConfig<K, V> config();
75
}

src/main/java/com/mageddo/kafka/client/ConsumerConfigDefault.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,31 +192,48 @@ public static void waitFor() {
192192
.join();
193193
}
194194

195-
public static Builder<?, ?> builderOf(ConsumerConfig<?, ?> config) {
196-
return copyToBuilder(config, ConsumerConfigDefault.builder().build());
195+
public static <K, V> Builder<K, V> builderOf(ConsumerConfig<K, V> config) {
196+
return copyToBuilder(config, ConsumerConfigDefault.<K, V>builder().build());
197197
}
198198

199-
public static ConsumerConfigDefault<?, ?> copy(ConsumerConfig<?, ?> primary, ConsumerConfig<?, ?> secondary){
199+
public static <K, V> ConsumerConfigDefault<K, V> copy(ConsumerConfig<K, V> primary, ConsumerConfig<K, V> secondary) {
200200
return copyToBuilder(primary, secondary).build();
201201
}
202202

203-
public static Builder<?, ?> copyToBuilder(ConsumerConfig<?, ?> primary, ConsumerConfig<?, ?> secondary){
204-
final Builder builder = ConsumerConfigDefault.builder();
205-
fillWith(secondary, builder);
206-
fillWith(primary, builder);
203+
public static <K, V> Builder<K, V> copyToBuilder(ConsumerConfig<K, V> primary, ConsumerConfig<K, V> secondary) {
204+
final Builder<K, V> builder = copyToBuilder(
205+
secondary,
206+
ConsumerConfig.<K, V>builder().build(),
207+
ConsumerConfig.builder()
208+
);
209+
return copyToBuilder(primary, secondary, builder);
210+
}
211+
212+
public static <K, V> Builder<K, V> copyToBuilder(
213+
ConsumerConfig<K, V> primary, ConsumerConfig<K, V> secondary, Builder<K, V> builder
214+
) {
215+
fillPropsWith(secondary, builder);
216+
fillPropsWith(primary, builder);
207217
return builder
208218
.callback(firstNonNull(primary.callback(), secondary.callback()))
209219
.batchCallback(firstNonNull(primary.batchCallback(), secondary.batchCallback()))
210-
.topics(primary.topics().isEmpty() ? secondary.topics() : primary.topics())
211-
.consumers(primary.consumers() != CONSUMERS_NOT_SET ? primary.consumers() : secondary.consumers())
220+
.topics(
221+
primary.topics()
222+
.isEmpty() ? secondary.topics() : primary.topics()
223+
)
224+
.consumers(decideConsumerThreads(primary, secondary))
212225
.recoverCallback(firstNonNull(primary.recoverCallback(), secondary.recoverCallback()))
213226
.retryPolicy(firstNonNull(primary.retryPolicy(), secondary.retryPolicy()))
214227
.consumerSupplier(firstNonNull(primary.consumerSupplier(), secondary.consumerSupplier()))
215228
.pollInterval(firstNonNull(primary.pollInterval(), secondary.pollInterval()))
216229
.pollTimeout(firstNonNull(primary.pollTimeout(), secondary.pollTimeout()));
217230
}
218231

219-
private static void fillWith(ConsumerConfig<?, ?> source, Builder target) {
232+
private static <K, V> int decideConsumerThreads(ConsumerConfig<K, V> primary, ConsumerConfig<K, V> secondary) {
233+
return primary.consumers() != CONSUMERS_NOT_SET ? primary.consumers() : secondary.consumers();
234+
}
235+
236+
private static <K, V> void fillPropsWith(ConsumerConfig<K, V> source, Builder<K, V> target) {
220237
source
221238
.props()
222239
.forEach(target::prop);

src/main/java/com/mageddo/kafka/client/ConsumerController.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import lombok.SneakyThrows;
1515
import lombok.extern.slf4j.Slf4j;
1616

17+
import static com.mageddo.kafka.client.ConsumerConfigDefault.CONSUMERS_NOT_SET;
1718
import static com.mageddo.kafka.client.DefaultConsumingConfig.DEFAULT_RETRY_STRATEGY;
1819
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
1920
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
@@ -60,7 +61,10 @@ public void consume(ConsumerConfig<K, V> consumerConfig) {
6061
}
6162
this.started = true;
6263
this.consumerConfig = consumerConfig;
63-
if (consumerConfig.consumers() == Integer.MIN_VALUE) {
64+
final int threads = consumerConfig.consumers() != CONSUMERS_NOT_SET
65+
? consumerConfig.consumers()
66+
: 1;
67+
if (threads == Integer.MIN_VALUE) {
6468
log.info(
6569
"status=disabled-consumer, groupId={}, topics={}",
6670
this.consumerConfig.groupId(),
@@ -70,7 +74,7 @@ public void consume(ConsumerConfig<K, V> consumerConfig) {
7074
}
7175
this.checkReasonablePollInterval(consumerConfig);
7276

73-
for (int i = 0; i < consumerConfig.consumers(); i++) {
77+
for (int i = 0; i < threads; i++) {
7478
final ThreadConsumer<K, V> consumer = this.getInstance(this.create(consumerConfig), consumerConfig);
7579
consumer.start();
7680
}

src/main/java/com/mageddo/kafka/client/ConsumerStarter.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,43 +52,45 @@
5252
* }
5353
* </pre>
5454
*/
55-
public class ConsumerStarter {
55+
public class ConsumerStarter<K, V> {
5656

5757
private static final Logger log = LoggerFactory.getLogger(ConsumerStarter.class);
5858

59-
private final ConsumerConfig<?, ?> config;
59+
private final ConsumerConfig<K, V> config;
6060
private final List<ConsumerController<?, ?>> factories;
6161
private boolean started = false;
6262
private boolean stopped = false;
6363

64-
public ConsumerStarter(ConsumerConfig<?, ?> config) {
64+
public ConsumerStarter(ConsumerConfig<K, V> config) {
6565
this.config = config;
6666
this.factories = new ArrayList<>();
6767
}
6868

69-
public static ConsumerStarter startFromConfig(ConsumerConfig<?, ?> config, List<ConsumerConfig> configs) {
70-
return new ConsumerStarter(config).startFromConfig(configs);
69+
public static <K, V> ConsumerStarter<K, V> startFromConfig(
70+
ConsumerConfig<K, V> config, List<ConsumerConfig<K, V>> configs
71+
) {
72+
return new ConsumerStarter<K, V>(config).startFromConfig(configs);
7173
}
7274

73-
public static ConsumerStarter start(ConsumerConfig<?, ?> config, List<Consumer> consumers) {
74-
return new ConsumerStarter(config).start(consumers);
75+
public static <K, V> ConsumerStarter<K, V> start(ConsumerConfig<K, V> config, List<Consumer> consumers) {
76+
return new ConsumerStarter<K, V>(config).start(consumers);
7577
}
7678

77-
public ConsumerStarter start(List<Consumer> consumers) {
79+
public ConsumerStarter<K, V> start(List<Consumer> consumers) {
7880
this.startFromConfig(consumers
7981
.stream()
80-
.map(Consumer::config)
82+
.map(it -> (ConsumerConfig<K, V>) it.config())
8183
.collect(Collectors.toList())
8284
);
8385
return this;
8486
}
8587

86-
public ConsumerStarter startFromConfig(List<ConsumerConfig> consumers) {
88+
public ConsumerStarter<K, V> startFromConfig(List<ConsumerConfig<K, V>> consumers) {
8789
if (this.started) {
8890
throw new IllegalStateException("ConsumerConfig were already started");
8991
}
9092
this.started = true;
91-
for (final ConsumerConfig config : consumers) {
93+
for (final ConsumerConfig<K, V> config : consumers) {
9294
this.factories.add(this.start(this.buildConsumer(config)));
9395
}
9496
return this;
@@ -128,16 +130,16 @@ public void stop() {
128130
}
129131
}
130132

131-
public void waitFor(){
133+
public void waitFor() {
132134
ConsumerConfigDefault.waitFor();
133135
}
134136

135-
private ConsumerConfigDefault<?, ?> buildConsumer(ConsumerConfig<?, ?> config) {
137+
private ConsumerConfigDefault<K, V> buildConsumer(ConsumerConfig<K, V> config) {
136138
return ConsumerConfigDefault.copy(config, this.config);
137139
}
138140

139-
ConsumerController<?, ?> start(ConsumerConfig<?, ?> consumerConfig) {
140-
final ConsumerController consumerController = new ConsumerController<>();
141+
ConsumerController<K,V> start(ConsumerConfig<K,V> consumerConfig) {
142+
final ConsumerController<K,V> consumerController = new ConsumerController<>();
141143
consumerController.consume(consumerConfig);
142144
return consumerController;
143145
}

src/test/java/com/mageddo/kafka/client/ConsumerControllerTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.time.Duration;
44
import java.util.Collections;
55

6+
import org.apache.kafka.clients.consumer.Consumer;
67
import org.apache.kafka.clients.consumer.MockConsumer;
78
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
89
import org.apache.kafka.common.config.ConfigException;
@@ -23,6 +24,7 @@
2324
import static org.mockito.ArgumentMatchers.eq;
2425
import static org.mockito.Mockito.atLeastOnce;
2526
import static org.mockito.Mockito.doReturn;
27+
import static org.mockito.Mockito.mock;
2628
import static org.mockito.Mockito.never;
2729
import static org.mockito.Mockito.spy;
2830
import static org.mockito.Mockito.verify;
@@ -182,4 +184,28 @@ void mustUseDefaultPollTimeoutWhenItsNotSet() {
182184

183185
}
184186

187+
@Test
188+
@SneakyThrows
189+
void mustUseDefaultConsumerThreadsWhenItsNotSet() {
190+
// arrange
191+
final var consumerConfig = ConsumerConfig
192+
.<String, byte[]>builder()
193+
.callback((callbackContext, record) -> System.out.println("nop"))
194+
.build();
195+
196+
doReturn(mock(Consumer.class))
197+
.when(this.consumerController)
198+
.create(any());
199+
doReturn(mock(ThreadConsumer.class))
200+
.when(this.consumerController)
201+
.getInstance(any(), any());
202+
203+
// act
204+
this.consumerController.consume(consumerConfig);
205+
206+
// assert
207+
verify(this.consumerController).getInstance(any(), any());
208+
209+
}
210+
185211
}

src/test/java/com/mageddo/kafka/client/ConsumerStarterTest.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void mustStartConsumerUsingTemplateConfigs() {
7777
.pollTimeout(templatePollTimeout)
7878
.retryPolicy(DEFAULT_RETRY_STRATEGY)
7979
.build();
80-
final var consumerStarter = spy(new ConsumerStarter(templateConfig));
80+
final var consumerStarter = spy(new ConsumerStarter<>(templateConfig));
8181

8282
final var consumerConfig = ConsumerConfig
8383
.builder()
@@ -101,18 +101,16 @@ void mustStartConsumerUsingTemplateConfigs() {
101101
assertEquals(templatePollInterval, actualConfig.pollInterval());
102102
assertEquals(templatePollTimeout, actualConfig.pollTimeout());
103103
assertEquals(1, actualConfig.props().size());
104-
105104
}
106105

107-
108106
@Test
109107
void consumerSpecificConfigMustOverrideTemplateConfigs() {
110108

111109
// arrange
112110
final var templateConfig = ConsumerConfigTemplates.builder()
113111
.prop(GROUP_ID_CONFIG, "my_group_id")
114112
.build();
115-
final var consumerStarter = spy(new ConsumerStarter(templateConfig));
113+
final var consumerStarter = spy(new ConsumerStarter<>(templateConfig));
116114

117115
final var groupId = "customGroupId";
118116
final var pollInterval = Duration.ofSeconds(33);
@@ -126,6 +124,7 @@ void consumerSpecificConfigMustOverrideTemplateConfigs() {
126124
.pollInterval(pollInterval)
127125
.pollTimeout(pollTimeout)
128126
.build();
127+
129128
doReturn(mock(ConsumerController.class))
130129
.when(consumerStarter)
131130
.start(any(ConsumerConfig.class));
@@ -148,4 +147,32 @@ void consumerSpecificConfigMustOverrideTemplateConfigs() {
148147

149148
}
150149

150+
@Test
151+
void mustNotOverrideConsumerThreadsWhenDefaultValueIsDisabled() {
152+
153+
// arrange
154+
final var consumingDisabled = Integer.MIN_VALUE;
155+
final var templateConfig = ConsumerConfigTemplates.builder()
156+
.consumers(consumingDisabled)
157+
.build();
158+
final var consumerStarter = spy(new ConsumerStarter<>(templateConfig));
159+
final var consumers = 333;
160+
final var consumerConfig = ConsumerConfig
161+
.builder()
162+
.consumers(consumers)
163+
.build();
164+
165+
doReturn(mock(ConsumerController.class))
166+
.when(consumerStarter)
167+
.start(any(ConsumerConfig.class));
168+
169+
// act
170+
consumerStarter.startFromConfig(List.of(consumerConfig));
171+
172+
// assert
173+
verify(consumerStarter).start(this.argumentCaptor.capture());
174+
final var actualConfig = this.argumentCaptor.getValue();
175+
assertEquals(consumingDisabled, actualConfig.consumers());
176+
177+
}
151178
}

0 commit comments

Comments
 (0)