
Commit a0e1bf6

Experimental helper for doing bulk requests in parallel
1 parent 3400179 commit a0e1bf6

File tree

1 file changed: 66 additions, 0 deletions


elasticsearch/helpers/parallel.py

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
from multiprocessing.dummy import Pool
from queue import Empty, Queue

from threading import Event

from . import streaming_bulk

def consume(queue, done):
    """
    Create an iterator on top of a Queue.
    """
    while True:
        try:
            # use a short timeout so we can periodically check the done flag
            yield queue.get(True, .01)
        except Empty:
            if done.is_set():
                break

def wrapped_bulk(client, input, output, done, **kwargs):
    """
    Wrap a call to streaming_bulk by feeding it data from a queue and writing
    the outputs to another queue.
    """
    try:
        for result in streaming_bulk(client, consume(input, done), **kwargs):
            output.put(result)
    except:
        # signal the other threads to stop before re-raising
        done.set()
        raise

def feed_data(actions, input, done):
    """
    Feed data from an iterator into a queue.
    """
    for a in actions:
        input.put(a, True)

        # error short-circuit
        if done.is_set():
            break
    done.set()


def parallel_bulk(client, actions, thread_count=5, **kwargs):
    """
    Parallel version of the bulk helper. It runs a thread pool with one thread
    for the producer and ``thread_count`` threads for consumers running
    ``streaming_bulk``.
    """
    done = Event()
    input, output = Queue(), Queue()
    pool = Pool(thread_count + 1)

    results = [
        pool.apply_async(wrapped_bulk, (client, input, output, done), kwargs)
        for _ in range(thread_count)]
    pool.apply_async(feed_data, (actions, input, done))

    # drain the output queue until the producer is done and all consumers
    # have finished
    while True:
        try:
            yield output.get(True, .01)
        except Empty:
            if done.is_set() and all(r.ready() for r in results):
                break

    pool.close()
    pool.join()
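
For context, a minimal usage sketch of the new helper might look like the following. The index name, document shape, and client configuration are hypothetical, and it assumes the results yielded are the ``(ok, result)`` tuples produced by ``streaming_bulk``:

# Hypothetical usage sketch (index name, document shape, and client
# settings are made up for illustration).
from elasticsearch import Elasticsearch
from elasticsearch.helpers.parallel import parallel_bulk

client = Elasticsearch()

# any iterable of bulk actions works; a generator keeps memory usage flat
actions = (
    {"_index": "test-index", "_type": "doc", "_id": i, "value": i}
    for i in range(10000)
)

# parallel_bulk is a generator, so it must be iterated to drive the
# requests; each item is whatever streaming_bulk yields
for ok, result in parallel_bulk(client, actions, thread_count=4,
                                chunk_size=500):
    if not ok:
        print("failed:", result)

Note that ``multiprocessing.dummy.Pool`` is a thread pool exposing the ``multiprocessing`` API, so the client instance is shared across threads rather than pickled into subprocesses.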
