Skip to content

Commit 07b96a9

Browse files
miss-islingtontzickel
authored andcommitted
bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly. (cherry picked from commit 97bfe8d) Co-authored-by: tzickel <tzickel@users.noreply.github.com>
1 parent 58376c6 commit 07b96a9

File tree

3 files changed

+57
-24
lines changed

3 files changed

+57
-24
lines changed

Lib/multiprocessing/pool.py

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,9 @@ class Pool(object):
147147
'''
148148
_wrap_exception = True
149149

150-
def Process(self, *args, **kwds):
151-
return self._ctx.Process(*args, **kwds)
150+
@staticmethod
151+
def Process(ctx, *args, **kwds):
152+
return ctx.Process(*args, **kwds)
152153

153154
def __init__(self, processes=None, initializer=None, initargs=(),
154155
maxtasksperchild=None, context=None):
@@ -175,13 +176,15 @@ def __init__(self, processes=None, initializer=None, initargs=(),
175176

176177
self._worker_handler = threading.Thread(
177178
target=Pool._handle_workers,
178-
args=(self, )
179+
args=(self._cache, self._taskqueue, self._ctx, self.Process,
180+
self._processes, self._pool, self._inqueue, self._outqueue,
181+
self._initializer, self._initargs, self._maxtasksperchild,
182+
self._wrap_exception)
179183
)
180184
self._worker_handler.daemon = True
181185
self._worker_handler._state = RUN
182186
self._worker_handler.start()
183187

184-
185188
self._task_handler = threading.Thread(
186189
target=Pool._handle_tasks,
187190
args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -207,43 +210,62 @@ def __init__(self, processes=None, initializer=None, initargs=(),
207210
exitpriority=15
208211
)
209212

210-
def _join_exited_workers(self):
213+
@staticmethod
214+
def _join_exited_workers(pool):
211215
"""Cleanup after any worker processes which have exited due to reaching
212216
their specified lifetime. Returns True if any workers were cleaned up.
213217
"""
214218
cleaned = False
215-
for i in reversed(range(len(self._pool))):
216-
worker = self._pool[i]
219+
for i in reversed(range(len(pool))):
220+
worker = pool[i]
217221
if worker.exitcode is not None:
218222
# worker exited
219223
util.debug('cleaning up worker %d' % i)
220224
worker.join()
221225
cleaned = True
222-
del self._pool[i]
226+
del pool[i]
223227
return cleaned
224228

225229
def _repopulate_pool(self):
230+
return self._repopulate_pool_static(self._ctx, self.Process,
231+
self._processes,
232+
self._pool, self._inqueue,
233+
self._outqueue, self._initializer,
234+
self._initargs,
235+
self._maxtasksperchild,
236+
self._wrap_exception)
237+
238+
@staticmethod
239+
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
240+
outqueue, initializer, initargs,
241+
maxtasksperchild, wrap_exception):
226242
"""Bring the number of pool processes up to the specified number,
227243
for use after reaping workers which have exited.
228244
"""
229-
for i in range(self._processes - len(self._pool)):
230-
w = self.Process(target=worker,
231-
args=(self._inqueue, self._outqueue,
232-
self._initializer,
233-
self._initargs, self._maxtasksperchild,
234-
self._wrap_exception)
235-
)
236-
self._pool.append(w)
245+
for i in range(processes - len(pool)):
246+
w = Process(ctx, target=worker,
247+
args=(inqueue, outqueue,
248+
initializer,
249+
initargs, maxtasksperchild,
250+
wrap_exception)
251+
)
252+
pool.append(w)
237253
w.name = w.name.replace('Process', 'PoolWorker')
238254
w.daemon = True
239255
w.start()
240256
util.debug('added worker')
241257

242-
def _maintain_pool(self):
258+
@staticmethod
259+
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
260+
initializer, initargs, maxtasksperchild,
261+
wrap_exception):
243262
"""Clean up any exited workers and start replacements for them.
244263
"""
245-
if self._join_exited_workers():
246-
self._repopulate_pool()
264+
if Pool._join_exited_workers(pool):
265+
Pool._repopulate_pool_static(ctx, Process, processes, pool,
266+
inqueue, outqueue, initializer,
267+
initargs, maxtasksperchild,
268+
wrap_exception)
247269

248270
def _setup_queues(self):
249271
self._inqueue = self._ctx.SimpleQueue()
@@ -396,16 +418,20 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
396418
return result
397419

398420
@staticmethod
399-
def _handle_workers(pool):
421+
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
422+
inqueue, outqueue, initializer, initargs,
423+
maxtasksperchild, wrap_exception):
400424
thread = threading.current_thread()
401425

402426
# Keep maintaining workers until the cache gets drained, unless the pool
403427
# is terminated.
404-
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
405-
pool._maintain_pool()
428+
while thread._state == RUN or (cache and thread._state != TERMINATE):
429+
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
430+
outqueue, initializer, initargs,
431+
maxtasksperchild, wrap_exception)
406432
time.sleep(0.1)
407433
# send sentinel to stop workers
408-
pool._taskqueue.put(None)
434+
taskqueue.put(None)
409435
util.debug('worker handler exiting')
410436

411437
@staticmethod
@@ -781,7 +807,7 @@ class ThreadPool(Pool):
781807
_wrap_exception = False
782808

783809
@staticmethod
784-
def Process(*args, **kwds):
810+
def Process(ctx, *args, **kwds):
785811
from .dummy import Process
786812
return Process(*args, **kwds)
787813

Lib/test/_test_multiprocessing.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2285,6 +2285,12 @@ def test_release_task_refs(self):
22852285
# they were released too.
22862286
self.assertEqual(CountedObject.n_instances, 0)
22872287

2288+
def test_del_pool(self):
2289+
p = self.Pool(1)
2290+
wr = weakref.ref(p)
2291+
del p
2292+
gc.collect()
2293+
self.assertIsNone(wr())
22882294

22892295
def raising():
22902296
raise KeyError("key")
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.

0 commit comments

Comments
 (0)