Skip to content

Commit e4b0e7f

Browse files
committed
map uses spawn under the hood, instead of spawn_n; loadtest added
1 parent 8e6b438 commit e4b0e7f

File tree

4 files changed

+113
-6
lines changed

4 files changed

+113
-6
lines changed

README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@
44

55

66
## Usage
7-
(more in `tests/` and `examples/` dirs): # TODO
7+
8+
`spawn` and `map` methods are probably what you should use in 99% of cases. Their overhead is minimal (~3% execution time), and even in the worst cases memory usage is insignificant.
9+
10+
`spawn_n`, `map_n` and `itermap` methods give you more control and flexibility, but they come at the price of higher overhead. They spawn all the tasks you ask for, and most of the tasks wait their turn "in background". If you spawn too many (10**6+ tasks) -- you'll use most of the memory you have in the system, and you'll also lose a lot of time on "concurrency management" of all the tasks spawned.
11+
12+
Play with `python tests/loadtest.py -h` to understand what you want to use.
13+
14+
#### Usage examples (more in `tests/` and `examples/` dirs):
815

916
```python
1017
async def worker(n): # dummy worker
@@ -176,7 +183,7 @@ async def details(todo=range(1,11)):
176183
- [x] setup
177184
- [x] cancelled, timeouts
178185
- [x] tests
179-
- [ ] loadtests
186+
- [x] loadtests
180187
- [x] usage
181188
- [ ] docs
182189
- [ ] examples

asyncio_pool/base_pool.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,11 @@ async def map_n(self, fn, iterable):
128128
return futures
129129

130130
async def map(self, fn, iterable, exc_as_result=True):
    """Run `fn` over every item of `iterable` through the pool and
    return the results in input order.

    Tasks are created with `spawn`, which blocks while the pool is
    full -- so at most `size` coroutine objects exist at any moment
    (unlike `map_n`, which creates them all upfront).

    If `exc_as_result` is True, an exception raised by a task is
    returned as that task's result instead of being raised here.
    """
    futures = []
    for it in iterable:
        fut = await self.spawn(fn(it))
        futures.append(fut)

    # aio.wait() raises ValueError on an empty set of awaitables --
    # guard so mapping an empty iterable returns [] instead of raising.
    if futures:
        await aio.wait(futures)
    return [result_noraise(fut, exc_as_result) for fut in futures]
134138

tests/loadtest.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# coding: utf8
2+
3+
import os
4+
import sys
5+
curr_dir = os.path.dirname(os.path.abspath(__file__))
6+
sys.path.insert(0, os.path.split(curr_dir)[0])
7+
8+
import time
9+
import argparse
10+
import asyncio as aio
11+
from asyncio_pool import AioPool, result_noraise
12+
13+
14+
async def loadtest_spawn(tasks, pool_size, duration):
    """Spawn `tasks` dummy sleep-tasks through `AioPool.spawn`,
    then collect their results after the pool has joined."""
    async with AioPool(size=pool_size) as pool:
        futures = [await pool.spawn(aio.sleep(duration)) for _ in range(tasks)]

    return [result_noraise(fut) for fut in futures]
22+
23+
24+
async def loadtest_spawn_n(tasks, pool_size, duration):
    """Spawn `tasks` dummy sleep-tasks through `AioPool.spawn_n`
    (all created upfront), then collect results after the pool joins."""
    async with AioPool(size=pool_size) as pool:
        futures = [await pool.spawn_n(aio.sleep(duration)) for _ in range(tasks)]

    return [result_noraise(fut) for fut in futures]
32+
33+
34+
async def loadtest_map(tasks, pool_size, duration):
    """Run `tasks` dummy sleep-workers through `AioPool.map`."""
    async def _sleeper(_):
        # dummy worker: just holds a pool slot for `duration` seconds
        await aio.sleep(duration)

    async with AioPool(size=pool_size) as pool:
        return await pool.map(_sleeper, range(tasks))
40+
41+
42+
async def loadtest_itermap(tasks, pool_size, duration):
    """Run `tasks` dummy sleep-workers through `AioPool.itermap`,
    collecting results as they arrive."""
    async def _sleeper(_):
        # dummy worker: just holds a pool slot for `duration` seconds
        await aio.sleep(duration)

    collected = []
    async with AioPool(size=pool_size) as pool:
        async for item in pool.itermap(_sleeper, range(tasks)):
            collected.append(item)

    return collected
52+
53+
54+
def print_stats(args, exec_time):
    """Print measured execution time against the theoretical ideal.

    `args` must carry `tasks`, `pool_size` and `task_duration`
    (the parsed CLI namespace); `exec_time` is the wall-clock time
    the loadtest actually took, in seconds.
    """
    # Perfect scheduling: each "wave" of `pool_size` tasks sleeps in parallel.
    ideal = args.task_duration * (args.tasks / args.pool_size)

    total_overhead = exec_time - ideal
    total_overhead_pct = ((exec_time / ideal) - 1) * 100

    task_overhead = total_overhead / args.tasks
    task_overhead_pct = (((args.task_duration + task_overhead) / args.task_duration) - 1) * 100

    print(f'{ideal:15.5f}s -- ideal result')
    print(f'{exec_time:15.5f}s -- were executing')
    print(f'{total_overhead:15.5f}s -- overhead total')
    print(f'{total_overhead_pct:15.5f}% -- overhead total percent')
    print(f'{task_overhead:15.5f}s -- overhead per task')
    print(f'{task_overhead_pct:15.5f}% -- overhead per task percent')
69+
70+
71+
if __name__ == "__main__":
    # CLI method name -> loadtest coroutine function.
    runners = {
        'spawn': loadtest_spawn,
        'spawn_n': loadtest_spawn_n,
        'map': loadtest_map,
        'itermap': loadtest_itermap,
    }

    parser = argparse.ArgumentParser()
    parser.add_argument('method', choices=runners.keys())
    parser.add_argument('--tasks', '-t', type=int, default=10**5)
    parser.add_argument('--task-duration', '-d', type=float, default=0.2)
    parser.add_argument('--pool-size', '-p', type=int, default=10**3)
    args = parser.parse_args()

    print('>>> Running %d tasks in pool of size=%s, each task takes %.3f sec.' %
        (args.tasks, args.pool_size, args.task_duration))
    print('>>> This will run more than %.5f seconds' %
        (args.task_duration * (args.tasks / args.pool_size)))

    # Time only the loadtest itself, not argument parsing.
    ts_start = time.perf_counter()
    coro = runners[args.method](args.tasks, args.pool_size, args.task_duration)
    aio.get_event_loop().run_until_complete(coro)
    exec_time = time.perf_counter() - ts_start
    print_stats(args, exec_time)

tests/test_base.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# coding: utf8
22

3+
import sys
34
import pytest
45
import warnings
56
import asyncio as aio
@@ -70,17 +71,17 @@ async def outer(n, pool):
7071
joined = [loop.create_task(outer(j, pool)) for j in to_release]
7172
await pool.join()
7273

73-
assert len(released) < len(to_release)
74+
assert len(released) <= len(to_release)
7475
await aio.wait(joined)
7576
assert len(todo) == len(done) and len(released) == len(to_release)
7677

7778

78-
@pytest.mark.filterwarnings('ignore:coroutines') # doesn't work??
79+
# @pytest.mark.filterwarnings('ignore:coroutines') # doesn't work??
80+
@pytest.mark.skipif((3,6) <= sys.version_info < (3,7) , reason='no 3.6')
7981
@pytest.mark.asyncio
8082
async def test_cancel():
8183
# seems to break other tests in different files in py36 only
8284
# TODO investigate
83-
return True
8485

8586
async def wrk(*arg, **kw):
8687
await aio.sleep(0.5)

0 commit comments

Comments
 (0)