Commit 2ba03d2

greatly simplify parallel_bulk by using Pool.imap
1 parent 2208387 commit 2ba03d2
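
This commit replaces a hand-rolled producer/consumer setup (queues, an Event flag, and apply_async workers) with multiprocessing.dummy.Pool.imap. For readers unfamiliar with the trick: multiprocessing.dummy exposes the multiprocessing Pool API backed by threads rather than processes, and Pool.imap lazily maps a function over an iterable, yielding results in input order as workers finish. A minimal sketch of the pattern, assuming nothing from the library (process_chunk and chunks below are illustrative stand-ins):

# Sketch of the Pool.imap pattern the commit adopts; not library code.
from multiprocessing.dummy import Pool  # thread pool, not processes

def process_chunk(chunk):
    # stand-in for the real per-chunk work (_process_bulk_chunk in the diff)
    return [item * 2 for item in chunk]

chunks = iter([[1, 2], [3, 4], [5, 6]])

pool = Pool(4)  # four worker threads
# imap consumes `chunks` lazily and yields each chunk's result in order
for result in pool.imap(process_chunk, chunks):
    for item in result:
        print(item)
pool.close()
pool.join()

Because imap handles the fan-out, ordering, and back-pressure itself, the explicit input/output queues, the done Event, and the dedicated producer thread in the old code all become unnecessary.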

File tree

1 file changed: +13, -55 lines

elasticsearch/helpers/parallel.py

Lines changed: 13 additions & 55 deletions
@@ -1,66 +1,24 @@
 from multiprocessing.dummy import Pool
-from queue import Empty, Queue
 
-from threading import Event
+from . import _process_bulk_chunk, _chunk_actions, expand_action
 
-from . import streaming_bulk
 
-def consume(queue, done):
+def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
+        max_chunk_bytes=100 * 1014 * 1024,
+        expand_action_callback=expand_action, **kwargs):
     """
-    Create an iterator on top of a Queue.
+    Parallel version of the bulk helper.
     """
-    while True:
-        try:
-            yield queue.get(True, .01)
-        except Empty:
-            if done.is_set():
-                break
+    actions = map(expand_action_callback, actions)
 
-def wrapped_bulk(client, input, output, done, **kwargs):
-    """
-    Wrap a call to streaming_bulk by feeding it data frm a queue and writing
-    the outputs to another queue.
-    """
-    try:
-        for result in streaming_bulk(client, consume(input, done), **kwargs):
-            output.put(result)
-    except:
-        done.set()
-        raise
-
-def feed_data(actions, input, done):
-    """
-    Feed data from an iterator into a queue.
-    """
-    for a in actions:
-        input.put(a, True)
-
-        # error short-circuit
-        if done.is_set():
-            break
-    done.set()
-
-
-def parallel_bulk(client, actions, thread_count=5, **kwargs):
-    """
-    Paralel version of the bulk helper. It runs a thread pool with a thread for
-    a producer and ``thread_count`` threads for.
-    """
-    done = Event()
-    input, output = Queue(), Queue()
-    pool = Pool(thread_count + 1)
-
-    results = [
-        pool.apply_async(wrapped_bulk, (client, input, output, done), kwargs)
-        for _ in range(thread_count)]
-    pool.apply_async(feed_data, (actions, input, done))
+    pool = Pool(thread_count)
 
-    while True:
-        try:
-            yield output.get(True, .01)
-        except Empty:
-            if done.is_set() and all(r.ready() for r in results):
-                break
+    for result in pool.imap(
+        lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
+        _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
+    ):
+        for item in result:
+            yield item
 
     pool.close()
     pool.join()
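
After this change, parallel_bulk expands the actions, chunks them, and fans the chunks out to a thread pool, yielding per-item results as each chunk completes. A hedged usage sketch follows; the import path, index name, generated documents, and the (ok, info) result shape are assumptions carried over from the other bulk helpers, not confirmed by this diff:

# Hypothetical usage of the simplified helper; names are illustrative.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch()

# a lazy stream of actions; expand_action normalizes each into (meta, data)
actions = ({"_index": "test-index", "_type": "doc", "_source": {"n": i}}
           for i in range(10000))

# parallel_bulk is itself a generator: no requests are sent, and no pool is
# created, until the loop starts consuming it
for ok, info in parallel_bulk(client, actions, thread_count=4, chunk_size=500):
    if not ok:
        print("failed:", info)

Note that consuming the generator is required; merely calling parallel_bulk performs no work.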
