Kafka消费者组(Consumer Group)是Kafka中的一个重要概念,它允许一组消费者协同工作,共同消费一个或多个主题(Topic)中的消息。以下是使用Kafka消费者组的基本步骤:
在创建消费者时,需要指定一个消费者组ID。这个ID用于标识一组共享同一个偏移量(Offset)的消费者。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-consumer-group"); // 指定消费者组ID props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
消费者组可以订阅一个或多个主题。订阅主题后,消费者会开始消费这些主题中的消息。
consumer.subscribe(Arrays.asList("my-topic"));
消费者会不断轮询(poll)主题中的消息,并处理这些消息。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
Kafka消费者组会自动管理偏移量。当消费者处理完一批消息后,它会提交这些消息的偏移量,以便下次从正确的位置继续消费。
当不再需要消费者时,应该关闭它以释放资源。
consumer.close();
enable.auto.commit
来控制是否自动提交偏移量。默认情况下,Kafka会自动提交偏移量。props.put("enable.auto.commit", "false"); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息 } consumer.commitSync(); // 手动同步提交偏移量 }
通过以上步骤,你可以有效地使用Kafka消费者组来消费消息。