Multiprocess queue blocked? #1474
I'm trying to continuously consume messages and send them in batches to a client. The batch upload may take longer than it takes to receive messages, and we're expecting a throughput of 600K+ messages per day. I'm using a multiprocessing queue because avoiding lost information matters more to me than avoiding duplicates, which is why I don't auto-commit. If that's bad practice, please let me know.

The problem: I always get `None` values, even though I receive messages just fine if I have all of this in a single for-loop. It also works well with the kafka-python module, since that consumer works like an iterable, but I'd rather use this module for speed.

```python
"""Continuously consume and process while consuming"""
import confluent_kafka
import json
import logging
import logging.config
import multiprocessing
import queue
import sys
import time

# in-house
import config
import _deserialize

logging.basicConfig(level=logging.DEBUG)


def get_messages(instance_queue):
    logging.info("Calling consumer")
    while True:
        message = instance_consumer.poll(1.0)
        if message is None:
            continue
        if message.value() is None:
            continue
        if message.error():
            logging.exception(message.error(), exc_info=True)
            continue
        logging.debug(message)
        message_decoded = _deserialize.deserialize_json(message.value())
        if message_decoded is None:
            continue
        logging.debug(f"Topic Name={message.topic()}, Message={message_decoded['id']}")
        instance_queue.put(message_decoded)


def process_messages(instance_queue):
    """Send to outbound client"""
    while True:
        try:
            message = instance_queue.get(timeout=1)
        except queue.Empty:
            pass
        else:
            pass  # do stuff with messages


# Run Kafka =======================================================================================
def callback_error(data):
    logging.error(data)


time.sleep(3)  # server has time to run

instance_consumer = confluent_kafka.Consumer({
    'bootstrap.servers': ",".join(config.dict_setup_aws['servers']),
    'auto.offset.reset': 'earliest',
    'group.id': 'Client-Consumer',
    'error_cb': callback_error,
    'enable.auto.commit': False,
    # 'auto.commit.interval.ms': 30 * 1000,  # adjust later
})
instance_consumer.subscribe(config.dict_setup_aws['topics'])

q = multiprocessing.Queue()
process_consume = multiprocessing.Process(target=get_messages, args=[q])
process_consume.start()
process_upload = multiprocessing.Process(target=process_messages, args=[q])
process_upload.start()
process_consume.join()
process_upload.join()
instance_consumer.close()
sys.exit()
```

The logs just hang at "Calling consumer".
Client instances can't be used across forks, and from a superficial glance it looks like multiprocessing will call get_messages from multiple forked sub-processes.
I would recommend polling messages in the main process and passing their key, payload, topic, partition, and offset to the child processes.
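A minimal sketch of that pattern, assuming the fork start method (Linux default). Only plain picklable tuples cross the process boundary; the `Consumer` stays in the parent. `handle_record` and `run_pipeline` are hypothetical names standing in for your deserialization/upload logic:

```python
"""Sketch: poll Kafka only in the parent; ship plain picklable tuples
(key, payload, topic, partition, offset) to child workers."""
import multiprocessing
import queue

SENTINEL = None  # tells a worker there is no more work


def handle_record(record):
    # record is a plain tuple -- no client objects cross the fork boundary
    key, payload, topic, partition, offset = record
    return f"{topic}[{partition}]@{offset}: {payload}"


def worker(work_q, result_q):
    # Runs in a child process; it never touches the Consumer instance.
    while True:
        record = work_q.get()
        if record is SENTINEL:
            break
        result_q.put(handle_record(record))


def run_pipeline(records, n_workers=2):
    work_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    # Start workers BEFORE creating any Kafka client, so no client
    # handle is ever copied into a fork.
    workers = [multiprocessing.Process(target=worker, args=(work_q, result_q))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    # In the real program this loop would instead be something like:
    #   msg = instance_consumer.poll(1.0)
    #   work_q.put((msg.key(), msg.value(), msg.topic(),
    #               msg.partition(), msg.offset()))
    for record in records:
        work_q.put(record)
    for _ in workers:
        work_q.put(SENTINEL)
    out = [result_q.get(timeout=5) for _ in records]  # one result per record
    for w in workers:
        w.join()
    return out
```

With this split the parent is the only owner of the `Consumer`, so it can also commit offsets itself once the workers have acknowledged a batch.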