以下是Kafka消费者配置的核心参数及说明,关键参数需根据业务场景调整:
bootstrap.servers:必填,Kafka集群地址列表(如host1:port1,host2:port2),建议配置3个以上节点以防单点故障。group.id:必填,消费者组ID,同组消费者共同消费分区,不同组可并行消费同一主题。key.deserializer/value.deserializer:必填,消息键值的反序列化类(如StringDeserializer、IntegerDeserializer)。auto.offset.reset:无偏移量时的消费起点,可选latest(最新消息,默认)、earliest(最早消息)、none(无偏移量则报错)。enable.auto.commit:是否自动提交偏移量,默认true。生产环境建议设为false,手动提交以保证消息不丢失。auto.commit.interval.ms:自动提交间隔(毫秒),仅在enable.auto.commit=true时生效,默认5000。fetch.min.bytes:单次拉取的最小数据量(字节),默认1,增大可减少网络请求次数,但可能增加延迟。fetch.max.bytes:单次拉取的最大数据量(字节),默认50MB,需与消费者内存匹配,避免OOM。max.poll.records:单次poll()返回的最大消息数,默认500。处理慢时调小(如100),避免触发超时Rebalance。max.poll.interval.ms:两次poll()的最大间隔时间(毫秒),默认5分钟。处理耗时长的场景需增大,避免被误判为离线。heartbeat.interval.ms:发送心跳的间隔(毫秒),默认3000,需小于session.timeout.ms的1/3。session.timeout.ms:消费者被视为离线的超时时间(毫秒),默认10秒。网络抖动时需调大,避免频繁Rebalance。partition.assignment.strategy:分区分配策略,默认RangeAssignor(按范围分配)。可选RoundRobinAssignor(轮询分配,均衡负载)或StickyAssignor(粘性分配,减少Rebalance开销)。security.protocol:通信协议,默认PLAINTEXT。生产环境建议用SSL或SASL_SSL。ssl.keystore.location/ssl.truststore.location:SSL证书路径,用于双向认证。isolation.level:事务隔离级别,默认read_uncommitted(可读未提交消息)。如需严格一致性,设为read_committed。client.id:客户端标识,用于监控和日志追踪,建议设置为业务名称。调优建议:
fetch.max.bytes和max.poll.records,调小heartbeat.interval.ms。fetch.min.bytes,调大session.timeout.ms。enable.auto.commit=false),手动控制偏移量提交时机。参数详情可参考官方文档:Kafka Consumer Configs。