Skip to content

Commit 7193d14

Browse files
authored
Fix routing of background thread output when no parent is set explicitly (#1451)
1 parent b8f5dfc commit 7193d14

File tree

6 files changed

+183
-146
lines changed

6 files changed

+183
-146
lines changed

ipykernel/displayhook.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(self, session, pub_socket):
2929

3030
self._parent_header: ContextVar[dict[str, Any]] = ContextVar("parent_header")
3131
self._parent_header.set({})
32+
self._parent_header_global = {}
3233

3334
def get_execution_count(self):
3435
"""This method is replaced in kernelapp"""
@@ -57,11 +58,16 @@ def __call__(self, obj):
5758

5859
@property
5960
def parent_header(self):
60-
return self._parent_header.get()
61+
try:
62+
return self._parent_header.get()
63+
except LookupError:
64+
return self._parent_header_global
6165

6266
def set_parent(self, parent):
6367
"""Set the parent header."""
64-
self._parent_header.set(extract_header(parent))
68+
parent_header = extract_header(parent)
69+
self._parent_header.set(parent_header)
70+
self._parent_header_global = parent_header
6571

6672

6773
class ZMQShellDisplayHook(DisplayHook):
@@ -83,11 +89,16 @@ def __init__(self, *args, **kwargs):
8389

8490
@property
8591
def parent_header(self):
86-
return self._parent_header.get()
92+
try:
93+
return self._parent_header.get()
94+
except LookupError:
95+
return self._parent_header_global
8796

8897
def set_parent(self, parent):
89-
"""Set the parent for outbound messages."""
90-
self._parent_header.set(extract_header(parent))
98+
"""Set the parent header."""
99+
parent_header = extract_header(parent)
100+
self._parent_header.set(parent_header)
101+
self._parent_header_global = parent_header
91102

92103
def start_displayhook(self):
93104
"""Start the display hook."""

ipykernel/iostream.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,6 @@ def __init__(
456456
"parent_header"
457457
)
458458
self._parent_header.set({})
459-
self._thread_to_parent = {}
460-
self._thread_to_parent_header = {}
461459
self._parent_header_global = {}
462460
self._master_pid = os.getpid()
463461
self._flush_pending = False
@@ -512,21 +510,11 @@ def __init__(
512510
@property
513511
def parent_header(self):
514512
try:
515-
# asyncio-specific
513+
# asyncio or thread-specific
516514
return self._parent_header.get()
517515
except LookupError:
518-
try:
519-
# thread-specific
520-
identity = threading.current_thread().ident
521-
# retrieve the outermost (oldest ancestor,
522-
# discounting the kernel thread) thread identity
523-
while identity in self._thread_to_parent:
524-
identity = self._thread_to_parent[identity]
525-
# use the header of the oldest ancestor
526-
return self._thread_to_parent_header[identity]
527-
except KeyError:
528-
# global (fallback)
529-
return self._parent_header_global
516+
# global (fallback)
517+
return self._parent_header_global
530518

531519
@parent_header.setter
532520
def parent_header(self, value):

ipykernel/ipkernel.py

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import asyncio
66
import builtins
7-
import gc
87
import getpass
98
import os
109
import signal
@@ -17,15 +16,13 @@
1716
import comm
1817
from IPython.core import release
1918
from IPython.utils.tokenutil import line_at_cursor, token_at_cursor
20-
from jupyter_client.session import extract_header
2119
from traitlets import Any, Bool, HasTraits, Instance, List, Type, default, observe, observe_compat
2220
from zmq.eventloop.zmqstream import ZMQStream
2321

2422
from .comm.comm import BaseComm
2523
from .comm.manager import CommManager
2624
from .compiler import XCachingCompiler
2725
from .eventloops import _use_appnope
28-
from .iostream import OutStream
2926
from .kernelbase import Kernel as KernelBase
3027
from .kernelbase import _accepts_parameters
3128
from .zmqshell import ZMQInteractiveShell
@@ -167,14 +164,6 @@ def __init__(self, **kwargs):
167164

168165
appnope.nope()
169166

170-
self._new_threads_parent_header = {}
171-
self._initialize_thread_hooks()
172-
173-
if hasattr(gc, "callbacks"):
174-
# while `gc.callbacks` exists since Python 3.3, pypy does not
175-
# implement it even as of 3.9.
176-
gc.callbacks.append(self._clean_thread_parent_frames)
177-
178167
help_links = List(
179168
[
180169
{
@@ -374,8 +363,6 @@ def _dummy_context_manager(self, *args):
374363

375364
async def execute_request(self, stream, ident, parent):
376365
"""Override for cell output - cell reconciliation."""
377-
parent_header = extract_header(parent)
378-
self._associate_new_top_level_threads_with(parent_header)
379366
await super().execute_request(stream, ident, parent)
380367

381368
async def do_execute(
@@ -750,83 +737,6 @@ def do_clear(self):
750737
self.shell.reset(False)
751738
return dict(status="ok")
752739

753-
def _associate_new_top_level_threads_with(self, parent_header):
754-
"""Store the parent header to associate it with new top-level threads"""
755-
self._new_threads_parent_header = parent_header
756-
757-
def _initialize_thread_hooks(self):
758-
"""Store thread hierarchy and thread-parent_header associations."""
759-
stdout = self._stdout
760-
stderr = self._stderr
761-
kernel_thread_ident = threading.get_ident()
762-
kernel = self
763-
_threading_Thread_run = threading.Thread.run
764-
_threading_Thread__init__ = threading.Thread.__init__
765-
766-
def run_closure(self: threading.Thread):
767-
"""Wrap the `threading.Thread.start` to intercept thread identity.
768-
769-
This is needed because there is no "start" hook yet, but there
770-
might be one in the future: https://bugs.python.org/issue14073
771-
772-
This is a no-op if the `self._stdout` and `self._stderr` are not
773-
sub-classes of `OutStream`.
774-
"""
775-
776-
try:
777-
parent = self._ipykernel_parent_thread_ident # type:ignore[attr-defined]
778-
except AttributeError:
779-
return
780-
for stream in [stdout, stderr]:
781-
if isinstance(stream, OutStream):
782-
if parent == kernel_thread_ident:
783-
stream._thread_to_parent_header[self.ident] = (
784-
kernel._new_threads_parent_header
785-
)
786-
else:
787-
stream._thread_to_parent[self.ident] = parent
788-
_threading_Thread_run(self)
789-
790-
def init_closure(self: threading.Thread, *args, **kwargs):
791-
_threading_Thread__init__(self, *args, **kwargs)
792-
self._ipykernel_parent_thread_ident = threading.get_ident() # type:ignore[attr-defined]
793-
794-
threading.Thread.__init__ = init_closure # type:ignore[method-assign]
795-
threading.Thread.run = run_closure # type:ignore[method-assign]
796-
797-
def _clean_thread_parent_frames(
798-
self, phase: t.Literal["start", "stop"], info: dict[str, t.Any]
799-
):
800-
"""Clean parent frames of threads which are no longer running.
801-
This is meant to be invoked by garbage collector callback hook.
802-
803-
The implementation enumerates the threads because there is no "exit" hook yet,
804-
but there might be one in the future: https://bugs.python.org/issue14073
805-
806-
This is a no-op if the `self._stdout` and `self._stderr` are not
807-
sub-classes of `OutStream`.
808-
"""
809-
# Only run before the garbage collector starts
810-
if phase != "start":
811-
return
812-
active_threads = {thread.ident for thread in threading.enumerate()}
813-
for stream in [self._stdout, self._stderr]:
814-
if isinstance(stream, OutStream):
815-
thread_to_parent_header = stream._thread_to_parent_header
816-
for identity in list(thread_to_parent_header.keys()):
817-
if identity not in active_threads:
818-
try:
819-
del thread_to_parent_header[identity]
820-
except KeyError:
821-
pass
822-
thread_to_parent = stream._thread_to_parent
823-
for identity in list(thread_to_parent.keys()):
824-
if identity not in active_threads:
825-
try:
826-
del thread_to_parent[identity]
827-
except KeyError:
828-
pass
829-
830740

831741
# This exists only for backwards compatibility - use IPythonKernel instead
832742

ipykernel/kernelbase.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import uuid
1818
import warnings
1919
from collections.abc import Mapping
20-
from contextvars import ContextVar
20+
from contextvars import Context, ContextVar, copy_context
2121
from datetime import datetime
2222
from functools import partial
2323
from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal
@@ -72,6 +72,8 @@
7272
" ipykernel 6.0 (2021). {target} does not seem to return an awaitable"
7373
)
7474

75+
T = t.TypeVar("T")
76+
7577

7678
def _accepts_parameters(meth, param_names):
7779
parameters = inspect.signature(meth).parameters
@@ -201,6 +203,7 @@ def _default_ident(self):
201203
_control_parent_ident: bytes = b""
202204
_shell_parent: ContextVar[dict[str, Any]]
203205
_shell_parent_ident: ContextVar[bytes]
206+
_shell_context: Context
204207
# Kept for backward-compatibility, accesses _control_parent_ident and _shell_parent_ident,
205208
# see https://github.com/jupyterlab/jupyterlab/issues/17785
206209
_parent_ident: Mapping[str, bytes]
@@ -320,13 +323,14 @@ def __init__(self, **kwargs):
320323
self._shell_parent.set({})
321324
self._shell_parent_ident = ContextVar("shell_parent_ident")
322325
self._shell_parent_ident.set(b"")
326+
self._shell_context = copy_context()
323327

324328
# For backward compatibility so that _parent_ident["shell"] and _parent_ident["control"]
325329
# work as they used to for ipykernel >= 7
326330
self._parent_ident = LazyDict(
327331
{
328332
"control": lambda: self._control_parent_ident,
329-
"shell": lambda: self._shell_parent_ident.get(),
333+
"shell": lambda: self._get_shell_context_var(self._shell_parent_ident),
330334
}
331335
)
332336

@@ -768,6 +772,8 @@ def set_parent(self, ident, parent, channel="shell"):
768772
else:
769773
self._shell_parent_ident.set(ident)
770774
self._shell_parent.set(parent)
775+
# preserve the last call to set_parent
776+
self._shell_context = copy_context()
771777

772778
def get_parent(self, channel=None):
773779
"""Get the parent request associated with a channel.
@@ -794,7 +800,20 @@ def get_parent(self, channel=None):
794800

795801
if channel == "control":
796802
return self._control_parent
797-
return self._shell_parent.get()
803+
804+
return self._get_shell_context_var(self._shell_parent)
805+
806+
def _get_shell_context_var(self, var: ContextVar[T]) -> T:
807+
"""Lookup a ContextVar, falling back on the shell context
808+
809+
Allows for user-launched Threads to still resolve to the shell's main context
810+
811+
necessary for e.g. display from threads.
812+
"""
813+
try:
814+
return var.get()
815+
except LookupError:
816+
return self._shell_context[var]
798817

799818
def send_response(
800819
self,
@@ -1455,7 +1474,7 @@ def getpass(self, prompt="", stream=None):
14551474
)
14561475
return self._input_request(
14571476
prompt,
1458-
self._shell_parent_ident.get(),
1477+
self._get_shell_context_var(self._shell_parent_ident),
14591478
self.get_parent("shell"),
14601479
password=True,
14611480
)
@@ -1472,7 +1491,7 @@ def raw_input(self, prompt=""):
14721491
raise StdinNotImplementedError(msg)
14731492
return self._input_request(
14741493
str(prompt),
1475-
self._shell_parent_ident.get(),
1494+
self._get_shell_context_var(self._shell_parent_ident),
14761495
self.get_parent("shell"),
14771496
password=False,
14781497
)

ipykernel/zmqshell.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,20 @@ def __init__(self, *args, **kwargs):
7373
super().__init__(*args, **kwargs)
7474
self._parent_header = contextvars.ContextVar("parent_header")
7575
self._parent_header.set({})
76+
self._parent_header_global = {}
7677

7778
@property
7879
def parent_header(self):
79-
return self._parent_header.get()
80+
try:
81+
return self._parent_header.get()
82+
except LookupError:
83+
return self._parent_header_global
8084

8185
def set_parent(self, parent):
8286
"""Set the parent for outbound messages."""
83-
self._parent_header.set(extract_header(parent))
87+
parent_header = extract_header(parent)
88+
self._parent_header.set(parent_header)
89+
self._parent_header_global = parent_header
8490

8591
def _flush_streams(self):
8692
"""flush IO Streams prior to display"""
@@ -698,11 +704,23 @@ def set_next_input(self, text, replace=False):
698704

699705
@property
700706
def parent_header(self):
701-
return self._parent_header.get()
707+
try:
708+
return self._parent_header.get()
709+
except LookupError:
710+
return self._parent_header_global
711+
712+
@parent_header.setter
713+
def parent_header(self, value):
714+
self._parent_header_global = value
715+
self._parent_header.set(value)
702716

703717
def set_parent(self, parent):
704-
"""Set the parent header for associating output with its triggering input"""
705-
self._parent_header.set(parent)
718+
"""Set the parent header for associating output with its triggering input
719+
720+
When called from a thread, sets the thread-local value, which persists
721+
until the next call from this thread.
722+
"""
723+
self.parent_header = parent
706724
self.displayhook.set_parent(parent) # type:ignore[attr-defined]
707725
self.display_pub.set_parent(parent) # type:ignore[attr-defined]
708726
if hasattr(self, "_data_pub"):
@@ -713,7 +731,12 @@ def set_parent(self, parent):
713731
sys.stderr.set_parent(parent)
714732

715733
def get_parent(self):
716-
"""Get the parent header."""
734+
"""Get the parent header.
735+
736+
If set_parent has never been called from the current thread,
737+
the value from the last call to set_parent from _any_ thread will be used
738+
(typically the currently running cell).
739+
"""
717740
return self.parent_header
718741

719742
def init_magics(self):

0 commit comments

Comments
 (0)