- Notifications
You must be signed in to change notification settings - Fork 1.2k
Forward kwargs from scan helper to scroll method as well. #321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -232,7 +232,9 @@ def parallel_bulk(client, actions, thread_count=4, chunk_size=500, | |
| pool.close() | ||
| pool.join() | ||
| | ||
| def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, **kwargs): | ||
| def scan(client, query=None, scroll='5m', raise_on_error=True, | ||
| preserve_order=False, global_kwargs={}, search_kwargs=None, | ||
| scroll_kwargs=None, **kwargs): | ||
| """ | ||
| Simple abstraction on top of the | ||
| :meth:`~elasticsearch.Elasticsearch.scroll` api - a simple iterator that | ||
| | @@ -254,6 +256,13 @@ def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=Fa | |
| cause the scroll to paginate with preserving the order. Note that this | ||
| can be an extremely expensive operation and can easily lead to | ||
| unpredictable results, use with caution. | ||
| :arg global_kwargs: additional kwargs to be passed to both | ||
| :meth:`~elasticsearch.Elasticsearch.search` and to | ||
| :meth:`~elasticsearch.Elasticsearch.scroll` | ||
| :arg search_kwargs: additional kwargs to be passed to | ||
| :meth:`~elasticsearch.Elasticsearch.search` | ||
| :arg scroll_kwargs: additional kwargs to be passed to | ||
| :meth:`~elasticsearch.Elasticsearch.scroll` | ||
| | ||
| Any additional keyword arguments will be passed to the initial | ||
| :meth:`~elasticsearch.Elasticsearch.search` call:: | ||
| | @@ -268,7 +277,10 @@ def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=Fa | |
| if not preserve_order: | ||
| kwargs['search_type'] = 'scan' | ||
| # initial search | ||
| resp = client.search(body=query, scroll=scroll, **kwargs) | ||
| search_kwargs = search_kwargs or {} | ||
| search_kwargs.update(global_kwargs) | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these need to be reversed - start with | ||
| search_kwargs.update(kwargs) | ||
| resp = client.search(body=query, scroll=scroll, **search_kwargs) | ||
| | ||
| scroll_id = resp.get('_scroll_id') | ||
| if scroll_id is None: | ||
| | @@ -280,7 +292,9 @@ def scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=Fa | |
| if preserve_order and first_run: | ||
| first_run = False | ||
| else: | ||
| resp = client.scroll(scroll_id, scroll=scroll) | ||
| scroll_kwargs = scroll_kwargs or {} | ||
| scroll_kwargs.update(global_kwargs) | ||
| resp = client.scroll(scroll_id, scroll=scroll, **scroll_kwargs) | ||
| | ||
| for hit in resp['hits']['hits']: | ||
| yield hit | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| from elasticsearch import helpers, TransportError | ||
| from elasticsearch import helpers, TransportError, NotFoundError | ||
| | ||
| from . import ElasticsearchTestCase | ||
| from ..test_cases import SkipTest | ||
| | @@ -184,26 +184,44 @@ def test_errors_are_collected_properly(self): | |
| | ||
| | ||
| class TestScan(ElasticsearchTestCase): | ||
| def test_order_can_be_preserved(self): | ||
| def setUp(self): | ||
| super(TestScan, self).setUp() | ||
| bulk = [] | ||
| for x in range(100): | ||
| bulk.append({"index": {"_index": "test_index", "_type": "answers", "_id": x}}) | ||
| bulk.append({"answer": x, "correct": x == 42}) | ||
| self.client.bulk(bulk, refresh=True) | ||
| | ||
| def test_order_can_be_preserved(self): | ||
| docs = list(helpers.scan(self.client, index="test_index", doc_type="answers", size=2, query={"sort": ["answer"]}, preserve_order=True)) | ||
| | ||
| self.assertEquals(100, len(docs)) | ||
| self.assertEquals(list(map(str, range(100))), list(d['_id'] for d in docs)) | ||
| self.assertEquals(list(range(100)), list(d['_source']['answer'] for d in docs)) | ||
| | ||
| def test_all_documents_are_read(self): | ||
| bulk = [] | ||
| for x in range(100): | ||
| bulk.append({"index": {"_index": "test_index", "_type": "answers", "_id": x}}) | ||
| bulk.append({"answer": x, "correct": x == 42}) | ||
| self.client.bulk(bulk, refresh=True) | ||
| def test_general_kwargs_forwarded_to_search(self): | ||
| inexistent_index = 'test_index_123' | ||
| self.assertRaises( | ||
| NotFoundError, | ||
| lambda: list(helpers.scan(self.client, index=inexistent_index, doc_type="answers", size=2)) | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need for the | ||
| ) | ||
| global_kwargs = {'ignore': 404} | ||
| list(helpers.scan(self.client, index=inexistent_index, doc_type="answers", size=2, global_kwargs=global_kwargs)) | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be nice to verify the response here, even when the real test is to see no error was raised (or at least have a comment to that effect so readers know what is being tested) | ||
| | ||
| def test_general_kwargs_forwarded_to_scroll(self): | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this is a complex and expensive test that can be a bit confusing. Could we perhaps just have a unit test using a mocked client (using | ||
| with self.assertRaises(NotFoundError): | ||
| for page in helpers.scan(self.client, index="test_index", doc_type="answers", size=2): | ||
| # Deleting the index after first request was done makes sure | ||
| # we test the scroll method. | ||
| self.client.indices.delete('test_index', ignore=404) | ||
| self.setUp() | ||
| # Still raises a scanning error, but gets to that point only because | ||
| # ignore=404 was forwarded to scroll. | ||
| with self.assertRaises(helpers.ScanError): | ||
| for page in helpers.scan(self.client, index="test_index", doc_type="answers", size=2, global_kwargs={'ignore': 404}): | ||
| self.client.indices.delete('test_index', ignore=404) | ||
| | ||
| def test_all_documents_are_read(self): | ||
| docs = list(helpers.scan(self.client, index="test_index", doc_type="answers", size=2)) | ||
| | ||
| self.assertEquals(100, len(docs)) | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have this here we should remove the
`search_kwargs` and just use `kwargs` there; it would also be backwards compatible, just like the current solution, and simpler.