在Debian上配置Kafka消费者涉及几个步骤,包括安装Java(Kafka的运行环境)、下载和配置Kafka、以及编写和运行Kafka消费者代码。以下是一个详细的步骤指南:
Kafka需要Java运行环境,因此首先需要安装Java。
sudo apt update sudo apt install openjdk-11-jdk 验证Java安装:
java -version 从Apache Kafka官方网站下载Kafka。
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz tar -xzf kafka_2.13-3.3.1.tgz cd kafka_2.13-3.3.1 启动Zookeeper(Kafka的协调服务):
bin/zookeeper-server-start.sh config/zookeeper.properties 在另一个终端启动Kafka服务器:
bin/kafka-server-start.sh config/server.properties 创建一个Kafka主题,消费者将从中读取消息。
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 使用Java编写一个简单的Kafka消费者。以下是一个示例代码:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test-group"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }); } } finally { consumer.close(); } } } 确保你已经安装了Maven或Gradle来管理依赖项。以下是使用Maven的示例:
创建一个pom.xml文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>kafka-consumer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>11</source> <target>11</target> </configuration> </plugin> </plugins> </build> </project> 在项目目录中运行:
mvn clean package 运行消费者:
java -cp target/kafka-consumer-1.0-SNAPSHOT.jar SimpleConsumer 确保Kafka服务器正在运行,并且主题已经创建。然后运行消费者代码,你应该能够看到从主题中读取的消息。
通过以上步骤,你可以在Debian上成功配置和运行一个Kafka消费者。