
How to perform message conversion when integrating Spring with Kafka

小樊
2024-12-14 12:55:13
Category: Big Data

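When integrating Spring with Kafka, message conversion is an important step. Spring for Apache Kafka lets you do it in two places. At the listener level, you can use the MessageConverter interface from org.springframework.kafka.support.converter; two commonly used implementations are StringJsonMessageConverter and ByteArrayJsonMessageConverter (the similarly named StringHttpMessageConverter and ByteArrayHttpMessageConverter belong to Spring Web and do not apply to Kafka). On the consumer side, however, deserialization is usually handled by a Kafka Deserializer.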
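As a rough illustration of the listener-level route, the sketch below declares a JSON message converter as a bean. It assumes Spring Boot's auto-configured listener container factory, which picks up a RecordMessageConverter bean; with a hand-built factory the converter would instead have to be set on that factory explicitly. It also assumes the consumer's value deserializer is StringDeserializer, so the converter receives the raw JSON text. The class name KafkaConverterConfig is hypothetical, and the converter class names are those of Spring for Apache Kafka 2.x/3.x.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

// Hypothetical configuration class; assumes Spring Boot auto-configuration.
@Configuration
public class KafkaConverterConfig {

    // Converts the String payload of each record into the parameter type
    // declared by the @KafkaListener method (for example MyMessage).
    @Bean
    public RecordMessageConverter jsonMessageConverter() {
        return new StringJsonMessageConverter();
    }
}
```

With this approach the target type is inferred from the listener method signature, so no custom deserializer class is needed.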

The following simple example shows how to convert messages with a custom deserializer when integrating Spring with Kafka:

  1. First, create a custom Kafka Deserializer to deserialize the message. In this example, the JSON payload of each record is converted into a MyMessage object:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.io.IOException;

    // Kafka instantiates this class by name, so it needs a public no-arg constructor
    public class MyKafkaDeserializer implements Deserializer<MyMessage> {

        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public MyMessage deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // record without a value (for example a tombstone)
            }
            try {
                return objectMapper.readValue(data, MyMessage.class);
            } catch (IOException e) {
                throw new SerializationException(
                        "Failed to deserialize MyMessage from topic " + topic, e);
            }
        }
    }
  2. In your Kafka consumer configuration class, set the custom MyKafkaDeserializer as the value deserializer of the consumer factory used by the listener container factory:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    @EnableKafka // enables @KafkaListener processing; Spring Boot applies it automatically
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // The custom deserializer turns each record value into a MyMessage
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaDeserializer.class);
            return props;
        }

        @Bean
        public ConsumerFactory<String, MyMessage> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        // @KafkaListener methods use the bean named kafkaListenerContainerFactory by default
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MyMessage> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
  3. Create a Kafka consumer listener class, MyKafkaConsumer, to handle the received messages:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyKafkaConsumer {

        // A listener must name its source; "my-topic" is the topic used in this example
        @KafkaListener(id = "myListener", topics = "my-topic", groupId = "my-group")
        public void listen(MyMessage message) {
            System.out.println("Received message: " + message);
        }
    }
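  4. Finally, make sure your MyMessage class can be serialized and deserialized correctly. You can use the Jackson library for this; Jackson needs a no-arg constructor (the implicit default one is enough) and getters and setters for each field:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MyMessage {

        // ObjectMapper is thread-safe once configured, so one shared instance is enough
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        private String content;

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        public static MyMessage fromJson(String json) throws JsonProcessingException {
            return OBJECT_MAPPER.readValue(json, MyMessage.class);
        }

        public String toJson() throws JsonProcessingException {
            return OBJECT_MAPPER.writeValueAsString(this);
        }

        @Override
        public String toString() {
            return "MyMessage{content='" + content + "'}";
        }
    }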
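The conversion contract used in the steps above (bytes in, object out) can be tried without a broker or Spring. The following is a minimal plain-Java sketch, not the article's implementation: the toy content=<value> format, the Message class, and the class name DeserializeContractSketch are all hypothetical stand-ins for the Jackson-based parsing. It illustrates two things a value deserializer is generally expected to do: return null for a null payload (Kafka's Deserializer Javadoc recommends handling null rather than throwing) and wrap parse failures in an exception that carries some context.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class DeserializeContractSketch {

    /** Hypothetical stand-in for MyMessage. */
    public static final class Message {
        public final String content;

        public Message(String content) {
            this.content = content;
        }
    }

    /** Null payloads pass through as null; parse failures are wrapped with context. */
    public static <T> T deserialize(byte[] data, Function<String, T> parser) {
        if (data == null) {
            return null; // record without a value: nothing to parse
        }
        String text = new String(data, StandardCharsets.UTF_8);
        try {
            return parser.apply(text);
        } catch (RuntimeException e) {
            throw new IllegalStateException("Cannot deserialize payload: " + text, e);
        }
    }

    /** Toy parser for the assumed "content=<value>" format (a real one would use Jackson). */
    public static Message parse(String text) {
        String prefix = "content=";
        if (!text.startsWith(prefix)) {
            throw new IllegalArgumentException("Expected payload to start with " + prefix);
        }
        return new Message(text.substring(prefix.length()));
    }

    public static void main(String[] args) {
        byte[] payload = "content=hello".getBytes(StandardCharsets.UTF_8);
        Message message = deserialize(payload, DeserializeContractSketch::parse);
        System.out.println(message.content); // prints hello

        Message none = deserialize(null, DeserializeContractSketch::parse);
        System.out.println(none == null); // prints true
    }
}
```

Keeping the parsing logic in a small function like this makes it easy to unit-test separately from the Kafka wiring.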

Now, when your Spring application consumes messages from Kafka, each message is converted into a MyMessage object and then handled by MyKafkaConsumer.
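One caveat: if a record cannot be deserialized, the exception is raised while the consumer polls, before the listener is ever called, so the same bad record can fail repeatedly. Spring for Apache Kafka provides ErrorHandlingDeserializer to wrap a delegate deserializer and hand such failures to the container's error handler instead. The sketch below shows one possible way to build the consumer properties for that; the helper class ErrorHandlingConsumerProps and its build method are hypothetical, and the delegate would be the custom deserializer class from this example.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the property keys come from Kafka and Spring.
public final class ErrorHandlingConsumerProps {

    private ErrorHandlingConsumerProps() {
    }

    public static Map<String, Object> build(String bootstrapServers, Class<?> valueDelegate) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The wrapper is what Kafka instantiates as the value deserializer...
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // ...and it delegates the actual conversion to the class given here.
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, valueDelegate);
        return props;
    }
}
```

A call such as ErrorHandlingConsumerProps.build(bootstrapServers, MyKafkaDeserializer.class) could then replace the plain property map passed to DefaultKafkaConsumerFactory.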
