-
- Notifications
You must be signed in to change notification settings - Fork 32.3k
gh-124309: Modernize the staggered_race
implementation to support eager task factories #124390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gh-124309: Modernize the staggered_race
implementation to support eager task factories #124390
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're using private methods of TaskGroup and starting tasks on the loop rather than the TaskGroup
I think I'm just going to refactor this to not use |
I think it's worth persevering with TaskGroup, you just need to write it without using add_done_callback or private attributes |
I'll try it, but I'm worried that it isn't possible when considering an eager task factory. The previous implementation used a variation of a task group (a list containing tasks, since it predated While we're here, |
A demo of what I mean wrt TaskGroup: """Support for running coroutines in parallel with staggered start times.""" __all__ = 'staggered_race', from . import locks from . import tasks from . import taskgroups async def staggered_race(coro_fns, delay, *, loop=None): """Run coroutines with staggered start times and take the first to finish. This method takes an iterable of coroutine functions. The first one is started immediately. From then on, whenever the immediately preceding one fails (raises an exception), or when *delay* seconds has passed, the next coroutine is started. This continues until one of the coroutines complete successfully, in which case all others are cancelled, or until all coroutines fail. The coroutines provided should be well-behaved in the following way: * They should only ``return`` if completed successfully. * They should always raise an exception if they did not complete successfully. In particular, if they handle cancellation, they should probably reraise, like this:: try: # do work except asyncio.CancelledError: # undo partially completed work raise Args: coro_fns: an iterable of coroutine functions, i.e. callables that return a coroutine object when called. Use ``functools.partial`` or lambdas to pass arguments. delay: amount of time, in seconds, between starting coroutines. If ``None``, the coroutines will run sequentially. loop: the event loop to use. Returns: tuple *(winner_result, winner_index, exceptions)* where - *winner_result*: the result of the winning coroutine, or ``None`` if no coroutines won. - *winner_index*: the index of the winning coroutine in ``coro_fns``, or ``None`` if no coroutines won. If the winning coroutine may return None on success, *winner_index* can be used to definitively determine whether any coroutine won. - *exceptions*: list of exceptions returned by the coroutines. ``len(exceptions)`` is equal to the number of coroutines actually started, and the order is the same as in ``coro_fns``. The winning coroutine's entry is ``None``. """ # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. winner_result = None winner_index = None exceptions = [] class _Done(Exception): pass async def run_one_coro(this_index, coro_fn, this_failed): try: result = await coro_fn() except (SystemExit, KeyboardInterrupt): raise except BaseException as e: exceptions[this_index] = e this_failed.set() # Kickstart the next coroutine else: # Store winner's results nonlocal winner_index, winner_result # There could be more than one winner winner_index = this_index winner_result = result raise _Done try: async with taskgroups.TaskGroup() as tg: for this_index, coro_fn in enumerate(coro_fns): this_failed = locks.Event() exceptions.append(None) tg.create_task(run_one_coro(this_index, coro_fn, this_failed)) try: await tasks.wait_for(this_failed.wait(), delay) except TimeoutError: pass except* _Done: pass return winner_result, winner_index, exceptions |
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst Outdated Show resolved Hide resolved
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ZeroIntensity.
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Thanks @ZeroIntensity for the PR, and @kumaraditya303 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.12, 3.13. |
…port eager task factories (pythonGH-124390) (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <zintensitydev@gmail.com> Co-authored-by: Thomas Grainger <tagrain@gmail.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com> Co-authored-by: Kumar Aditya <kumaraditya@python.org>
Sorry, @ZeroIntensity and @kumaraditya303, I could not cleanly backport this to
|
GH-124573 is a backport of this pull request to the 3.13 branch. |
GH-124574 is a backport of this pull request to the 3.12 branch. |
…pport e… (#124574) gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390) Co-authored-by: Thomas Grainger <tagrain@gmail.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com> Co-authored-by: Kumar Aditya <kumaraditya@python.org> (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
…n to support eager task factories (python#124390)" This reverts commit de929f3.
…wnstream (pythonGH-124810) * Revert "pythonGH-124639: add back loop param to staggered_race (pythonGH-124700)" This reverts commit e0a41a5. * Revert "pythongh-124309: Modernize the `staggered_race` implementation to support eager task factories (pythonGH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
…ownstream (GH-124810) (#124817) gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810) * Revert "GH-124639: add back loop param to staggered_race (GH-124700)" This reverts commit e0a41a5. * Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (GH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
Uh oh!
There was an error while loading. Please reload this page.