在Spring Cloud Kafka中,消费者组是通过消费者配置属性group.id
来定义的。消费者组内的每个消费者实例都必须使用相同的group.id
。当消费者组中的消费者实例数量发生变化时,Kafka会自动重新分配分区给消费者组中的消费者实例。
以下是如何在Spring Cloud Kafka中处理消费者组的步骤:
在项目的pom.xml
文件中添加Spring Cloud Kafka和Kafka客户端的依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
在application.yml
或application.properties
文件中配置Kafka消费者属性,例如:
spring: cloud: kafka: consumer: group-id: my-consumer-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer bootstrap-servers: localhost:9092
在这个例子中,我们定义了一个名为my-consumer-group
的消费者组,并设置了自动偏移重置策略为earliest
,以便在消费者启动时从最早的记录开始消费。同时,我们还配置了键值的反序列化器为StringDeserializer
。
定义一个消费者接口,用于处理接收到的消息:
public interface MyKafkaConsumer { void consume(ConsumerRecord<String, String> record); }
创建一个实现MyKafkaConsumer
接口的类,并在其中使用@KafkaListener
注解来监听特定的主题:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class MyKafkaConsumerImpl implements MyKafkaConsumer { @Override @KafkaListener(topics = "my-topic", groupId = "my-consumer-group") public void consume(ConsumerRecord<String, String> record) { System.out.printf("Received message: key = %s, value = %s, partition = %d, offset = %d%n", record.key(), record.value(), record.partition(), record.offset()); } }
在这个例子中,我们使用@KafkaListener
注解来指定监听的主题(my-topic
)和消费者组(my-consumer-group
)。当有新消息到达时,consume
方法将被调用。
启动Spring Boot应用程序,Spring Cloud Kafka将自动处理消费者组并将分区分配给消费者组中的消费者实例。消费者实例将根据其group.id
加入或离开消费者组,并在消费者组发生变化时重新分配分区。