33# SPDX-License-Identifier: MIT
44#
55# MicroPython uasyncio module
6- # MIT license; Copyright (c) 2019-2020 Damien P. George
6+ # MIT license; Copyright (c) 2019-2022 Damien P. George
77#
88# This code comes from MicroPython, and has not been run through black or pylint there.
99# Altering these files significantly would make merging difficult, so we will not use
1919from . import core
2020
2121
22+ async def _run (waiter , aw ):
23+ try :
24+ result = await aw
25+ status = True
26+ except BaseException as er :
27+ result = None
28+ status = er
29+ if waiter .data is None :
30+ # The waiter is still waiting, cancel it.
31+ if waiter .cancel ():
32+ # Waiter was cancelled by us, change its CancelledError to an instance of
33+ # CancelledError that contains the status and result of waiting on aw.
34+ # If the wait_for task subsequently gets cancelled externally then this
35+ # instance will be reset to a CancelledError instance without arguments.
36+ waiter .data = core .CancelledError (status , result )
37+
2238async def wait_for (aw , timeout , sleep = core .sleep ):
2339 """Wait for the *aw* awaitable to complete, but cancel if it takes longer
2440 than *timeout* seconds. If *aw* is not a task then a task will be created
@@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep):
3652 if timeout is None :
3753 return await aw
3854
39- async def runner (waiter , aw ):
40- nonlocal status , result
41- try :
42- result = await aw
43- s = True
44- except BaseException as er :
45- s = er
46- if status is None :
47- # The waiter is still waiting, set status for it and cancel it.
48- status = s
49- waiter .cancel ()
50-
5155 # Run aw in a separate runner task that manages its exceptions.
52- status = None
53- result = None
54- runner_task = core .create_task (runner (core .cur_task , aw ))
56+ runner_task = core .create_task (_run (core .cur_task , aw ))
5557
5658 try :
5759 # Wait for the timeout to elapse.
5860 await sleep (timeout )
5961 except core .CancelledError as er :
60- if status is True :
61- # aw completed successfully and cancelled the sleep, so return aw's result.
62- return result
63- elif status is None :
62+ status = er .value
63+ if status is None :
6464 # This wait_for was cancelled externally, so cancel aw and re-raise.
65- status = True
6665 runner_task .cancel ()
6766 raise er
67+ elif status is True :
68+ # aw completed successfully and cancelled the sleep, so return aw's result.
69+ return er .args [1 ]
6870 else :
6971 # aw raised an exception, propagate it out to the caller.
7072 raise status
7173
7274 # The sleep finished before aw, so cancel aw and raise TimeoutError.
73- status = True
7475 runner_task .cancel ()
7576 await runner_task
7677 raise core .TimeoutError
@@ -85,30 +86,77 @@ def wait_for_ms(aw, timeout):
8586 return wait_for (aw , timeout , core .sleep_ms )
8687
8788
88- async def gather (* aws , return_exceptions = False ):
89+ class _Remove :
90+ @staticmethod
91+ def remove (t ):
92+ pass
93+
94+
95+ def gather (* aws , return_exceptions = False ):
8996 """Run all *aws* awaitables concurrently. Any *aws* that are not tasks
9097 are promoted to tasks.
9198
9299 Returns a list of return values of all *aws*
93-
94- This is a coroutine.
95100 """
101+ def done (t , er ):
102+ # Sub-task "t" has finished, with exception "er".
103+ nonlocal state
104+ if gather_task .data is not _Remove :
105+ # The main gather task has already been scheduled, so do nothing.
106+ # This happens if another sub-task already raised an exception and
107+ # woke the main gather task (via this done function), or if the main
108+ # gather task was cancelled externally.
109+ return
110+ elif not return_exceptions and not isinstance (er , StopIteration ):
111+ # A sub-task raised an exception, indicate that to the gather task.
112+ state = er
113+ else :
114+ state -= 1
115+ if state :
116+ # Still some sub-tasks running.
117+ return
118+ # Gather waiting is done, schedule the main gather task.
119+ core ._task_queue .push (gather_task )
96120
97121 ts = [core ._promote_to_task (aw ) for aw in aws ]
98122 for i in range (len (ts )):
99- try :
100- # TODO handle cancel of gather itself
101- # if ts[i].coro:
102- # iter(ts[i]).waiting.push_head(cur_task)
103- # try:
104- # yield
105- # except CancelledError as er:
106- # # cancel all waiting tasks
107- # raise er
108- ts [i ] = await ts [i ]
109- except (core .CancelledError , Exception ) as er :
110- if return_exceptions :
111- ts [i ] = er
112- else :
113- raise er
123+ if ts [i ].state is not True :
124+ # Task is not running, gather not currently supported for this case.
125+ raise RuntimeError ("can't gather" )
126+ # Register the callback to call when the task is done.
127+ ts [i ].state = done
128+
129+ # Set the state for execution of the gather.
130+ gather_task = core .cur_task
131+ state = len (ts )
132+ cancel_all = False
133+
134+ # Wait for the a sub-task to need attention.
135+ gather_task .data = _Remove
136+ try :
137+ yield
138+ except core .CancelledError as er :
139+ cancel_all = True
140+ state = er
141+
142+ # Clean up tasks.
143+ for i in range (len (ts )):
144+ if ts [i ].state is done :
145+ # Sub-task is still running, deregister the callback and cancel if needed.
146+ ts [i ].state = True
147+ if cancel_all :
148+ ts [i ].cancel ()
149+ elif isinstance (ts [i ].data , StopIteration ):
150+ # Sub-task ran to completion, get its return value.
151+ ts [i ] = ts [i ].data .value
152+ else :
153+ # Sub-task had an exception with return_exceptions==True, so get its exception.
154+ ts [i ] = ts [i ].data
155+
156+ # Either this gather was cancelled, or one of the sub-tasks raised an exception with
157+ # return_exceptions==False, so reraise the exception here.
158+ if state is not 0 :
159+ raise state
160+
161+ # Return the list of return values of each sub-task.
114162 return ts
0 commit comments