Step 1: Create a New Spring Boot Starter Project
creating a new Spring Boot Starter Project using STS. While configuring the project, select Spring Web, Spring for Apache Kafka, and Spring Boot DevTools as dependencies
Step 2: Enable Kafka in the Main Class
To integrate Apache Kafka with Spring Boot,
package com.dev.spring.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.EnableKafka; @SpringBootApplication @EnableKafka public class SpringBoot2ApacheKafkaTestApplication { public static void main(String[] args) { SpringApplication.run(SpringBoot2ApacheKafkaTestApplication.class, args); } }
Step 3: Create a Custom MessageRepository Class
Next, create a MessageRepository class to store incoming messages.
package com.dev.spring.kafka.message.repository; import java.util.ArrayList; import java.util.List; import org.springframework.stereotype.Component; @Component public class MessageRepository { private List<String> list = new ArrayList<>(); public void addMessage(String message) { list.add(message); } public String getAllMessages() { return list.toString(); } }
Step 4: Create a MessageProducer Class
Create a MessageProducer class to send messages to the Kafka topic.
package com.dev.spring.kafka.sender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class MessageProducer { private Logger log = LoggerFactory.getLogger(MessageProducer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${myapp.kafka.topic}") private String topic; public void sendMessage(String message) { log.info("MESSAGE SENT FROM PRODUCER END -> " + message); kafkaTemplate.send(topic, message); } }
Step 5: Create a MessageConsumer Class
Now, create a MessageConsumer class to consume messages from the Kafka topic.
package com.dev.spring.kafka.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.dev.spring.kafka.message.repository.MessageRepository; @Component public class MessageConsumer { private Logger log = LoggerFactory.getLogger(MessageConsumer.class); @Autowired private MessageRepository messageRepo; @KafkaListener(topics = "${myapp.kafka.topic}", groupId = "xyz") public void consume(String message) { log.info("MESSAGE RECEIVED AT CONSUMER END -> " + message); messageRepo.addMessage(message); } }
Step 6: Create a KafkaRestController Class
Finally, create a KafkaRestController class to handle REST requests for sending and retrieving messages.
package com.dev.spring.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.dev.spring.kafka.message.repository.MessageRepository; import com.dev.spring.kafka.sender.MessageProducer; @RestController public class KafkaRestController { @Autowired private MessageProducer producer; @Autowired private MessageRepository messageRepo; // Send message to Kafka @GetMapping("/send") public String sendMsg(@RequestParam("msg") String message) { producer.sendMessage(message); return "'" + message + "' sent successfully!"; } // Read all messages @GetMapping("/getAll") public String getAllMessages() { return messageRepo.getAllMessages(); } }
Step 7: Create the application.yml File
Lastly, configure your application by creating an application.yml
server: port: 9090 spring: kafka: producer: bootstrap-servers: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: bootstrap-servers: localhost:9092 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer myapp: kafka: topic: myKafkaTest
Top comments (0)