Skip to content

Commit 16f4958

Browse files
committed
proper callbacks support, usage and tests for it; docstrings; readme;
1 parent e4b0e7f commit 16f4958

File tree

14 files changed

+601
-68
lines changed

14 files changed

+601
-68
lines changed

README.md

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,49 @@
11
# asyncio-pool
22

3-
###### Supports python 3.5+ (including PyPy 6+, which is also 3.5)
3+
Pool of asyncio coroutines with familiar interface. Supports python 3.5+ (including PyPy 6+, which is also 3.5 atm)
4+
5+
AioPool makes sure _no more_ and _no less_ (if possible) than `size` spawned coroutines are active at the same time. _spawned_ means created and scheduled with one of the pool interface methods, _active_ means coroutine function started executing it's code, as opposed to _waiting_ -- which waits for pool space without entering coroutine function.
6+
7+
## Interface
8+
9+
Read [code doctrings](../blob/master/asyncio_pool/base_pool.py) for details.
10+
11+
##### AioPool(size=4, *, loop=None)
12+
13+
Creates pool of `size` concurrent tasks. Supports async context manager interface.
14+
15+
##### spawn(coro, cb=None, ctx=None)
16+
17+
Waits for pool space, then creates task for `coro` coroutine, returning future for it's result. Can spawn coroutine, created by `cb` with result of `coro` as first argument. `ctx` context is passed to callback as third positinal argument.
18+
19+
##### exec(coro, cb=None, ctx=None)
20+
21+
Waits for pool space, then creates task for `coro`, then waits for it to finish, then returns result of `coro` if no callback is provided, otherwise creates task for callback, waits for it and returns result of callback.
22+
23+
##### spawn_n(coro, cb=None, ctx=None)
24+
25+
Creates waiting task for `coro`, returns future without waiting for pool space. Task is executed "in pool" when pool space is available.
26+
27+
##### join()
28+
29+
Waits for all spawned (active and waiting) tasks to finish. Joining pool from coroutine, spawned by the same pool leads to *deadlock*.
30+
31+
##### cancel(*futures)
32+
33+
Cancels spawned tasks (active and waiting), finding them by provided `futures`. If no futures provided -- cancels all spawned tasks.
34+
35+
##### map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)
36+
37+
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn`, waits for all of them to finish (including callbacks), returns results maintaining order of `iterable`.
38+
39+
##### map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)
40+
41+
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn_n`, returns futures for task results maintaining order of `iterable`.
42+
43+
##### itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)
44+
45+
Spawns tasks with `map_n(fn, iterable, cb, ctx)`, then waits for results with `asyncio.wait` function, yielding ready results one by one if `flat` == True, otherwise yielding list of ready results.
46+
447

548

649
## Usage
@@ -11,9 +54,11 @@
1154

1255
Play with `python tests/loadtest.py -h` to understand what you want to use.
1356

14-
#### Usage examples (more in `tests/` and `examples/` dirs):
57+
Usage examples (more in `tests/` and `examples/` dirs):
1558

1659
```python
60+
61+
1762
async def worker(n): # dummy worker
1863
await aio.sleep(1 / n)
1964
return n
@@ -83,7 +128,47 @@ async def itermap_usage(todo=range(1,11)):
83128

84129

85130
async def callbacks_usage():
86-
pass # TODO
131+
132+
async def wrk(n): # custom dummy worker
133+
await aio.sleep(1 / n)
134+
return n
135+
136+
async def cb(res, err, ctx): # callback
137+
if err: # error handling
138+
exc, tb = err
139+
assert tb # the only purpose of this is logging
140+
return exc
141+
142+
pool, n = ctx # context can be anything you like
143+
await aio.sleep(1 / (n-1))
144+
return res + n
145+
146+
todo = range(5)
147+
futures = []
148+
149+
async with AioPool(size=2) as pool:
150+
for i in todo:
151+
fut = await pool.spawn_n(wrk(i), cb, (pool, i))
152+
futures.append(fut)
153+
154+
results = []
155+
for fut in futures:
156+
# there's a helper with this code:
157+
# from asyncio_pool import result_noraise
158+
# results.append(result_noraise(fut))
159+
try:
160+
results.append(fut.result())
161+
except Exception as e:
162+
results.append(e)
163+
164+
# First error happens for n == 0 in wrk, exception of it is passed to
165+
# callback, callback returns it to us. Second one happens in callback itself
166+
# and is passed to us by pool.
167+
assert all(isinstance(e, ZeroDivisionError) for e in results[:2])
168+
169+
# All n's in `todo` are passed through `wrk` and `cb` (cb adds wrk result
170+
# and # number, passed inside context), except for n == 0 and n == 1.
171+
assert sum(results[2:]) == 2 * (sum(todo) - 0 - 1)
87172

88173

89174
async def exec_usage(todo=range(1,11)):
@@ -174,17 +259,6 @@ async def details(todo=range(1,11)):
174259

175260
assert all(f.done() for f in itertools.chain(f1,f2)) # this is guaranteed
176261
assert 2 * sum(todo) == sum(f.result() for f in itertools.chain(f1,f2))
177-
```
178262

179263

180-
## TODO:
181-
182-
- [ ] callbacks
183-
- [x] setup
184-
- [x] cancelled, timeouts
185-
- [x] tests
186-
- [x] loadtests
187-
- [x] usage
188-
- [ ] docs
189-
- [ ] examples
190-
- [ ] readme
264+
```

asyncio_pool/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
class AioPool(BaseAioPool): pass
1212

1313
else:
14-
from .mx_asyncgen import MxAsyncGenPool
14+
from .mx_asyncgen import MxAsyncGenPool, iterwait
1515

1616
class AioPool(MxAsyncGenPool, BaseAioPool): pass

asyncio_pool/base_pool.py

Lines changed: 126 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,36 @@
22
'''Pool of asyncio coroutines with familiar interface, python3.5+ friendly'''
33

44
import traceback
5-
import collections
65
import asyncio as aio
76
from .utils import result_noraise
87

98

109
class BaseAioPool(object):
11-
''' BaseAioPool implements features, supposed to work in all supported
12-
python versions. Other features supposed to be implemented as mixins.'''
10+
# python3.5 friendly
1311

1412
def __init__(self, size=1024, *, loop=None):
13+
'''Pool of asyncio coroutines with familiar interface.
14+
15+
Pool makes sure _no more_ and _no less_ (if possible) than `size`
16+
spawned coroutines are active at the same time. _spawned_ means created
17+
and scheduled with one of the pool interface methods, _active_ means
18+
coroutine function started executing it's code, as opposed to
19+
_waiting_ -- which waits for pool space without entering coroutine
20+
function.
21+
22+
Support asynchronous context management protocol (`aenter`, `aexit`).
23+
24+
The main idea behind spwaning methods is -- they return newly created
25+
futures, not "native" ones, returned by `pool.create_task` or used for
26+
`await`. Read more about this in readme and docstrings below.
27+
'''
1528
self.loop = loop or aio.get_event_loop()
1629

1730
self.size = size
1831
self._executed = 0
1932
self._joined = set()
2033
self._waiting = {} # future -> task
21-
self._spawned = {} # future -> task
34+
self._active = {} # future -> task
2235
self.semaphore = aio.Semaphore(value=self.size, loop=self.loop)
2336

2437
async def __aenter__(self):
@@ -29,17 +42,23 @@ async def __aexit__(self, ext_type, exc, tb):
2942

3043
@property
3144
def n_active(self):
45+
'''Counts active coroutines'''
3246
return self.size - self.semaphore._value
3347

3448
@property
3549
def is_empty(self):
50+
'''Returns `True` if no coroutines are active or waiting.'''
3651
return 0 == len(self._waiting) == self.n_active
3752

3853
@property
3954
def is_full(self):
55+
'''Returns `True` if `size` coroutines are already active.'''
4056
return self.size <= len(self._waiting) + self.n_active
4157

4258
async def join(self):
59+
'''Waits (blocks) for all spawned coroutines to finish, both active and
60+
waiting. *Do not `join` inside spawned coroutine*.'''
61+
4362
if self.is_empty:
4463
return True
4564

@@ -58,19 +77,48 @@ def _release_joined(self):
5877
if not fut.done():
5978
fut.set_result(True)
6079

80+
def _build_callback(self, cb, res, err=None, ctx=None):
81+
# not sure if this is a safe code( in case any error:
82+
# return cb(res, err, ctx), None
83+
84+
bad_cb = RuntimeError('cb should accept at least one argument')
85+
to_pass = (res, err, ctx)
86+
87+
nargs = cb.__code__.co_argcount
88+
if nargs == 0:
89+
return None, bad_cb
90+
91+
# trusting user here, better ideas?
92+
if cb.__code__.co_varnames[0] in ('self', 'cls'):
93+
nargs -= 1 # class/instance method, skip first arg
94+
95+
if nargs == 0:
96+
return None, bad_cb
97+
98+
try:
99+
return cb(*to_pass[:nargs]), None
100+
except Exception as e:
101+
return None, e
102+
61103
async def _wrap(self, coro, future, cb=None, ctx=None):
62104
res, exc, tb = None, None, None
63105
try:
64106
res = await coro
65107
except Exception as _exc:
66108
exc = _exc
67-
tb = traceback.format_exc() # TODO tb object instead of text
109+
tb = traceback.format_exc()
68110
finally:
69111
self._executed += 1
70112

71-
if cb:
113+
while cb:
72114
err = None if exc is None else (exc, tb)
73-
wrapped = self._wrap(cb(res, err, ctx), future)
115+
116+
_cb, _cb_err = self._build_callback(cb, res, err, ctx)
117+
if _cb_err is not None:
118+
exc = _cb_err # pass to future
119+
break
120+
121+
wrapped = self._wrap(_cb, future)
74122
self.loop.create_task(wrapped)
75123
return
76124

@@ -82,7 +130,7 @@ async def _wrap(self, coro, future, cb=None, ctx=None):
82130
else:
83131
future.set_result(res)
84132

85-
del self._spawned[future]
133+
del self._active[future]
86134
if self.is_empty:
87135
self._release_joined()
88136

@@ -103,34 +151,81 @@ async def _spawn(self, future, coro, cb=None, ctx=None):
103151
else: # all good, can spawn now
104152
wrapped = self._wrap(coro, future, cb=cb, ctx=ctx)
105153
task = self.loop.create_task(wrapped)
106-
self._spawned[future] = task
154+
self._active[future] = task
107155
return future
108156

157+
async def spawn(self, coro, cb=None, ctx=None):
158+
'''Waits for pool space and creates task for given `coro` coroutine,
159+
returns a future for it's result.
160+
161+
If callback `cb` coroutine function (not coroutine itself!) is passed,
162+
`coro` result won't be assigned to created future, instead, `cb` will
163+
be executed with it as a first positional argument. Callback function
164+
should accept 1,2 or 3 positional arguments. Full callback sigature is
165+
`cb(res, err, ctx)`. It makes no sense to create a callback without
166+
`coro` result, so first positional argument is mandatory.
167+
168+
Second positional argument of callback will be error, which `is None`
169+
if coroutine did not crash and wasn't cancelled. In case any exception
170+
was raised during `coro` execution, error will be a tuple containing
171+
(`exc` exception object, `tb` traceback string). if you wish to ignore
172+
errors, you can pass callback without seconds and third positional
173+
arguments.
174+
175+
If context `ctx` is passed to `spawn`, it will be re-sent to callback
176+
as third argument. If you don't plan to use any context, you can create
177+
callback with positional arguments only for result and error.
178+
'''
179+
future = self.loop.create_future()
180+
self._waiting[future] = self.loop.create_future() # as a placeholder
181+
return await self._spawn(future, coro, cb=cb, ctx=ctx)
182+
109183
async def spawn_n(self, coro, cb=None, ctx=None):
184+
'''Creates waiting task for given `coro` regardless of pool space. If
185+
pool is not full, this task will be executed very soon. Main difference
186+
is that `spawn_n` does not block and returns future very quickly.
187+
188+
Read more about callbacks in `spawn` docstring.
189+
'''
110190
future = self.loop.create_future()
111191
task = self.loop.create_task(self._spawn(future, coro, cb=cb, ctx=ctx))
112192
self._waiting[future] = task
113193
return future
114194

115-
async def spawn(self, coro, cb=None, ctx=None):
116-
future = self.loop.create_future()
117-
self._waiting[future] = self.loop.create_future() # TODO omg ???
118-
return await self._spawn(future, coro, cb=cb, ctx=ctx)
119-
120195
async def exec(self, coro, cb=None, ctx=None):
121-
return await (await self.spawn(coro, cb=cb, ctx=ctx))
196+
'''Waits for pool space, then waits for `coro` (and it's callback if
197+
passed) to finish, returning result of `coro` or callback (if passed),
198+
or raising error if smth crashed in process or was cancelled.
199+
200+
Read more about callbacks in `spawn` docstring.
201+
'''
202+
return await (await self.spawn(coro, cb, ctx))
122203

123-
async def map_n(self, fn, iterable):
204+
async def map_n(self, fn, iterable, cb=None, ctx=None):
205+
'''Creates coroutine with `fn` function for each item in `iterable`,
206+
spawns each of them with `spawn_n`, returning futures.
207+
208+
Read more about callbacks in `spawn` docstring.
209+
'''
124210
futures = []
125211
for it in iterable:
126-
fut = await self.spawn_n(fn(it))
212+
fut = await self.spawn_n(fn(it), cb, ctx)
127213
futures.append(fut)
128214
return futures
129215

130-
async def map(self, fn, iterable, exc_as_result=True):
216+
async def map(self, fn, iterable, cb=None, ctx=None, *, exc_as_result=True):
217+
'''Spawns coroutines, created with `fn` function for each item in
218+
`iterable`, waits for all of them to finish, crash or be cancelled,
219+
returning resuls.
220+
221+
If coroutine or callback crashes or is cancelled, with `exc_as_result`
222+
== True exceptions object will be returned, with == False -- just None.
223+
224+
Read more about callbacks in `spawn` docstring.
225+
'''
131226
futures = []
132227
for it in iterable:
133-
fut = await self.spawn(fn(it))
228+
fut = await self.spawn(fn(it), cb, ctx)
134229
futures.append(fut)
135230

136231
await aio.wait(futures)
@@ -147,12 +242,12 @@ def _cancel(self, *futures):
147242

148243
if not len(futures): # meaning cancel all
149244
tasks.extend(self._waiting.values())
150-
tasks.extend(self._spawned.values())
245+
tasks.extend(self._active.values())
151246
_futures.extend(self._waiting.keys())
152-
_futures.extend(self._spawned.keys())
247+
_futures.extend(self._active.keys())
153248
else:
154249
for fut in futures:
155-
task = self._spawned.get(fut, self._waiting.get(fut))
250+
task = self._active.get(fut, self._waiting.get(fut))
156251
if task:
157252
tasks.append(task)
158253
_futures.append(fut)
@@ -161,6 +256,15 @@ def _cancel(self, *futures):
161256
return cancelled, _futures
162257

163258
async def cancel(self, *futures, exc_as_result=True):
259+
'''Cancels spawned or waiting tasks, found by their `futures`. If no
260+
`futures` are passed -- cancels all spwaned and waiting tasks.
261+
262+
Cancelling futures, returned by pool methods, usually won't help you
263+
to cancel executing tasks, so you have to use this method.
264+
265+
Returns tuple of (`cancelled` count of cancelled tasks, `results`
266+
collected from futures of cancelled tasks).
267+
'''
164268
cancelled, _futures = self._cancel(*futures)
165269
await aio.sleep(0) # let them actually cancel
166270
# need to collect them anyway, to supress warnings

0 commit comments

Comments
 (0)