Kafka消息压缩需在**生产者(Producer)、Broker、消费者(Consumer)**端配置,具体步骤如下:
在Producer的配置文件(如producer.properties)或代码中,添加以下参数:
compression.type=gzip # 可选值:gzip、snappy、lz4、zstd(默认不压缩) # 仅对gzip有效:设置压缩级别(1-9,9为最高压缩比) compression.gzip.level=9 示例(Java代码):
props.put("compression.type", "snappy"); // 指定压缩算法 KafkaProducer<String, String> producer = new KafkaProducer<>(props); 修改Kafka Broker的配置文件server.properties:
# 允许的压缩类型(需与Producer端一致) compression.type=snappy # 可选值:gzip、snappy、lz4、zstd # 可选:设置特定压缩算法的参数(如LZ4级别) compression.codec.lz4.level=9 注意:Broker端通常无需单独配置压缩类型,会默认使用Producer发送的压缩格式,除非需覆盖。
消费者无需手动配置压缩,Kafka会自动解压缩消息。若需验证,可通过以下参数查看压缩状态:
# 打印消息压缩信息(调试用,生产环境不建议) print.offset=true 创建或修改主题时,可指定压缩类型(覆盖全局配置):
# 创建主题时设置压缩 kafka-topics.sh --create --topic compressed-topic --config compression.type=lz4 --bootstrap-server localhost:9092 # 修改已有主题的压缩类型 kafka-topics.sh --alter --topic compressed-topic --config compression.type=zstd --bootstrap-server localhost:9092 使用命令行工具发送和接收消息,观察消息内容是否为压缩格式(如二进制数据):
# 发送压缩消息 kafka-console-producer --broker-list localhost:9092 --topic test-topic --property compression.type=gzip # 接收并查看消息(正常应为解压后的原始内容) kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning 配置参考来源: