Nucu Labs

Apache Kafka: How to set offsets to a fixed time

Hello 👋,

This is a short article about setting offsets in Apache Kafka for a consumer group.

Normally, to reset offsets in Kafka you need to use the kafka-consumer-groups.sh tool. This means downloading the Kafka distribution archive and setting up Java. All of Kafka's tools depend on Java, which isn't that nice or developer friendly...

Sometimes, even after setting up Java correctly and getting the tools to run, they don't work 🤷🏻‍♂️. Either the tool version is incompatible with the Kafka version on the server, or the command executes successfully but doesn't seem to do anything...

Another way to set offsets for a consumer is to use a Kafka client library and do it through code.

I have Python installed on my setup and all I need to do is to install the confluent-kafka library:

pip install confluent-kafka 

And then run the following code snippet to reset the consumer's offsets to a specific timestamp:

from confluent_kafka import Consumer, TopicPartition
import time

# Configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.partition.eof': True
}

topic = 'my-topic'
# Target time in epoch milliseconds (time.mktime interprets the string as local time)
timestamp_ms = int(time.mktime(time.strptime("2025-04-01 12:00:00", "%Y-%m-%d %H:%M:%S")) * 1000)  # or time in milliseconds

# Create consumer
consumer = Consumer(consumer_config)

# Get metadata to discover partitions
metadata = consumer.list_topics(topic)
partitions = [TopicPartition(topic, p.id, timestamp_ms) for p in metadata.topics[topic].partitions.values()]

# Lookup offsets for the timestamp
offsets = consumer.offsets_for_times(partitions, timeout=10.0)

# Assign partitions with correct offsets
consumer.assign(offsets)

# Start consuming
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Error:", msg.error())
            continue
        print(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}: {msg.value().decode('utf-8')}")
        break
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
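
One detail in the timestamp line: time.mktime reads the string in the machine's local time zone, so the same string can resolve to different offsets on different machines. Here is a minimal sketch of a conversion pinned to UTC, using only the standard library. The helper name to_epoch_ms is hypothetical and not part of confluent-kafka.

```python
from datetime import datetime, timezone


def to_epoch_ms(text: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> int:
    """Parse a naive timestamp string as UTC and return epoch milliseconds."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


# Same wall-clock string as in the article, but interpreted as UTC
timestamp_ms = to_epoch_ms("2025-04-01 12:00:00")
print(timestamp_ms)
```

The resulting value can be passed as the third argument of TopicPartition in place of the mktime-based one.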
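
If the goal is to move the group's committed offsets rather than only start reading from a point in time, the looked-up offsets can also be committed explicitly. This is a hedged sketch, not a definitive implementation. It assumes the consumer and partitions objects from the snippet above, and a consumer group with no other active members. commit_offsets_at is a hypothetical helper name, while offsets_for_times and commit are methods of the confluent-kafka Consumer.

```python
def commit_offsets_at(consumer, partitions, timeout=10.0):
    """Look up offsets for the timestamps stored in `partitions` and commit them.

    `partitions` is a list of TopicPartition objects whose offset field holds
    the target timestamp in milliseconds, as built in the article's snippet.
    Returns the list of partitions whose offsets were committed.
    """
    resolved = consumer.offsets_for_times(partitions, timeout=timeout)
    # Skip partitions that came back with an error or a negative offset,
    # e.g. when the timestamp is later than the last message in the partition.
    usable = [tp for tp in resolved if tp.error is None and tp.offset >= 0]
    if usable:
        # Synchronous commit, so a failure raises here instead of passing silently.
        consumer.commit(offsets=usable, asynchronous=False)
    return usable


# Usage with the objects from the article's snippet (needs a reachable broker):
# committed = commit_offsets_at(consumer, partitions)
# for tp in committed:
#     print(tp.topic, tp.partition, tp.offset)
```

Committing explicitly does not depend on which partition happens to deliver the first message before the consumer is closed.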

Thanks for reading!
