Skip to content

Commit 5d4640d

Browse files
committed
Move parallel_bulk helper into helpers/__init__
1 parent a7d66c8 commit 5d4640d

File tree

2 files changed

+33
-26
lines changed

2 files changed

+33
-26
lines changed

elasticsearch/helpers/__init__.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import unicode_literals
22

33
import logging
4+
from multiprocessing.dummy import Pool
45
from operator import methodcaller
56

67
from ..exceptions import ElasticsearchException, TransportError
@@ -226,8 +227,38 @@ def bulk(client, actions, stats_only=False, **kwargs):
226227

227228
return success, failed if stats_only else errors
228229

229-
# preserve the name for backwards compatibility
230-
bulk_index = bulk
230+
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
        max_chunk_bytes=100 * 1024 * 1024,
        expand_action_callback=expand_action, **kwargs):
    """
    Parallel version of the bulk helper. Runs the bulk requests in a thread
    pool and lazily yields the per-item results as the chunks complete.

    :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
    :arg actions: iterator containing the actions
    :arg thread_count: size of the threadpool to use for the bulk requests
    :arg chunk_size: number of docs in one chunk sent to es (default: 500)
    :arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
    :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
        from the execution of the last chunk when some occur. By default we raise.
    :arg raise_on_exception: if ``False`` then don't propagate exceptions from
        call to ``bulk`` and just report the items that failed as failed.
    :arg expand_action_callback: callback executed on each action passed in,
        should return a tuple containing the action line and the data line
        (`None` if data line should be omitted).
    """
    # Normalize every incoming action into an (action_line, data_line) pair
    # before chunking; `map` keeps this lazy.
    actions = map(expand_action_callback, actions)

    # multiprocessing.dummy.Pool is a *thread* pool - appropriate here since
    # the work (HTTP requests to elasticsearch) is I/O bound.
    pool = Pool(thread_count)

    try:
        # imap preserves chunk order while letting up to `thread_count`
        # chunks be in flight concurrently.
        for result in pool.imap(
            lambda chunk: list(_process_bulk_chunk(client, chunk, **kwargs)),
            _chunk_actions(actions, chunk_size, max_chunk_bytes,
                           client.transport.serializer)
        ):
            for item in result:
                yield item
    finally:
        # Reclaim the worker threads even if the consumer stops iterating
        # early or a chunk raises - without this they would leak.
        pool.close()
        pool.join()
231262

232263
def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, **kwargs):
233264
"""

elasticsearch/helpers/parallel.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)