Skip to content
192 changes: 118 additions & 74 deletions async_generator/_impl.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,138 @@
import sys
from functools import wraps
from types import coroutine
from types import coroutine, CodeType
import inspect
from inspect import (
getcoroutinestate, CORO_CREATED, CORO_CLOSED, CORO_SUSPENDED
)
import collections.abc

# An async generator object (whether native in 3.6+ or the pure-Python
# version implemented below) is basically an async function with some
# extra wrapping logic. As an async function, it can call other async
# functions, which will probably at some point call a function that uses
# 'yield' to send traps to the event loop. Async generators also need
# to be able to send values to the context in which the generator is
# being iterated, and it's awfully convenient to be able to do that
# using 'yield' too. To distinguish between these two streams of
# yielded objects, the traps intended for the event loop are yielded
# as-is, and the values intended for the context that's iterating the
# generator are wrapped in some wrapper object (YieldWrapper here, or
# an internal Python type called AsyncGenWrappedValue in the native
# async generator implementation) before being yielded.
# The __anext__(), asend(), and athrow() methods of an async generator
# iterate the underlying async function until a wrapped value is received,
# and any non-wrapped values are passed through to the event loop.

# These functions are syntactically valid only on 3.6+, so we conditionally
# exec() the code defining them.
_native_asyncgen_helpers = """
async def _wrapper():
holder = [None]
while True:
# The simpler "value = None; while True: value = yield value"
# would hold a reference to the most recently wrapped value
# after it has been yielded out (until the next value to wrap
# comes in), so we use a one-element list instead.
holder.append((yield holder.pop()))
_wrapper = _wrapper()

async def _unwrapper():
@coroutine
def inner():
holder = [None]
while True:
holder.append((yield holder.pop()))
await inner()
yield None
_unwrapper = _unwrapper()
"""

if sys.implementation.name == "cpython" and sys.version_info >= (3, 6):
# On 3.6, with native async generators, we want to use the same
# wrapper type that native generators use. This lets @async_generators
# yield_from_ native async generators and vice versa.

import ctypes
from types import AsyncGeneratorType, GeneratorType
exec(_native_asyncgen_helpers)

# Transmute _wrapper to a regular generator object by modifying the
# ob_type field. The code object inside _wrapper will still think it's
# associated with an async generator, so it will yield out
# AsyncGenWrappedValues when it encounters a 'yield' statement;
# but the generator object will think it's a normal non-async
# generator, so it won't unwrap them. This way, we can obtain
# AsyncGenWrappedValues as normal manipulable Python objects.
#
# This sort of object type transmutation is categorically a Sketchy
# Thing To Do, because the functions associated with the new type
# (including tp_dealloc and so forth) will be operating on a
# structure whose in-memory layout matches that of the old type.
# In this case, it's OK, because async generator objects are just
# generator objects plus a few extra fields at the end; and these
# fields are two integers and a NULL-until-first-iteration object
# pointer ag_finalizer, so they don't hold any resources that need
# to be cleaned up. (We transmute the async generator to a regular
# generator before we first iterate it, so ag_finalizer stays NULL
# for the lifetime of the object, so failing to drop a reference
# to it during deallocation doesn't actually leak anything.)
# We have a unit test that verifies that __sizeof__() for generators
# and async generators continues to follow this pattern in future
# Python versions.

_type_p = ctypes.c_void_p.from_address(
id(_wrapper) + object().__sizeof__() - ctypes.sizeof(ctypes.c_void_p)
)
assert _type_p.value == id(AsyncGeneratorType)
_type_p.value = id(GeneratorType)

class YieldWrapper:
def __init__(self, payload):
self.payload = payload
supports_native_asyncgens = True

# Now _wrapper.send(x) returns an AsyncGenWrappedValue of x.
# We have to initially send(None) since the generator was just constructed;
# we remember the type of the return value (which is AsyncGenWrappedValue(None))
# to help with _is_wrapped.
YieldWrapper = type(_wrapper.send(None))

def _wrap(value):
return YieldWrapper(value)
# Advance _unwrapper to its first yield statement, for use by _unwrap().
_unwrapper.asend(None).send(None)

# Performance note: compared to the non-native-supporting implementation below,
# this _wrap() is about the same speed (434 +- 16 nsec here, 456 +- 24 nsec below)
# but this _unwrap() is much slower (1.17 usec vs 167 nsec). Since _unwrap is only
# needed on non-native generators, and we plan to have most @async_generators use
# native generators on 3.6+, this seems acceptable.

def _is_wrapped(box):
return isinstance(box, YieldWrapper)
_wrap = _wrapper.send

def _is_wrapped(box):
return isinstance(box, YieldWrapper)

def _unwrap(box):
return box.payload
def _unwrap(box):
try:
_unwrapper.asend(box).send(None)
except StopIteration as e:
return e.value
else:
raise TypeError("not wrapped")
else:
supports_native_asyncgens = False

class YieldWrapper:
__slots__ = ("payload",)

# This is the magic code that lets you use yield_ and yield_from_ with native
# generators.
#
# The old version worked great on Linux and MacOS, but not on Windows, because
# it depended on _PyAsyncGenValueWrapperNew. The new version segfaults
# everywhere, and I'm not sure why -- probably my lack of understanding
# of ctypes and refcounts.
#
# There are also some commented out tests that should be re-enabled if this is
# fixed:
#
# if sys.version_info >= (3, 6):
# # Use the same box type that the interpreter uses internally. This allows
# # yield_ and (more importantly!) yield_from_ to work in built-in
# # generators.
# import ctypes # mua ha ha.
#
# # We used to call _PyAsyncGenValueWrapperNew to create and set up new
# # wrapper objects, but that symbol isn't available on Windows:
# #
# # https://github.com/python-trio/async_generator/issues/5
# #
# # Fortunately, the type object is available, but it means we have to do
# # this the hard way.
#
# # We don't actually need to access this, but we need to make a ctypes
# # structure so we can call addressof.
# class _ctypes_PyTypeObject(ctypes.Structure):
# pass
# _PyAsyncGenWrappedValue_Type_ptr = ctypes.addressof(
# _ctypes_PyTypeObject.in_dll(
# ctypes.pythonapi, "_PyAsyncGenWrappedValue_Type"))
# _PyObject_GC_New = ctypes.pythonapi._PyObject_GC_New
# _PyObject_GC_New.restype = ctypes.py_object
# _PyObject_GC_New.argtypes = (ctypes.c_void_p,)
#
# _Py_IncRef = ctypes.pythonapi.Py_IncRef
# _Py_IncRef.restype = None
# _Py_IncRef.argtypes = (ctypes.py_object,)
#
# class _ctypes_PyAsyncGenWrappedValue(ctypes.Structure):
# _fields_ = [
# ('PyObject_HEAD', ctypes.c_byte * object().__sizeof__()),
# ('agw_val', ctypes.py_object),
# ]
# def _wrap(value):
# box = _PyObject_GC_New(_PyAsyncGenWrappedValue_Type_ptr)
# raw = ctypes.cast(ctypes.c_void_p(id(box)),
# ctypes.POINTER(_ctypes_PyAsyncGenWrappedValue))
# raw.contents.agw_val = value
# _Py_IncRef(value)
# return box
#
# def _unwrap(box):
# assert _is_wrapped(box)
# raw = ctypes.cast(ctypes.c_void_p(id(box)),
# ctypes.POINTER(_ctypes_PyAsyncGenWrappedValue))
# value = raw.contents.agw_val
# _Py_IncRef(value)
# return value
#
# _PyAsyncGenWrappedValue_Type = type(_wrap(1))
# def _is_wrapped(box):
# return isinstance(box, _PyAsyncGenWrappedValue_Type)
def __init__(self, payload):
self.payload = payload

def _wrap(value):
return YieldWrapper(value)

def _is_wrapped(box):
return isinstance(box, YieldWrapper)

def _unwrap(box):
return box.payload


# The magic @coroutine decorator is how you write the bottom level of
Expand Down
84 changes: 52 additions & 32 deletions async_generator/_tests/test_async_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import collections.abc
from functools import wraps
import gc
import inspect

from .conftest import mock_sleep
from .. import (
Expand Down Expand Up @@ -370,13 +371,12 @@ async def native_async_range(count):
for i in range(count):
yield i

# XX uncomment if/when we re-enable the ctypes hacks:
# async def native_async_range_twice(count):
# # make sure yield_from_ works inside a native async generator
# await yield_from_(async_range(count))
# yield None
# # make sure we can yield_from_ a native async generator
# await yield_from_(native_async_range(count))
async def native_async_range_twice(count, async_range):
# make sure yield_from_ works inside a native async generator
await yield_from_(async_range(count))
yield None
# make sure we can yield_from_ a native async generator
await yield_from_(native_async_range(count))
"""
)

Expand All @@ -400,11 +400,16 @@ async def yield_from_native():

assert await collect(yield_from_native()) == [0, 1, 2]

# XX uncomment if/when we re-enable the ctypes hacks:
# if sys.version_info >= (3, 6):
# assert await collect(native_async_range_twice(3)) == [
# 0, 1, 2, None, 0, 1, 2,
# ]
if sys.version_info >= (3, 6):
assert await collect(native_async_range_twice(3, async_range)) == [
0,
1,
2,
None,
0,
1,
2,
]


@async_generator
Expand Down Expand Up @@ -762,32 +767,47 @@ async def f():
@pytest.mark.skipif(not hasattr(sys, "getrefcount"), reason="CPython only")
def test_refcnt():
x = object()
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
base_count = sys.getrefcount(x)
l = [_impl._wrap(x) for _ in range(100)]
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
assert sys.getrefcount(x) >= base_count + 100
assert sys.getrefcount(x) == base_count + 100
l2 = [_impl._unwrap(box) for box in l]
assert sys.getrefcount(x) >= base_count + 200
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
assert sys.getrefcount(x) == base_count + 200
del l
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
assert sys.getrefcount(x) == base_count + 100
del l2
print(sys.getrefcount(x))
print(sys.getrefcount(x))
print(sys.getrefcount(x))
assert sys.getrefcount(x) == base_count
print(sys.getrefcount(x))


@pytest.mark.skipif(
not hasattr(None, "__sizeof__") or not hasattr(inspect, "isasyncgen"),
reason="CPython with native asyncgens only"
)
def test_gen_agen_size():
def gen(): # pragma: no cover
yield 42

dct = {}
exec("async def agen(): yield 50", dct)
agen = dct["agen"]

# As of CPython 3.7, an async generator object is a generator object plus one pointer
# (PyObject *ag_finalizer, nullptr before first iteration) and two ints
# (ag_hooks_inited, ag_closed). Since none of these members require any cleanup,
# our sketchy agen->gen type transmutation in _impl._wrapper is safe (at the time
# we do it, i.e., before first iteration). If the below assertion starts firing
# on a future Python version, someone will need to audit the new definition of
# PyAsyncGenObject (in CPython's Include/genobject.h) and make sure its new fields
# remain safe to ignore under the circumstances used in the _wrapper hack.
from ctypes import sizeof, c_int, c_void_p
gen_size = gen().__sizeof__()
agen_size = agen().__sizeof__()
expected_delta = 2 * sizeof(c_int) + sizeof(c_void_p)
assert gen_size + expected_delta == agen_size


def test_unwrap_not_wrapped():
with pytest.raises((TypeError, AttributeError)):
_impl._unwrap(42)


################################################################
Expand Down
11 changes: 8 additions & 3 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Basically:
That's it!


.. _yieldfrom:

Yield from
~~~~~~~~~~

Expand All @@ -47,9 +49,12 @@ But we do::
async def wrap_load_json_lines(stream_reader):
await yield_from_(load_json_lines(stream_reader))

You can only use ``yield_from_`` inside an ``@async_generator``
function, BUT the thing you PASS to ``yield_from_`` can be any kind of
async iterator, including native async generators.
You can use ``yield_from_`` inside a native async generator or an
``@async_generator`` function, and the argument can be any async
iterable. Remember that a native async generator is only created
when an ``async def`` block contains at least one ``yield``
statement; if you only ``yield_from_`` and never ``yield``,
use the ``@async_generator`` decorator.

Our ``yield_from_`` fully supports the classic ``yield from``
semantics, including forwarding ``asend`` and ``athrow`` calls into
Expand Down
1 change: 1 addition & 0 deletions newsfragments/16.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
On CPython 3.6+, restore the ability for a native async generator to ``yield_from_`` an ``@async_generator`` function and vice versa, by using the same value-wrapper type that Python uses internally if we're running on a Python version that supports native async generators. The new approach should be much less prone to refcounting bugs or to issues on different architectures.