|
62 | 62 | import os |
63 | 63 | import gc |
64 | 64 | import sys |
65 | | -import types |
66 | 65 | import struct |
67 | 66 | import weakref |
68 | 67 | import warnings |
@@ -438,7 +437,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs, |
438 | 437 | continue |
439 | 438 | if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY: |
440 | 439 | mem_usage = _get_memory_usage(pid) |
441 | | - print(mem_usage) |
442 | 440 | _last_memory_leak_check = time() |
443 | 441 | if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE: |
444 | 442 | # Memory usage stays within bounds: everything is fine. |
@@ -618,34 +616,41 @@ def shutdown_all_workers(): |
618 | 616 | worker_sentinels = [p.sentinel for p in processes.values()] |
619 | 617 | ready = wait(readers + worker_sentinels) |
620 | 618 |
|
621 | | - broken = ("A process in the executor was terminated abruptly", None) |
| 619 | + broken = ("A worker process managed by the executor was unexpectedly " |
| 620 | + "terminated. This could be caused by a segmentation fault " |
| 621 | + "while calling the function or by an excessive memory usage " |
| 622 | + "causing the Operating System to kill the worker.", None, |
| 623 | + TerminatedWorkerError) |
622 | 624 | if result_reader in ready: |
623 | 625 | try: |
624 | 626 | result_item = result_reader.recv() |
625 | 627 | broken = None |
626 | 628 | if isinstance(result_item, _RemoteTraceback): |
627 | | - cause = result_item.tb |
628 | | - broken = ("A task has failed to un-serialize", cause) |
| 629 | + broken = ("A task has failed to un-serialize. Please " |
| 630 | + "ensure that the arguments of the function are " |
| 631 | + "all picklable.", result_item.tb, |
| 632 | + BrokenProcessPool) |
629 | 633 | except BaseException as e: |
630 | 634 | tb = getattr(e, "__traceback__", None) |
631 | 635 | if tb is None: |
632 | 636 | _, _, tb = sys.exc_info() |
633 | | - broken = ("A result has failed to un-serialize", |
634 | | - traceback.format_exception(type(e), e, tb)) |
| 637 | + broken = ("A result has failed to un-serialize. Please " |
| 638 | + "ensure that the objects returned by the function " |
| 639 | + "are always picklable.", |
| 640 | + traceback.format_exception(type(e), e, tb), |
| 641 | + BrokenProcessPool) |
635 | 642 | elif wakeup_reader in ready: |
636 | 643 | broken = None |
637 | 644 | result_item = None |
638 | 645 | thread_wakeup.clear() |
639 | | - if broken: |
640 | | - msg, cause = broken |
641 | | - # Mark the process pool broken so that submits fail right now. |
642 | | - executor_flags.flag_as_broken( |
643 | | - msg + ", the pool is not usable anymore.") |
644 | | - bpe = BrokenProcessPool( |
645 | | - msg + " while the future was running or pending.") |
646 | | - if cause is not None: |
| 646 | + if broken is not None: |
| 647 | + msg, cause_tb, exc_type = broken |
| 648 | + bpe = exc_type(msg) |
| 649 | + if cause_tb is not None: |
647 | 650 | bpe.__cause__ = _RemoteTraceback( |
648 | | - "\n'''\n{}'''".format(''.join(cause))) |
| 651 | + "\n'''\n{}'''".format(''.join(cause_tb))) |
| 652 | + # Mark the process pool broken so that submits fail right now. |
| 653 | + executor_flags.flag_as_broken(bpe) |
649 | 654 |
|
650 | 655 | # All futures in flight must be marked failed |
651 | 656 | for work_id, work_item in pending_work_items.items(): |
@@ -808,6 +813,15 @@ class LokyRecursionError(RuntimeError): |
808 | 813 |
|
809 | 814 |
|
810 | 815 | class BrokenProcessPool(_BPPException): |
| 816 | + """ |
| 817 | + Raised when the executor is broken while a future was in the running state. |
| 818 | + The cause can be an error raised when unpickling the task in the worker |
| 819 | + process or when unpickling the result value in the parent process. It can |
| 820 | + also be caused by a worker process being terminated unexpectedly. |
| 821 | + """ |
| 822 | + |
| 823 | + |
| 824 | +class TerminatedWorkerError(BrokenProcessPool): |
811 | 825 | """ |
812 | 826 | Raised when a process in a ProcessPoolExecutor terminated abruptly |
813 | 827 | while a future was in the running state. |
@@ -998,8 +1012,8 @@ def _ensure_executor_running(self): |
998 | 1012 |
|
999 | 1013 | def submit(self, fn, *args, **kwargs): |
1000 | 1014 | with self._flags.shutdown_lock: |
1001 | | - if self._flags.broken: |
1002 | | - raise BrokenProcessPool(self._flags.broken) |
| 1015 | + if self._flags.broken is not None: |
| 1016 | + raise self._flags.broken |
1003 | 1017 | if self._flags.shutdown: |
1004 | 1018 | raise ShutdownExecutorError( |
1005 | 1019 | 'cannot schedule new futures after shutdown') |
|
0 commit comments