java - Kafka Consumer receiving messages

Java - Kafka Consumer receiving messages

To receive messages from a Kafka topic using a Kafka consumer in Java, you need to set up a Kafka consumer configuration, subscribe to a topic, and continuously poll for new messages. Below is a step-by-step guide on how to achieve this:

Step-by-Step Guide

1. Add Kafka Dependencies

First, add the necessary Kafka dependencies to your pom.xml if you are using Maven:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.2</version> <!-- Ensure this matches the version you're using --> </dependency> 

2. Configure the Kafka Consumer

Set up the Kafka consumer properties. This includes the bootstrap servers, deserializer classes, and other relevant properties.

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // Define properties for the Kafka consumer Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Create the Kafka consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // Subscribe to the topic consumer.subscribe(Collections.singletonList("my-topic")); // Poll for new messages try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } } 

Explanation

  1. Dependencies:

    • Ensure you have the Kafka clients library added to your project dependencies.
  2. Properties Configuration:

    • bootstrap.servers: List of Kafka broker addresses.
    • group.id: Consumer group ID.
    • key.deserializer and value.deserializer: Classes to deserialize the key and value of messages.
    • auto.offset.reset: Determines what to do when there is no initial offset in Kafka or if the current offset does not exist on the server.
  3. KafkaConsumer Creation:

    • Create an instance of KafkaConsumer using the properties defined.
  4. Subscription:

    • Use subscribe to subscribe to one or more topics. Here, Collections.singletonList("my-topic") is used to subscribe to a single topic named "my-topic".
  5. Polling for Messages:

    • Use a loop to continuously poll for new messages from the Kafka topic. poll(Duration.ofMillis(100)) fetches records for the specified duration.
    • Iterate through ConsumerRecords to process each ConsumerRecord.
  6. Closing the Consumer:

    • Close the consumer to release resources when done. This is typically done in a finally block to ensure it always happens, even if an exception is thrown.

Notes

  • Error Handling: Add appropriate error handling to manage exceptions and potential issues in a production environment.
  • Offset Management: Depending on your requirements, you may need to manage offsets manually to ensure exactly-once processing semantics.
  • Multi-threading: If you need to handle high throughput, consider using multiple consumers in a consumer group to parallelize the processing.

This example provides a basic implementation of a Kafka consumer in Java to receive messages from a Kafka topic. Adjust the configurations and logic according to your specific use case and Kafka setup.

Examples

  1. How to create a Kafka Consumer in Java? Description: Implement a basic Kafka consumer that subscribes to a topic and receives messages.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } } } } 
  2. Java code to consume messages from Kafka topic? Description: Write Java code to consume messages from a specified Kafka topic.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } } } finally { consumer.close(); } } } 
  3. How to handle Kafka consumer offset commits in Java? Description: Implement a Kafka consumer with manual offset commit handling in Java.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } consumer.commitSync(); } } finally { consumer.close(); } } } 
  4. Java Kafka consumer example with manual offset control? Description: Demonstrate a Kafka consumer example in Java with manual control over offset commits.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } consumer.commitSync(); } } finally { consumer.close(); } } } 
  5. Java Kafka consumer subscribe to multiple topics? Description: Subscribe a Kafka consumer to multiple topics and consume messages in Java.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("topic1", "topic2")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } } } finally { consumer.close(); } } } 
  6. How to handle Kafka consumer rebalancing in Java? Description: Handle Kafka consumer rebalancing events in a Java application.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } } } finally { consumer.close(); } } } 
  7. Java Kafka consumer with custom deserializer? Description: Create a Kafka consumer with a custom message deserializer in Java.

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class.getName()); KafkaConsumer<String, CustomObject> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords<String, CustomObject> records = consumer.poll(100); for (ConsumerRecord<String, CustomObject> record : records) { System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value()); } } } finally { consumer.close(); } } } 

More Tags

wampserver zip xlwings pyusb web-audio-api phpword m2m in-clause download-manager casing

More Programming Questions

More Chemical thermodynamics Calculators

More Organic chemistry Calculators

More Weather Calculators

More Stoichiometry Calculators