
Commit ccef253

Merge branch 'parallel'
2 parents: e3d25e8 + 2ba03d2

2 files changed: 89 additions, 55 deletions

elasticsearch/helpers/__init__.py

Lines changed: 65 additions & 55 deletions
@@ -40,6 +40,10 @@ def expand_action(data):
     return action, data.get('_source', data)
 
 def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
+    """
+    Split actions into chunks by number or size, serialize them into strings in
+    the process.
+    """
     bulk_actions = []
     size, action_count = 0, 0
     for action, data in actions:
@@ -65,6 +69,64 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
     if bulk_actions:
         yield bulk_actions
 
+def _process_bulk_chunk(client, bulk_actions, raise_on_exception=True, raise_on_error=True, **kwargs):
+    """
+    Send a bulk request to elasticsearch and process the output.
+    """
+    # if raise on error is set, we need to collect errors per chunk before raising them
+    errors = []
+
+    try:
+        # send the actual request
+        resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
+    except TransportError as e:
+        # default behavior - just propagate exception
+        if raise_on_exception:
+            raise e
+
+        # if we are not propagating, mark all actions in current chunk as failed
+        err_message = str(e)
+        exc_errors = []
+
+        # deserialize the data back, this is expensive but only run on
+        # errors if raise_on_exception is false, so shouldn't be a real
+        # issue
+        bulk_data = iter(map(client.transport.serializer.loads, bulk_actions))
+        while True:
+            try:
+                # collect all the information about failed actions
+                action = next(bulk_data)
+                op_type, action = action.popitem()
+                info = {"error": err_message, "status": e.status_code, "exception": e}
+                if op_type != 'delete':
+                    info['data'] = next(bulk_data)
+                info.update(action)
+                exc_errors.append({op_type: info})
+            except StopIteration:
+                break
+
+        # emulate standard behavior for failed actions
+        if raise_on_error:
+            raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
+        else:
+            for err in exc_errors:
+                yield False, err
+        return
+
+    # go through request-response pairs and detect failures
+    for op_type, item in map(methodcaller('popitem'), resp['items']):
+        ok = 200 <= item.get('status', 500) < 300
+        if not ok and raise_on_error:
+            errors.append({op_type: item})
+
+        if ok or not errors:
+            # if we are not just recording all errors to be able to raise
+            # them all at once, yield items individually
+            yield ok, {op_type: item}
+
+    if errors:
+        raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
+
 def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 * 1024,
                    raise_on_error=True, expand_action_callback=expand_action,
                    raise_on_exception=True, **kwargs):
@@ -122,63 +184,11 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 *
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
     """
-    serializer = client.transport.serializer
     actions = map(expand_action_callback, actions)
 
-    # if raise on error is set, we need to collect errors per chunk before raising them
-    errors = []
-
-    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
-        try:
-            # send the actual request
-            resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
-        except TransportError as e:
-            # default behavior - just propagate exception
-            if raise_on_exception:
-                raise e
-
-            # if we are not propagating, mark all actions in current chunk as failed
-            err_message = str(e)
-            exc_errors = []
-
-            # deserialize the data back, this is expensive but only run on
-            # errors if raise_on_exception is false, so shouldn't be a real
-            # issue
-            bulk_data = iter(map(serializer.loads, bulk_actions))
-            while True:
-                try:
-                    # collect all the information about failed actions
-                    action = next(bulk_data)
-                    op_type, action = action.popitem()
-                    info = {"error": err_message, "status": e.status_code, "exception": e}
-                    if op_type != 'delete':
-                        info['data'] = next(bulk_data)
-                    info.update(action)
-                    exc_errors.append({op_type: info})
-                except StopIteration:
-                    break
-
-            # emulate standard behavior for failed actions
-            if raise_on_error:
-                raise BulkIndexError('%i document(s) failed to index.' % len(exc_errors), exc_errors)
-            else:
-                for err in exc_errors:
-                    yield False, err
-            continue
-
-        # go through request-response pairs and detect failures
-        for op_type, item in map(methodcaller('popitem'), resp['items']):
-            ok = 200 <= item.get('status', 500) < 300
-            if not ok and raise_on_error:
-                errors.append({op_type: item})
-
-            if not errors:
-                # if we are not just recording all errors to be able to raise
-                # them all at once, yield items individually
-                yield ok, {op_type: item}
-
-        if errors:
-            raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
+    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
+        for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
+            yield result
 
 def bulk(client, actions, stats_only=False, **kwargs):
     """

elasticsearch/helpers/parallel.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+from multiprocessing.dummy import Pool
+
+from . import _process_bulk_chunk, _chunk_actions, expand_action
+
+
+def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
+                  max_chunk_bytes=100 * 1014 * 1024,
+                  expand_action_callback=expand_action, **kwargs):
+    """
+    Parallel version of the bulk helper.
+    """
+    actions = map(expand_action_callback, actions)
+
+    pool = Pool(thread_count)
+
+    for result in pool.imap(
+        lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
+        _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
+    ):
+        for item in result:
+            yield item
+
+    pool.close()
+    pool.join()
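
A usage sketch for the new helper, assuming it is imported from elasticsearch.helpers.parallel as added here (whether it is also re-exported from elasticsearch.helpers is not shown in this commit). Note that parallel_bulk is itself a generator: its results must be iterated for any requests to be sent.

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers.parallel import parallel_bulk

    client = Elasticsearch()  # illustrative client setup

    docs = ({'_index': 'test-index', '_type': 'doc', '_id': i, 'value': i}
            for i in range(10000))

    # chunks are built by _chunk_actions and sent concurrently by a thread pool
    # (multiprocessing.dummy.Pool); pool.imap yields results in chunk order
    for ok, result in parallel_bulk(client, docs, thread_count=4, chunk_size=500):
        if not ok:
            print('document failed: %r' % (result,))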
