Processing real-time data streams with Kafka on Linux involves the following steps:
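Install Java, since Kafka runs on the JVM. On Debian or Ubuntu:
sudo apt install openjdk-8-jdk
On CentOS or RHEL:
sudo yum install java-1.8.0-openjdk
Verify the installation:
java -version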
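Extract the downloaded Kafka release archive (kafka_2.13-3.2.0.tgz in this example) and move the extracted directory to /usr/local/kafka:
tar -zxvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0 /usr/local/kafka
Go to the configuration directory:
cd /usr/local/kafka/config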
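Open the server.properties file for editing:
vim server.properties
Change the following settings:
broker.id: a unique identifier for this broker, for example 1.
log.dirs: the directory where Kafka stores its log data, for example /tmp/kafka-logs.
zookeeper.connect: the ZooKeeper connection address, for example zk_ip:2181.
listeners: the address Kafka listens on, for example PLAINTEXT://your_server_ip:9092.
log.retention.hours: how long log data is retained, for example 168 (one week).
delete.topic.enable: set to true to allow topics to be deleted.
Start ZooKeeper from the Kafka installation directory (the start scripts are referenced by relative path):
cd /usr/local/kafka
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &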
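Before starting the broker, it can help to sanity-check that every setting discussed above is actually present in the edited file. The following is only a minimal sketch using java.util.Properties from the standard library: the class name ServerPropertiesCheck and the method missingKeys are hypothetical, and the sample values are the placeholders used in this walkthrough, not real addresses.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ServerPropertiesCheck {
    // The settings this walkthrough edits in server.properties.
    static final String[] EXPECTED_KEYS = {
        "broker.id", "log.dirs", "zookeeper.connect",
        "listeners", "log.retention.hours", "delete.topic.enable"
    };

    // Returns the expected keys that are absent or blank in the given properties text.
    public static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values copied from the examples in this walkthrough.
        String sample = String.join("\n",
            "broker.id=1",
            "log.dirs=/tmp/kafka-logs",
            "zookeeper.connect=zk_ip:2181",
            "listeners=PLAINTEXT://your_server_ip:9092",
            "log.retention.hours=168",
            "delete.topic.enable=true");
        System.out.println("Missing keys: " + missingKeys(sample));
    }
}
```

In practice the text would come from the real file (for instance by reading /usr/local/kafka/config/server.properties into a string) rather than from an inline sample. An empty list means all six settings are present.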
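Start the Kafka broker (also from /usr/local/kafka):
nohup bin/kafka-server-start.sh config/server.properties &
Check that both processes are running:
ps -ef | grep zookeeper
ps -ef | grep kafka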
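Test the Kafka producer and consumer. In one terminal, start a console producer and type a few messages:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server your_server_ip:9092
In a second terminal, start a console consumer to read them back:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server your_server_ip:9092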
Write a Kafka Streams application that reads from input-topic, transforms each value, and writes the result to output-topic:
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaStreamsApp {
    public static void main(String[] args) {
        // Build the topology: input-topic -> processValue -> output-topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        KStream<String, String> processed = source.mapValues(value -> processValue(value));
        processed.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());
        // Close the streams instance cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    private static String processValue(String value) {
        // Processing logic goes here
        return value;
    }

    private static Properties getStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys and values are plain strings unless a serde is given explicitly
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
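The processValue method in the application above is a placeholder that returns its input unchanged. As an illustration only, here is one possible transformation, written as a standalone class so it can be tried without a Kafka dependency. The class name ProcessValueSketch and the choice of trimming and upper-casing are assumptions made for this example, not part of the original application.

```java
import java.util.Locale;

public class ProcessValueSketch {
    // Hypothetical transformation: trim surrounding whitespace and upper-case the value.
    // Null values are returned unchanged, in case the input topic contains them.
    public static String processValue(String value) {
        if (value == null) {
            return null;
        }
        return value.trim().toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(processValue("  hello kafka  ")); // prints "HELLO KAFKA"
    }
}
```

Keeping the transformation in a plain static method like this means it can be checked on its own before it is passed to mapValues in the streams topology.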
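Package the application as a jar and run it (this assumes the jar bundles the Kafka Streams dependencies):
java -jar kafka-streams-app.jar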
With the steps above, you can install and configure Kafka on Linux and use Kafka Streams to process data streams in real time.