39 changes: 22 additions & 17 deletions Lib/concurrent/futures/process.py
@@ -87,7 +87,12 @@ def close(self):
self._reader.close()

def wakeup(self):
self._writer.send_bytes(b"")
try:
Maybe a _closed flag might be better than catching the exception? You could also then check the flag in the clear method?

Contributor Author
Yes you are right, this looks better.
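A minimal sketch of that _closed-flag variant (illustrative only; the Pipe-based internals are inferred from the surrounding diff, and this is not necessarily the exact code that landed):

import multiprocessing as mp

class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        # The flag check replaces the try/except OSError in the diff below.
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        # As suggested, the same flag can guard clear() as well.
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()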

self._writer.send_bytes(b"")
except OSError:
# This can happen if the QueueManagerThread has been shut down by
# another thread before this wakeup call.
pass

def clear(self):
while self._reader.poll():
@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items):
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)

def _on_queue_feeder_error(self, e, obj):
if isinstance(obj, _CallItem):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this case,
# the queue_manager_thread fails all work_items with BrokenProcessPool
if work_item is not None:
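For context, here is a standalone sketch of the _on_queue_feeder_error hook that _SafeQueue overrides above (assumption: Python 3.7+, where multiprocessing's Queue invokes this private hook from its feeder thread when serializing an item fails):

import multiprocessing as mp
import time
from multiprocessing.queues import Queue

class ReportingQueue(Queue):
    def _on_queue_feeder_error(self, e, obj):
        # Runs in the feeder thread when pickling obj raised e.
        print(f"feeder could not pickle {obj!r}: {e!r}")

class NotPicklable:
    def __reduce__(self):
        raise RuntimeError("refusing to pickle")

if __name__ == '__main__':
    q = ReportingQueue(ctx=mp.get_context())
    q.put(NotPicklable())   # triggers the hook instead of a silent traceback
    time.sleep(1)           # give the feeder thread time to run
    q.close()
    q.join_thread()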
@@ -339,6 +346,8 @@ def shutdown_worker():

# Release the queue's resources as soon as possible.
Member
Not a requirement for this PR, but it seems to me that the _queue_management_worker function has grown quite long and complicated. Do you think it could be turned into an object with a bunch of short and readable helper methods?

Contributor Author
Yes this would be a very much easier to read indeed. I will try to make a new PR with this change when I get a chance.

Member
Looking forward to your PR then @tomMoral :-)
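A hypothetical outline of that refactoring (the method names and the split are assumptions for illustration, not code from this PR):

import threading

class _QueueManager(threading.Thread):
    """Each stage of the _queue_management_worker loop becomes a helper."""

    def __init__(self, executor_reference, processes, pending_work_items,
                 work_ids_queue, call_queue, result_queue, thread_wakeup):
        super().__init__()
        self.executor_reference = executor_reference
        self.processes = processes
        self.pending_work_items = pending_work_items
        self.work_ids_queue = work_ids_queue
        self.call_queue = call_queue
        self.result_queue = result_queue
        self.thread_wakeup = thread_wakeup

    def run(self):
        while True:
            self.add_call_item_to_queue()
            result_item, is_broken = self.wait_result_broken_or_wakeup()
            if is_broken:
                self.terminate_broken()
                return
            if result_item is not None:
                self.process_result_item(result_item)
            if self.is_shutting_down():
                self.join_executor_internals()
                return

    # Placeholder stubs; each would hold one slice of the current function.
    def add_call_item_to_queue(self): ...
    def wait_result_broken_or_wakeup(self): ...
    def process_result_item(self, result_item): ...
    def is_shutting_down(self): ...
    def terminate_broken(self): ...
    def join_executor_internals(self): ...

For what it's worth, Python 3.9 eventually adopted a refactoring of very much this shape (_ExecutorManagerThread).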

call_queue.close()
call_queue.join_thread()
thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
@@ -547,29 +556,30 @@ def __init__(self, max_workers=None, mp_context=None,
self._queue_count = 0
self._pending_work_items = {}

# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of queue_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
# _result_queue to send the wakeup signal to the queue_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
self._queue_management_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items)
pending_work_items=self._pending_work_items,
thread_wakeup=self._queue_management_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()

# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of queue_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
# _result_queue to send the wakeup signal to the queue_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
self._queue_management_thread_wakeup = _ThreadWakeup()

def _start_queue_management_thread(self):
if self._queue_management_thread is None:
# When the executor gets garbage collected, the weakref callback
@@ -671,16 +681,11 @@ def shutdown(self, wait=True):
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
if self._call_queue is not None:
self._call_queue.close()
if wait:
self._call_queue.join_thread()
self._call_queue = None
self._call_queue = None
self._result_queue = None
self._processes = None

if self._queue_management_thread_wakeup:
self._queue_management_thread_wakeup.close()
self._queue_management_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__
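To make the new cleanup split concrete, a small usage sketch (plain stdlib API, nothing here is specific to this PR): shutdown(wait=False) returns immediately, and the call queue and wakeup pipe are now released by the queue management thread itself rather than by shutdown().

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=2)
    future = executor.submit(pow, 2, 10)
    executor.shutdown(wait=False)  # returns without joining the workers
    print(future.result())         # 1024; submitted work still completes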
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures.py
@@ -1027,6 +1027,32 @@ def test_shutdown_deadlock(self):
with self.assertRaises(BrokenProcessPool):
f.result()

def test_shutdown_deadlock_pickle(self):
# Test that calling shutdown with wait=False on the pool does not cause
# a deadlock if a task fails to pickle after the shutdown call.
# Reported in GH#6034.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=get_context(self.ctx)) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock

# Start the executor and get a reference to the queue_management_thread
# so we can join it later and avoid a dangling thread that would
# otherwise be cleaned up asynchronously.
executor.submit(id, 42).result()
queue_manager = executor._queue_management_thread

# Submit a task that fails at pickle and shutdown the executor
# without waiting
f = executor.submit(id, ErrorAtPickle())
executor.shutdown(wait=False)
with self.assertRaises(PicklingError):
f.result()

# Make sure the executor is eventually shut down and does not leave
# dangling threads
queue_manager.join()
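For anyone who wants to reproduce the original hang outside the test suite, a minimal standalone script (hedged: on unfixed versions the sequence below could deadlock instead of raising; ErrorAtPickle is re-created here because the test helper is not importable):

from concurrent.futures import ProcessPoolExecutor
from pickle import PicklingError

class ErrorAtPickle:
    """Pickling this object always fails."""
    def __reduce__(self):
        raise PicklingError("error at pickle")

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=2)
    executor.submit(id, 42).result()        # make sure workers are up
    future = executor.submit(id, ErrorAtPickle())
    executor.shutdown(wait=False)           # used to risk a deadlock here
    try:
        future.result()
    except PicklingError:
        print("future failed cleanly, no deadlock")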


create_executor_tests(ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,