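In Spring applications that consume from Kafka with Spring for Apache Kafka (`spring-kafka`), message filtering is implemented with the `RecordFilterStrategy` interface. Its `filter` method takes a `ConsumerRecord` and returns a `boolean`. Note the direction of the result: returning `true` discards the record, and returning `false` passes it on to the listener.

To set up message filtering, follow these steps: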
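1. Create a class that implements `RecordFilterStrategy` (package `org.springframework.kafka.listener.adapter`) and implement its `filter` method with your filtering logic, for example a decision based on some property of the message. Because `filter` returns `true` to discard a record, an acceptance condition has to be negated.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Component;

// Registered as a bean so that it can be injected into the consumer configuration.
@Component
public class MyMessageFilter implements RecordFilterStrategy<String, String> {

    // Returns true to DISCARD the record, false to pass it to the listener.
    @Override
    public boolean filter(ConsumerRecord<String, String> record) {
        String value = record.value();
        // Example: accept only records whose value contains "example".
        boolean accept = value != null && value.contains("example");
        return !accept;
    }
}
```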
2. Register the filter in the Kafka consumer configuration. Inject the filter bean and pass it to `setRecordFilterStrategy` on the `ConcurrentKafkaListenerContainerFactory`; the `ConsumerFactory` itself does not take a filter. The configuration below assumes `MyMessageFilter` is a Spring bean that implements `RecordFilterStrategy<String, String>`.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // @KafkaListener methods use the bean named "kafkaListenerContainerFactory" by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, MyMessageFilter myMessageFilter) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Listeners created by this factory see only records the filter does not discard.
        factory.setRecordFilterStrategy(myMessageFilter);
        return factory;
    }
}
```
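With this in place, your consumer processes only the messages that pass `MyMessageFilter`. Each incoming record is checked by the filter before the listener is invoked, and only records that satisfy the filter condition reach the listener.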
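
One detail that is easy to get backwards is the direction of the boolean. In Spring for Apache Kafka's `RecordFilterStrategy`, `filter` returns `true` to discard a record, so an "accept" condition has to be negated before it is returned. The plain-Java sketch below illustrates that contract without any Kafka or Spring dependencies. `DiscardFilter`, `discardUnless`, and `deliver` are hypothetical names introduced only for this illustration; they are not part of any library, and `deliver` is only a rough model of what the listener container does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterSemanticsSketch {

    // Hypothetical stand-in for a discard-style filter: true means "drop this value".
    interface DiscardFilter<T> {
        boolean filter(T value);
    }

    // Adapts an "accept" predicate to the discard-style contract by negating it.
    static <T> DiscardFilter<T> discardUnless(Predicate<T> accept) {
        return value -> !accept.test(value);
    }

    // Rough model of dispatch: only values the filter does not discard are delivered.
    static <T> List<T> deliver(List<T> incoming, DiscardFilter<T> filter) {
        List<T> delivered = new ArrayList<>();
        for (T value : incoming) {
            if (!filter.filter(value)) {
                delivered.add(value);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Same acceptance rule as the example filter: value contains "example".
        Predicate<String> accept = v -> v != null && v.contains("example");
        DiscardFilter<String> filter = discardUnless(accept);

        List<String> incoming = new ArrayList<>();
        incoming.add("an example payload");
        incoming.add("unrelated payload");
        incoming.add(null);

        System.out.println(deliver(incoming, filter)); // prints [an example payload]
    }
}
```

Writing the rule as an acceptance predicate and negating it in one place keeps the business condition readable while still honoring the discard-style return value.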