温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Springboot如何集成Kafka进行批量消费

发布时间:2021-12-14 14:04:44 来源:亿速云 阅读:1343 作者:iii 栏目:开发技术

本篇内容主要讲解“Springboot如何集成Kafka进行批量消费”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Springboot如何集成Kafka进行批量消费”吧!

引入依赖

<dependency>                 <groupId>org.springframework.kafka</groupId>                 <artifactId>spring-kafka</artifactId>                 <version>1.3.11.RELEASE</version>             </dependency>

因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。

Springboot如何集成Kafka进行批量消费

注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

创建配置类

/**      * kafka 配置类      */     @Configuration     @EnableKafka     public class KafkaConsumerConfig {         private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);         @Value("${kafka.bootstrap.servers}")         private String kafkaBootstrapServers;         @Value("${kafka.group.id}")         private String kafkaGroupId;         @Value("${kafka.topic}")         private String kafkaTopic;         public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";         public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";         @Bean         public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {             ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();             factory.setConsumerFactory(consumerFactory());             // 设置并发量,小于或者等于 Topic 的分区数             factory.setConcurrency(5);             // 设置为批量监听             factory.setBatchListener(Boolean.TRUE);             factory.getContainerProperties().setPollTimeout(30000);             return factory;         }         public ConsumerFactory<String, String> consumerFactory() {             return new DefaultKafkaConsumerFactory<>(consumerConfigs());         }         public Map<String, Object> consumerConfigs() {             Map<String, Object> props = new HashMap<>();             //设置接入点,请通过控制台获取对应Topic的接入点。             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);             //设置SSL根证书的路径,请记得将XXX修改为自己的路径。             //与SASL路径类似,该文件也不能被打包到jar中。             System.setProperty("java.security.auth.login.config", CONFIG_PATH);             props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);             //根证书存储的密码,保持不变。             props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");             //接入协议,目前支持使用SASL_SSL协议接入。             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");             //SASL鉴权方式,保持不变。             props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");             // 自动提交             props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);             //两次Poll之间的最大允许间隔。             //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。             props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);             //设置单次拉取的量,走公网访问时,该参数会有较大影响。             props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);             props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);             //每次Poll的最大数量。             //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。             props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);             //消息的反序列化方式。             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");             props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");             //当前消费实例所属的消费组,请在控制台申请之后填写。             //属于同一个组的消费实例,会负载消费消息。             props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);             //Hostname校验改成空。             props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");             return props;         }     }

注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。

Kafka 消费者

/**      * kafka 消息消费类      */     @Component     public class KafkaMessageListener {         private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);         @KafkaListener(topics = {"${kafka.topic}"})         public void listen(List<ConsumerRecord<String, String>> recordList) {             for (ConsumerRecord<String,String> record : recordList) {                 // 打印消息的分区以及偏移量                 LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());                 String value = record.value();                 System.out.println("value = " + value);                 // 处理业务逻辑 ...             }         }     }

因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。

到此,相信大家对“Springboot如何集成Kafka进行批量消费”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI