Skip to content

Commit 8e6b438

Browse files
committed
new feature: task cancellation; license and setup.py added
1 parent c22ee2e commit 8e6b438

File tree

11 files changed

+330
-53
lines changed

11 files changed

+330
-53
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
.pypyenv*
99
.pytest_*
1010
.mypy_cache
11+
dist
12+
build
13+
*.egg-info
1114
__pycache__
1215
local_settings.py
1316

LICENSE.txt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2018 Oleg Mihaylov (gistart)
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy of
4+
this software and associated documentation files (the "Software"), to deal in
5+
the Software without restriction, including without limitation the rights to
6+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
7+
of the Software, and to permit persons to whom the Software is furnished to do
8+
so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all
11+
copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19+
SOFTWARE.

README.md

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,22 @@ async def spawn_usage(todo=range(1,4)):
4141
futures.append(fut)
4242
# At this point some of the workers already started.
4343

44-
# Context manager calls `join()` at exit, so this will finish when all
44+
# Context manager calls `join` at exit, so this will finish when all
4545
# workers return, crash or cancelled.
4646

4747
assert sum(todo) == sum(fut.result() for fut in futures) # all done
4848

4949

5050
async def map_usage(todo=range(100)):
5151
pool = AioPool(size=10)
52-
# Joins internally, collects results from all spawned workers,
52+
# Waits and collects results from all spawned workers,
5353
# returns them in same order as `todo`, if worker crashes or cancelled:
5454
# returns exception object as a result.
5555
# Basically, it wraps `spawn_usage` code into one call.
5656
results = await pool.map(worker, todo)
5757

58+
# await pool.join() # is not needed here, bcs no other tasks were spawned
59+
5860
assert isinstance(results[0], ZeroDivisionError) \
5961
and sum(results[1:]) == sum(todo)
6062

@@ -77,6 +79,61 @@ async def callbacks_usage():
7779
pass # TODO
7880

7981

82+
async def exec_usage(todo=range(1,11)):
83+
async with AioPool(size=4) as pool:
84+
futures = await pool.map_n(worker, todo)
85+
86+
# While other workers are waiting or active, you can "synchronously"
87+
# execute one task. It does not interrupt others, just waits for pool
88+
# space, then waits for task to finish and then returns it's result.
89+
important_res = await pool.exec(worker(2))
90+
assert 2 == important_res
91+
92+
# You can continue working as usual:
93+
moar = await pool.spawn(worker(10))
94+
95+
assert sum(todo) == sum(f.result() for f in futures)
96+
97+
98+
async def cancel_usage():
99+
100+
async def wrk(*arg, **kw):
101+
await aio.sleep(0.5)
102+
return 1
103+
104+
pool = AioPool(size=2)
105+
106+
f_quick = await pool.spawn_n(aio.sleep(0.1))
107+
f12 = await pool.spawn(wrk()), await pool.spawn_n(wrk())
108+
f35 = await pool.map_n(wrk, range(3))
109+
110+
# At this point, if you cancel futures, returned by pool methods,
111+
# you just won't be able to retrieve spawned task results, task
112+
# themselves will continue working. Don't do this:
113+
# f_quick.cancel()
114+
# use `pool.cancel` instead:
115+
116+
# cancel some
117+
await aio.sleep(0.1)
118+
cancelled, results = await pool.cancel(f12[0], f35[2]) # running and waiting
119+
assert 2 == cancelled # none of them had time to finish
120+
assert 2 == len(results) and \
121+
all(isinstance(res, aio.CancelledError) for res in results)
122+
123+
# cancel all others
124+
await aio.sleep(0.1)
125+
126+
# not interrupted and finished successfully
127+
assert f_quick.done() and f_quick.result() is None
128+
129+
cancelled, results = await pool.cancel() # all
130+
assert 3 == cancelled
131+
assert len(results) == 3 and \
132+
all(isinstance(res, aio.CancelledError) for res in results)
133+
134+
assert await pool.join() # joins successfully
135+
136+
80137
async def details(todo=range(1,11)):
81138
pool = AioPool(size=5)
82139

@@ -109,15 +166,15 @@ async def details(todo=range(1,11)):
109166
await pool.join()
110167

111168
assert all(f.done() for f in itertools.chain(f1,f2)) # this is guaranteed
112-
169+
assert 2 * sum(todo) == sum(f.result() for f in itertools.chain(f1,f2))
113170
```
114171

115172

116173
## TODO:
117174

118175
- [ ] callbacks
119-
- [ ] setup
120-
- [ ] cancelled, timeouts
176+
- [x] setup
177+
- [x] cancelled, timeouts
121178
- [x] tests
122179
- [ ] loadtests
123180
- [x] usage

asyncio_pool/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# coding: utf8
22

33
import sys
4+
5+
from .utils import result_noraise
46
from .base_pool import BaseAioPool
57

68

@@ -9,6 +11,6 @@
911
class AioPool(BaseAioPool): pass
1012

1113
else:
12-
from .mx_asynciter import MxAsyncIterPool
14+
from .mx_asyncgen import MxAsyncGenPool
1315

14-
class AioPool(MxAsyncIterPool, BaseAioPool): pass
16+
class AioPool(MxAsyncGenPool, BaseAioPool): pass

asyncio_pool/base_pool.py

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@
44
import traceback
55
import collections
66
import asyncio as aio
7-
from .utils import _get_future_result
7+
from .utils import result_noraise
88

99

1010
class BaseAioPool(object):
11+
''' BaseAioPool implements features, supposed to work in all supported
12+
python versions. Other features supposed to be implemented as mixins.'''
1113

1214
def __init__(self, size=1024, *, loop=None):
1315
self.loop = loop or aio.get_event_loop()
1416

1517
self.size = size
1618
self._executed = 0
17-
self._joined = collections.deque()
18-
self._waiting = collections.deque()
19+
self._joined = set()
20+
self._waiting = {} # future -> task
21+
self._spawned = {} # future -> task
1922
self.semaphore = aio.Semaphore(value=self.size, loop=self.loop)
2023

2124
async def __aenter__(self):
@@ -41,7 +44,7 @@ async def join(self):
4144
return True
4245

4346
fut = self.loop.create_future()
44-
self._joined.append(fut)
47+
self._joined.add(fut)
4548
try:
4649
return await fut
4750
finally:
@@ -72,33 +75,46 @@ async def _wrap(self, coro, future, cb=None, ctx=None):
7275
return
7376

7477
self.semaphore.release()
75-
if not exc:
76-
future.set_result(res)
77-
else:
78-
future.set_exception(exc)
7978

79+
if not future.done():
80+
if exc:
81+
future.set_exception(exc)
82+
else:
83+
future.set_result(res)
84+
85+
del self._spawned[future]
8086
if self.is_empty:
8187
self._release_joined()
8288

8389
async def _spawn(self, future, coro, cb=None, ctx=None):
90+
acq_error = False
8491
try:
8592
await self.semaphore.acquire()
8693
except Exception as e:
87-
future.set_exception(e)
88-
self._waiting.remove(future)
89-
wrapped = self._wrap(coro, future, cb=cb, ctx=ctx)
90-
self.loop.create_task(wrapped)
94+
acq_error = True
95+
if not future.done():
96+
future.set_exception(e)
97+
finally:
98+
del self._waiting[future]
99+
100+
if future.done():
101+
if not acq_error and future.cancelled(): # outside action
102+
self.semaphore.release()
103+
else: # all good, can spawn now
104+
wrapped = self._wrap(coro, future, cb=cb, ctx=ctx)
105+
task = self.loop.create_task(wrapped)
106+
self._spawned[future] = task
91107
return future
92108

93109
async def spawn_n(self, coro, cb=None, ctx=None):
94110
future = self.loop.create_future()
95-
self._waiting.append(future)
96-
self.loop.create_task(self._spawn(future, coro, cb=cb, ctx=ctx))
111+
task = self.loop.create_task(self._spawn(future, coro, cb=cb, ctx=ctx))
112+
self._waiting[future] = task
97113
return future
98114

99115
async def spawn(self, coro, cb=None, ctx=None):
100116
future = self.loop.create_future()
101-
self._waiting.append(future)
117+
self._waiting[future] = self.loop.create_future() # TODO omg ???
102118
return await self._spawn(future, coro, cb=cb, ctx=ctx)
103119

104120
async def exec(self, coro, cb=None, ctx=None):
@@ -113,16 +129,36 @@ async def map_n(self, fn, iterable):
113129

114130
async def map(self, fn, iterable, exc_as_result=True):
115131
futures = await self.map_n(fn, iterable)
116-
await self.join()
117-
118-
results = []
119-
for fut in futures:
120-
res = _get_future_result(fut, exc_as_result)
121-
results.append(res)
122-
return results
132+
await aio.wait(futures)
133+
return [result_noraise(fut, exc_as_result) for fut in futures]
123134

124135
async def iterwait(self, *arg, **kw): # TODO there's a way to support 3.5?
125136
raise NotImplementedError('python3.6+ required')
126137

127138
async def itermap(self, *arg, **kw): # TODO there's a way to support 3.5?
128139
raise NotImplementedError('python3.6+ required')
140+
141+
def _cancel(self, *futures):
142+
tasks, _futures = [], []
143+
144+
if not len(futures): # meaning cancel all
145+
tasks.extend(self._waiting.values())
146+
tasks.extend(self._spawned.values())
147+
_futures.extend(self._waiting.keys())
148+
_futures.extend(self._spawned.keys())
149+
else:
150+
for fut in futures:
151+
task = self._spawned.get(fut, self._waiting.get(fut))
152+
if task:
153+
tasks.append(task)
154+
_futures.append(fut)
155+
156+
cancelled = sum([1 for fut in tasks if fut.cancel()])
157+
return cancelled, _futures
158+
159+
async def cancel(self, *futures, exc_as_result=True):
160+
cancelled, _futures = self._cancel(*futures)
161+
await aio.sleep(0) # let them actually cancel
162+
# need to collect them anyway, to supress warnings
163+
results = [result_noraise(fut, exc_as_result) for fut in _futures]
164+
return cancelled, results

asyncio_pool/mx_asynciter.py renamed to asyncio_pool/mx_asyncgen.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
# coding: utf8
2-
'''Mixin for BaseAioPool with async iterator features python3.6+'''
2+
'''Mixin for BaseAioPool with async generator features, python3.6+'''
33

44
import asyncio as aio
5-
from .utils import _get_future_result
5+
from .utils import result_noraise
66

77

8-
class MxAsyncIterPool(object):
8+
class MxAsyncGenPool(object):
9+
''' Asynchronous generator wrapper for asyncio.wait.'''
910

1011
async def iterwait(self, futures, *, flat=True, exc_as_result=True,
1112
timeout=None, yield_when=aio.ALL_COMPLETED):
@@ -16,9 +17,9 @@ async def iterwait(self, futures, *, flat=True, exc_as_result=True,
1617
timeout=timeout, return_when=yield_when)
1718
if flat:
1819
for fut in done:
19-
yield _get_future_result(fut, exc_as_result)
20+
yield result_noraise(fut, exc_as_result)
2021
else:
21-
yield [_get_future_result(f, exc_as_result) for f in done]
22+
yield [result_noraise(f, exc_as_result) for f in done]
2223

2324
async def itermap(self, fn, iterable, *, flat=True, exc_as_result=True,
2425
timeout=None, yield_when=aio.ALL_COMPLETED):

asyncio_pool/utils.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# coding: utf8
22

3+
import asyncio as aio
34

4-
def _get_future_result(future, exc_as_result=True):
5-
if future.exception():
6-
return future.exception() if exc_as_result else None
7-
else:
5+
6+
def result_noraise(future, exc_as_result=True):
7+
try:
88
return future.result()
9+
except Exception as exc:
10+
return exc if exc_as_result else None

0 commit comments

Comments
 (0)