# Kafka如何安装并实现单机测试 ## 一、Kafka简介 Apache Kafka是由LinkedIn开发并开源的高性能分布式消息系统,具有以下核心特性: - **高吞吐量**:单机可支持每秒百万级消息处理 - **持久化存储**:消息可持久化到磁盘并支持多副本 - **分布式架构**:天然支持水平扩展 - **低延迟**:消息投递延迟可控制在毫秒级 - **高容错性**:支持自动故障转移 ### 1.1 核心组件 | 组件 | 说明 | |---------------|----------------------------------------------------------------------| | Producer | 消息生产者,负责发布消息到指定Topic | | Consumer | 消息消费者,订阅Topic并处理消息 | | Broker | Kafka服务实例,负责消息存储和转发 | | Topic | 消息类别/主题,逻辑上的消息分类 | | Partition | Topic的物理分片,每个Partition是一个有序、不可变的消息队列 | | Zookeeper | 负责集群元数据管理、Broker选举等协调工作(Kafka 2.8+开始支持去ZK模式)| ## 二、单机环境安装 ### 2.1 环境准备 **系统要求**: - 推荐Linux/MacOS(Windows可能有兼容性问题) - JDK 1.8+(建议OpenJDK 11) - 至少4GB可用内存 - 10GB以上磁盘空间 ```bash # 检查Java环境 java -version
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz tar -xzf kafka_2.13-3.3.1.tgz cd kafka_2.13-3.3.1
目录结构说明:
bin/ # 操作脚本 config/ # 配置文件 libs/ # 依赖库 logs/ # 日志文件(启动后生成)
修改config/server.properties
核心参数:
# Broker唯一标识 broker.id=0 # 监听地址 listeners=PLNTEXT://localhost:9092 # 日志存储目录 log.dirs=/tmp/kafka-logs # ZooKeeper连接地址 zookeeper.connect=localhost:2181 # 自动创建Topic(测试环境建议开启) auto.create.topics.enable=true
Kafka依赖ZooKeeper,单机版可使用内置ZK:
# 后台启动ZooKeeper nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zk.log 2>&1 & # 验证启动 ps aux | grep zookeeper
# 启动Kafka服务 nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 & # 检查是否启动成功 tail -f logs/server.log
成功日志示例:
[KafkaServer id=0] started (kafka.server.KafkaServer)
# 创建Topic(1分区1副本) bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic test-topic # 查看Topic列表 bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 查看Topic详情 bin/kafka-topics.sh --describe \ --topic test-topic \ --bootstrap-server localhost:9092
生产者发送消息:
bin/kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic test-topic > Hello Kafka > This is a test message
消费者接收消息:
bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic test-topic \ --from-beginning
内置压测工具:
# 生产者性能测试 bin/kafka-producer-perf-test.sh \ --topic perf-test \ --num-records 100000 \ --record-size 1000 \ --throughput 2000 \ --producer-props bootstrap.servers=localhost:9092 # 消费者性能测试 bin/kafka-consumer-perf-test.sh \ --topic perf-test \ --bootstrap-server localhost:9092 \ --messages 100000
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.1</version> </dependency>
public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("test-topic", Integer.toString(i), "Message-" + i)); } producer.close(); } }
public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value()); } } } }
端口冲突:
netstat -tulnp | grep 9092
ZooKeeper连接问题:
zookeeper.connect
配置tail -f zk.log
磁盘空间不足:
df -h
acks
配置(0/1/all)group.id
是否相同(相同group会分片消费)auto.offset.reset
配置(earliest/latest/none)参数 | 建议值 | 说明 |
---|---|---|
num.network.threads | 3 | 网络线程数 |
num.io.threads | 8 | IO线程数(建议>=磁盘数) |
socket.send.buffer.bytes | 1024000 | 发送缓冲区大小 |
socket.receive.buffer.bytes | 1024000 | 接收缓冲区大小 |
log.retention.hours | 168 | 日志保留时间(小时) |
JMX监控:
export JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties
Kafka Eagle可视化工具:
docker pull smartloli/kafka-eagle
本文详细介绍了Kafka单机环境的: 1. 安装与配置步骤 2. 基础功能验证方法 3. Java客户端开发示例 4. 常见问题解决方案
后续可进一步探索: - Kafka Connect数据集成 - Kafka Streams流处理 - KRaft模式(去ZooKeeper化部署)
注意事项:生产环境需配置多副本、监控告警等机制,本文单机配置仅适用于开发和测试场景。 “`
该文档共计约2300字,包含: - 安装部署详细步骤 - 配置说明与性能测试 - Java客户端示例代码 - 常见问题解决方案 - 格式规范的Markdown排版
可通过实际执行文中命令快速搭建可用的Kafka测试环境。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。