Kafka中的消息确认(acknowledgment)是通过消费者与Kafka集群之间的交互来实现的。当消费者处理完一个消息后,它会向Kafka发送一个确认信号,表明该消息已经被成功处理。这样可以确保消息不会被重复消费,并且可以帮助Kafka跟踪消费者的处理进度。
Kafka支持两种消息确认机制:
props中设置enable.auto.commit属性为true。例如:Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit", "true"); 需要注意的是,自动确认模式不适用于需要确保消息不被重复消费的场景。
Consumer接口的acknowledge()方法来实现。要使用手动确认,你需要在消费者的props中设置enable.auto.commit属性为false,并实现一个AcknowledgingConsumer。例如:Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit", "false"); // 创建一个AcknowledgingConsumer AcknowledgingConsumer<String, String> acknowledgingConsumer = new AbstractConsumer<String, String>(props) { @Override public void onConsume(Collection<ConsumerRecord<String, String>> records, ConsumerContext context) { for (ConsumerRecord<String, String> record : records) { // 处理消息 System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value()); // 发送确认信号 context.commitSync(); } } }; 在这个例子中,我们在onConsume()方法中处理消息,并在处理完每个消息后调用context.commitSync()来发送确认信号。这样可以确保消息不会被重复消费,并且可以帮助Kafka跟踪消费者的处理进度。