20 changes: 17 additions & 3 deletions elasticsearch/helpers/__init__.py
@@ -232,7 +232,9 @@ def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
pool.close()
pool.join()

def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, **kwargs):
def scan(client, query=None, scroll='5m', raise_on_error=True,
preserve_order=False, global_kwargs={}, search_kwargs=None,
scroll_kwargs=None, **kwargs):
"""
Simple abstraction on top of the
:meth:`~elasticsearch.Elasticsearch.scroll` api - a simple iterator that
@@ -254,6 +256,13 @@ def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=Fa
cause the scroll to paginate with preserving the order. Note that this
can be an extremely expensive operation and can easily lead to
unpredictable results, use with caution.
:arg global_kwargs: additional kwargs to be passed to both
:meth:`~elasticsearch.Elasticsearch.search` and to
:meth:`~elasticsearch.Elasticsearch.scroll`
:arg search_kwargs: additional kwargs to be passed to
:meth:`~elasticsearch.Elasticsearch.search`
:arg scroll_kwargs: additional kwargs to be passed to
:meth:`~elasticsearch.Elasticsearch.scroll`

Any additional keyword arguments will be passed to the initial
:meth:`~elasticsearch.Elasticsearch.search` call::
Contributor: If we have this here, we should remove search_kwargs and just use kwargs there; it would be backwards compatible just like the current solution, and simpler.
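For reference, a rough sketch of that simpler shape, assuming search_kwargs is dropped from the signature and plain **kwargs take its place at the initial search call (the rest of the function is elided):

```python
def scan(client, query=None, scroll='5m', raise_on_error=True,
         preserve_order=False, global_kwargs=None, scroll_kwargs=None,
         **kwargs):
    if not preserve_order:
        kwargs['search_type'] = 'scan'
    # initial search: plain **kwargs play the role of search_kwargs,
    # merged on top of the shared global_kwargs
    search_kwargs = dict(global_kwargs or {})
    search_kwargs.update(kwargs)
    resp = client.search(body=query, scroll=scroll, **search_kwargs)
    ...
```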

@@ -268,7 +277,10 @@ def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=Fa
if not preserve_order:
kwargs['search_type'] = 'scan'
# initial search
resp = client.search(body=query, scroll=scroll, **kwargs)
search_kwargs = search_kwargs or {}
search_kwargs.update(global_kwargs)
Contributor: I think these need to be reversed: start with global_kwargs.copy() and then do .update(search_kwargs). That way search_kwargs will have priority and will be able to override global_kwargs; the other way around might be counterintuitive.

search_kwargs.update(kwargs)
resp = client.search(body=query, scroll=scroll, **search_kwargs)
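Spelled out, the merge order suggested in the comment above would look roughly like this (a sketch using the PR's own parameter names; copying also avoids mutating the caller's dicts in place):

```python
merged = global_kwargs.copy()       # shared defaults first
merged.update(search_kwargs or {})  # per-search options override them
merged.update(kwargs)               # plain **kwargs keep the highest priority
resp = client.search(body=query, scroll=scroll, **merged)
```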

scroll_id = resp.get('_scroll_id')
if scroll_id is None:
@@ -280,7 +292,9 @@ def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=Fa
if preserve_order and first_run:
first_run = False
else:
resp = client.scroll(scroll_id, scroll=scroll)
scroll_kwargs = scroll_kwargs or {}
scroll_kwargs.update(global_kwargs)
resp = client.scroll(scroll_id, scroll=scroll, **scroll_kwargs)

for hit in resp['hits']['hits']:
yield hit
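For context, a hypothetical call using the parameters this diff introduces; the specific option names (preference, request_timeout) are only illustrative:

```python
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
hits = helpers.scan(
    es,
    index='test_index',
    doc_type='answers',
    query={'query': {'match_all': {}}},
    global_kwargs={'ignore': 404},           # forwarded to search *and* scroll
    search_kwargs={'preference': '_local'},  # initial search only
    scroll_kwargs={'request_timeout': 10},   # scroll calls only
)
for hit in hits:
    print(hit['_id'])
```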
34 changes: 26 additions & 8 deletions test_elasticsearch/test_server/test_helpers.py
@@ -1,4 +1,4 @@
from elasticsearch import helpers, TransportError
from elasticsearch import helpers, TransportError, NotFoundError

from . import ElasticsearchTestCase
from ..test_cases import SkipTest
@@ -184,26 +184,44 @@ def test_errors_are_collected_properly(self):


class TestScan(ElasticsearchTestCase):
def test_order_can_be_preserved(self):
def setUp(self):
super(TestScan, self).setUp()
bulk = []
for x in range(100):
bulk.append({"index": {"_index": "test_index", "_type": "answers", "_id": x}})
bulk.append({"answer": x, "correct": x == 42})
self.client.bulk(bulk, refresh=True)

def test_order_can_be_preserved(self):
docs = list(helpers.scan(self.client, index="test_index", doc_type="answers", size=2, query={"sort": ["answer"]}, preserve_order=True))

self.assertEquals(100, len(docs))
self.assertEquals(list(map(str, range(100))), list(d['_id'] for d in docs))
self.assertEquals(list(range(100)), list(d['_source']['answer'] for d in docs))

def test_all_documents_are_read(self):
bulk = []
for x in range(100):
bulk.append({"index": {"_index": "test_index", "_type": "answers", "_id": x}})
bulk.append({"answer": x, "correct": x == 42})
self.client.bulk(bulk, refresh=True)
def test_general_kwargs_forwarded_to_search(self):
inexistent_index = 'test_index_123'
self.assertRaises(
NotFoundError,
lambda: list(helpers.scan(self.client, index=inexistent_index, doc_type="answers", size=2))
Contributor: No need for the lambda here; self.assertRaises(NotFoundError, list, helpers.scan(...)) will do the trick.

)
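Following the comment above, the assertion could read as below; helpers.scan() is a generator function, so calling it only builds the generator and nothing runs until list() consumes it inside assertRaises:

```python
self.assertRaises(
    NotFoundError,
    list,
    helpers.scan(self.client, index=inexistent_index, doc_type="answers", size=2),
)
```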
global_kwargs = {'ignore': 404}
list(helpers.scan(self.client, index=inexistent_index, doc_type="answers", size=2, global_kwargs=global_kwargs))
Contributor: It would be nice to verify the response here, even when the real test is to see that no error was raised (or at least add a comment to that effect so readers know what is being tested).
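One way to act on that note, assuming scan() simply yields nothing when the ignored 404 response carries no _scroll_id, is to assert on the (empty) result rather than discard it:

```python
# documents that "no exception and no hits" is the expected outcome
# when the 404 is ignored
docs = list(helpers.scan(self.client, index=inexistent_index,
                         doc_type="answers", size=2,
                         global_kwargs={'ignore': 404}))
self.assertEquals([], docs)
```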


def test_general_kwargs_forwarded_to_scroll(self):
Contributor: This is a complex and expensive test that can be a bit confusing. Could we perhaps just have a unit test using a mocked client (using ElasticsearchTestCase you can find an example of how to supply your own body here: https://github.com/elastic/elasticsearch-py/blob/master/test_elasticsearch/test_transport.py#L171) and just manually verify that the proper params were passed through for each call?
(A rough sketch of such a mocked-client test follows this method below.)

with self.assertRaises(NotFoundError):
for page in helpers.scan(self.client, index="test_index", doc_type="answers", size=2):
# Deleting the index after first request was done makes sure
# we test the scroll method.
self.client.indices.delete('test_index', ignore=404)
self.setUp()
# Still raises a scanning error, but gets to that point only because
# ignore=404 was forwarded to scroll.
with self.assertRaises(helpers.ScanError):
for page in helpers.scan(self.client, index="test_index", doc_type="answers", size=2, global_kwargs={'ignore': 404}):
self.client.indices.delete('test_index', ignore=404)
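
A rough sketch of the mocked-client unit test suggested above, using unittest.mock rather than the DummyTransport approach linked in the comment; the response bodies are minimal hypothetical shapes and may need adjusting to whichever fields this version of scan() inspects:

```python
from unittest import TestCase
from unittest.mock import MagicMock  # `from mock import MagicMock` on Python 2

from elasticsearch import helpers


class TestScanForwardsKwargs(TestCase):
    def test_global_kwargs_reach_search_and_scroll(self):
        client = MagicMock()
        client.search.return_value = {'_scroll_id': 'scroll-id'}
        client.scroll.return_value = {
            '_scroll_id': 'scroll-id',
            'hits': {'hits': []},
            '_shards': {'total': 1, 'successful': 1, 'failed': 0},
        }

        list(helpers.scan(client, query={}, global_kwargs={'ignore': 404}))

        # both the initial search and the scroll call should have seen ignore=404
        self.assertEqual(404, client.search.call_args[1]['ignore'])
        self.assertEqual(404, client.scroll.call_args[1]['ignore'])
```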

def test_all_documents_are_read(self):
docs = list(helpers.scan(self.client, index="test_index", doc_type="answers", size=2))

self.assertEquals(100, len(docs))