Skip to content
Prev Previous commit
Next Next commit
Using mp.manager to solve the issue with join for MPConsumer
  • Loading branch information
vshlapakov committed Mar 24, 2015
commit 88465f70ef75c13bd6317496f1f8a40d0455b091
13 changes: 7 additions & 6 deletions kafka/consumer/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time

from collections import namedtuple
from multiprocessing import Process, Queue as MPQueue, Event, Value
from multiprocessing import Process, Manager as MPManager

try:
from Queue import Empty
Expand Down Expand Up @@ -121,12 +121,13 @@ def __init__(self, client, group, topic, auto_commit=True,

# Variables for managing and controlling the data flow from
# consumer child process to master
self.queue = MPQueue(1024) # Child consumers dump messages into this
manager = MPManager()
self.queue = manager.Queue(1024) # Child consumers dump messages into this
self.events = Events(
start = Event(), # Indicates the consumers to start fetch
exit = Event(), # Requests the consumers to shutdown
pause = Event()) # Requests the consumers to pause fetch
self.size = Value('i', 0) # Indicator of number of messages to fetch
start = manager.Event(), # Indicates the consumers to start fetch
exit = manager.Event(), # Requests the consumers to shutdown
pause = manager.Event()) # Requests the consumers to pause fetch
self.size = manager.Value('i', 0) # Indicator of number of messages to fetch

# dict.keys() returns a view in py3 + it's not a thread-safe operation
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
Expand Down