Real-time data processing with Kafka on Linux can be broken down into the following steps:
Download and extract Kafka:

```bash
wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
```

Start ZooKeeper:
```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```

Start the Kafka server:
```bash
bin/kafka-server-start.sh config/server.properties
```
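Both start scripts run in the foreground, so each needs its own terminal. They also accept a -daemon flag if you prefer to run them in the background:

```bash
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
```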
Edit server.properties. The key settings are:

- broker.id: a unique ID for each broker.
- listeners: the address and port the broker listens on.
- log.dirs: the directory where log segments are stored.
- zookeeper.connect: the ZooKeeper connection string.
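As a minimal single-broker sketch, these settings might look like the following; the paths and ports are illustrative defaults, so adjust them for your environment:

```properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
```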
Edit zookeeper.properties:

- dataDir: the directory where ZooKeeper stores its data.

Create a topic:

```bash
bin/kafka-topics.sh --create --topic your_topic_name --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
```

Send messages with the console producer:

```bash
bin/kafka-console-producer.sh --topic your_topic_name --bootstrap-server localhost:9092
```

Type a message and press Enter to send it.
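To confirm the topic was created and check its partition and replica assignment, you can describe it:

```bash
bin/kafka-topics.sh --describe --topic your_topic_name --bootstrap-server localhost:9092
```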
Produce messages from Java:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("your_topic_name", "Hello, Kafka!");
        producer.send(record);
        producer.close(); // flushes any buffered records before shutting down
    }
}
```

Read the messages back with the console consumer:

```bash
bin/kafka-console-consumer.sh --topic your_topic_name --from-beginning --bootstrap-server localhost:9092
```

Or consume from Java:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset when this group has no committed offset yet
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("your_topic_name"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```
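Note that the polling loop above never exits. In a real application you would typically trigger shutdown from another thread by calling consumer.wakeup() and close the consumer in a finally block, so that its partition assignment is released cleanly when the process stops.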
Kafka Streams is a client library for building real-time stream processing applications and microservices. The example below reads from your_topic_name, uppercases each value, and writes the result to processed_topic:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class KafkaStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("your_topic_name");
        // Uppercase each record value and write the result to the output topic
        source.mapValues(value -> value.toUpperCase()).to("processed_topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Close the Streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

With the steps above, you can do real-time data processing with Kafka on Linux. From here you can scale out and tune your Kafka cluster and applications as your requirements grow.
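To see the processed output, tail processed_topic with the console consumer. Depending on your broker's auto.create.topics.enable setting, you may need to create processed_topic with kafka-topics.sh first:

```bash
bin/kafka-console-consumer.sh --topic processed_topic --from-beginning --bootstrap-server localhost:9092
```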