-
Hello, I think I am not facing a package issue but rather a skill issue. I don't understand why .poll() is blocking my program. When I run the code below, nothing happens on the output however when I add the sys.stdout.flush() calls, it starts printing. from confluent_kafka import Consumer, KafkaError from confluent_kafka.admin import AdminClient, NewTopic import sys bootstrap_servers = 'broker-1:19092,broker-2:19092,broker-3:19092' conf = {'bootstrap.servers': bootstrap_servers, 'group.id': 'availability-tool', 'enable.auto.commit': 'true', 'auto.offset.reset': 'earliest'} consumer = Consumer(conf) TOPICS = ['asset_state'] consumer.subscribe(TOPICS) admin = AdminClient({'bootstrap.servers': bootstrap_servers}) print("List of topics in Kafka") for topic in admin.list_topics().topics: print(f"{topic}") print(f"List of topics for {consumer}") for topic in consumer.list_topics().topics: print(f"{topic}") # sys.stdout.flush() while True: msg = consumer.poll(2.0) if msg is None: print("...") continue if msg.error(): print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) # sys.stdout.flush() consumer.close() I tested my program with kafka official console binaries to see that messages where sent to the consumer and they were. I did restart multiple times my program but even after a clean start, it isn't working Thanks in advance. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Also, the most optimal way I found to have efficient output is this : while True: sys.stdout.flush() # call to stdout flush() HERE msg = consumer.poll(2.0) if msg is None: print("...") continue if msg.error(): print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) consumer.close() If anyone have some time to help me figure it out that would be nice ! |
Beta Was this translation helpful? Give feedback.
print call is stuck, using a logger fixes it. So poll() is not at fault.