Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pytest_asyncio/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def pytest_runtest_setup(item):
# to marked test functions
_markers_2_fixtures = {
'asyncio': 'event_loop',
'asyncio_clock': 'clock_event_loop',
}


Expand All @@ -204,6 +205,56 @@ def event_loop(request):
loop.close()


def _clock_event_loop_class():
"""
Create a new class for ClockEventLoop based on the current
class-type produced by `asyncio.new_event_loop()`. This is important
for instances in which the enent-loop-policy has been changed.
"""
class ClockEventLoop(asyncio.new_event_loop().__class__):
"""
A custom event loop that explicitly advances time when requested. Otherwise,
this event loop advances time as expected.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._offset = 0

def time(self):
"""
Return the time according the event loop's clock.

This time is adjusted by the stored offset that allows for advancement
with `advance_time`.
"""
return super().time() + self._offset

def advance_time(self, seconds):
'''
Advance time by a given offset in seconds. Returns an awaitable
that will complete after all tasks scheduled for after advancement
of time are proceeding.
'''
if seconds > 0:
# advance the clock by the given offset
self._offset += seconds

# Once the clock is adjusted, new tasks may have just been
# scheduled for running in the next pass through the event loop
return self.create_task(asyncio.sleep(0))

return ClockEventLoop


@pytest.yield_fixture
def clock_event_loop(request):
"""Create an instance of the default event loop for each test case."""
loop = _clock_event_loop_class()()
asyncio.get_event_loop_policy().set_event_loop(loop)
yield loop
loop.close()


def _unused_tcp_port():
"""Find an unused localhost TCP port from 1024-65535 and return it."""
with contextlib.closing(socket.socket()) as sock:
Expand Down
38 changes: 38 additions & 0 deletions tests/test_simple_35.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,41 @@ async def test_asyncio_marker_method(self, event_loop):
def test_async_close_loop(event_loop):
event_loop.close()
return 'ok'


@pytest.mark.asyncio_clock
async def test_mark_asyncio_clock():
"""
Test that coroutines marked with asyncio_clock are run with a ClockEventLoop
"""
assert hasattr(asyncio.get_event_loop(), 'advance_time')


def test_clock_loop_loop_fixture(clock_event_loop):
"""
Test that the clock_event_loop fixture returns a proper instance of the loop
"""
assert hasattr(asyncio.get_event_loop(), 'advance_time')
clock_event_loop.close()
return 'ok'


@pytest.mark.asyncio_clock
async def test_clock_loop_advance_time(clock_event_loop):
"""
Test the sliding time event loop fixture
"""
# A task is created that will sleep some number of seconds
SLEEP_TIME = 10

# create the task
task = clock_event_loop.create_task(asyncio.sleep(SLEEP_TIME))
assert not task.done()

# start the task
await clock_event_loop.advance_time(0)
assert not task.done()

# process the timeout
await clock_event_loop.advance_time(SLEEP_TIME)
assert task.done()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make test reliable I tend to await explicitly:

TIMEOUT=1 #define earlier in the file await asyncio.wait_for(task, TIMEOUT)

(and change short_nap/advance to 10)

I do this because though advance_time marks task's sleep as finished, but the exact number of event loop iterations between this and the actual closing of the task is implementation-dependent.