Kafka依赖Java运行环境,需安装OpenJDK 8或更高版本(推荐Java 11)。执行以下命令安装并验证:
sudo yum install -y java-11-openjdk-devel # 安装Java 11 java -version # 验证安装(需显示Java版本信息)
从Apache官网下载最新稳定版Kafka(如3.5.2),解压至指定目录(如/usr/local/kafka
):
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz # 下载Kafka tar -xzf kafka_2.12-3.5.2.tgz -C /usr/local/ # 解压 sudo mv /usr/local/kafka_2.12-3.5.2 /usr/local/kafka # 重命名目录
Kafka需通过Zookeeper实现集群管理和元数据存储。编辑Zookeeper配置文件(/usr/local/kafka/config/zookeeper.properties
),设置集群模式(单节点示例):
dataDir=/usr/local/kafka/zookeeper_data # 数据存储目录 clientPort=2181 # 客户端连接端口 initLimit=5 # Leader与Follower初始同步时间 syncLimit=2 # Leader与Follower同步超时时间 server.1=localhost:2888:3888 # 集群节点配置(单节点可省略)
启动Zookeeper服务:
nohup /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties > /dev/null 2>&1 &
Kafka的核心配置文件为/usr/local/kafka/config/server.properties
,需调整以下关键参数:
broker.id=0 # 集群中每个Broker的唯一ID(0~255),多Broker需不同
listeners=PLAINTEXT://0.0.0.0:9092 # 监听所有网络接口的9092端口 advertised.listeners=PLAINTEXT://your_server_ip:9092 # 客户端连接的地址(替换为服务器公网/内网IP)
log.dirs=/usr/local/kafka/kafka_logs # 日志存储目录(建议使用独立磁盘,多目录用逗号分隔)
zookeeper.connect=localhost:2181 # 单节点Zookeeper;多节点用逗号分隔(如zk1:2181,zk2:2181,zk3:2181)
num.partitions=8 # Topic默认分区数(根据业务吞吐量调整,建议≥3) default.replication.factor=3 # Topic默认副本因子(≥2,生产环境建议3) min.insync.replicas=2 # 写操作需确认的最小副本数(≤default.replication.factor,保证数据持久性)
log.retention.hours=168 # 日志保留时间(小时,默认7天,可根据需求调整为24小时或更长) log.segment.bytes=1073741824 # 日志段大小(1GB,建议1~10GB,过大影响删除效率) log.retention.check.interval.ms=300000 # 日志检查间隔(5分钟)
num.network.threads=3 # 网络请求处理线程数(建议≥3,根据CPU核心数调整) num.io.threads=8 # 磁盘I/O线程数(建议≥8,根据磁盘数量和性能调整) socket.send.buffer.bytes=102400 # 发送缓冲区大小(100KB,默认100KB) socket.receive.buffer.bytes=102400 # 接收缓冲区大小(100KB,默认100KB) socket.request.max.bytes=104857600 # 请求最大字节数(100MB,默认100MB)
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null 2>&1 &
创建Systemd服务文件(/etc/systemd/system/kafka.service
),内容如下:
[Unit] Description=Apache Kafka Server After=network.target zookeeper.service # 依赖Zookeeper服务 [Service] Type=simple User=root Group=root ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target
启用并启动服务:
sudo systemctl daemon-reload sudo systemctl enable kafka sudo systemctl start kafka
/usr/local/kafka/bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic test_topic
/usr/local/kafka/bin/kafka-console-producer.sh --topic test_topic --bootstrap-server localhost:9092
输入消息(如Hello Kafka
)并按回车。
/usr/local/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server localhost:9092
应能看到发送的Hello Kafka
消息。
开放Kafka(9092)和Zookeeper(2181)端口:
sudo firewall-cmd --zone=public --add-port=9092/tcp --permanent sudo firewall-cmd --zone=public --add-port=2181/tcp --permanent sudo firewall-cmd --reload
log.dirs
),避免机械硬盘。/data1/kafka_logs,/data2/kafka_logs
),提高IO吞吐量。调整Kafka的JVM堆内存(根据服务器内存调整,建议不超过物理内存的70%):
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" # 在kafka-server-start.sh前添加
log4j.properties
),避免日志文件过大占用磁盘空间。