Skip to content

Commit d9a5625

Browse files
author
ardizioa
committed
Implement Application event in order to subscribe kafka consumers
1 parent 473b86a commit d9a5625

File tree

3 files changed

+19
-7
lines changed

3 files changed

+19
-7
lines changed
Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,35 @@
11
package com.aardizio;
22

3+
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.springframework.beans.factory.annotation.Qualifier;
35
import org.springframework.boot.SpringApplication;
46
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
import org.springframework.boot.context.event.ApplicationReadyEvent;
8+
import org.springframework.context.ApplicationListener;
59
import org.springframework.web.reactive.config.EnableWebFlux;
610

11+
import reactor.kafka.receiver.KafkaReceiver;
712
import springfox.documentation.swagger2.annotations.EnableSwagger2;
813

914
@SpringBootApplication
1015
@EnableSwagger2
1116
@EnableWebFlux
12-
public class SpringReactiveApplication {
17+
public class SpringReactiveApplication implements ApplicationListener<ApplicationReadyEvent>{
18+
19+
@Qualifier("simpleConsumer")
20+
@Autowired
21+
private KafkaReceiver<String,String> receiver;
22+
1323

1424
public static void main(String[] args) {
1525
SpringApplication.run(SpringReactiveApplication.class, args);
1626
}
27+
28+
public void onApplicationEvent(ApplicationReadyEvent event){
29+
receiver.receive().subscribe(r -> {
30+
System.out.printf("Received message: %s\n", r);
31+
r.receiverOffset().acknowledge();
32+
});
33+
}
34+
1735
}

src/main/java/com/aardizio/config/KafkaConsumerConfig.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ public KafkaReceiver<String,String> simpleConsumer() {
3737
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
3838
.subscription(Collections.singleton("prova"));
3939
KafkaReceiver<String,String> receiver = KafkaReceiver.create(receiverOptions);
40-
receiver.receive().subscribe(r -> {
41-
System.out.printf("Received message: %s\n", r);
42-
r.receiverOffset().acknowledge();
43-
});
4440
return receiver;
4541
}
4642

src/main/java/com/aardizio/config/KafkaProducerConfig.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,4 @@ public KafkaSender<String,String> simpleProducer(){
3737
}
3838

3939

40-
41-
4240
}

0 commit comments

Comments
 (0)