Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion elasticsearch/_async/client/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from .utils import NamespacedClient, _make_path, query_params, SKIP_IN_PATH
from .utils import SKIP_IN_PATH, NamespacedClient, _make_path, query_params


class NodesClient(NamespacedClient):
Expand Down
2 changes: 1 addition & 1 deletion elasticsearch/_async/client/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from .utils import SKIP_IN_PATH, NamespacedClient, query_params, _make_path
from .utils import SKIP_IN_PATH, NamespacedClient, _make_path, query_params


class SqlClient(NamespacedClient):
Expand Down
33 changes: 30 additions & 3 deletions elasticsearch/_async/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import logging

from ..compat import map
from ..exceptions import TransportError
from ..exceptions import NotFoundError, TransportError
from ..helpers.actions import (
_ActionChunker,
_process_bulk_chunk_error,
Expand Down Expand Up @@ -406,6 +406,7 @@ async def async_reindex(
target_client=None,
chunk_size=500,
scroll="5m",
op_type=None,
scan_kwargs={},
bulk_kwargs={},
):
Expand Down Expand Up @@ -435,6 +436,9 @@ async def async_reindex(
:arg chunk_size: number of docs in one chunk sent to es (default: 500)
:arg scroll: Specify how long a consistent view of the index should be
maintained for scrolled search
:arg op_type: Explicit operation type. Defaults to 'index'. Data streams
    require this to be set to 'create'. If not specified, the helper
    auto-detects whether target_index is a data stream.
:arg scan_kwargs: additional kwargs to be passed to
:func:`~elasticsearch.helpers.async_scan`
:arg bulk_kwargs: additional kwargs to be passed to
Expand All @@ -445,18 +449,41 @@ async def async_reindex(
client, query=query, index=source_index, scroll=scroll, **scan_kwargs
)

async def _change_doc_index(hits, index):
async def _change_doc_index(hits, index, op_type):
async for h in hits:
h["_index"] = index
if op_type is not None:
h["_op_type"] = op_type
if "fields" in h:
h.update(h.pop("fields"))
yield h

kwargs = {"stats_only": True}
kwargs.update(bulk_kwargs)

is_data_stream = False
try:
# Verify if the target_index is data stream or index
data_streams = await target_client.indices.get_data_stream(
target_index, expand_wildcards="all"
)
is_data_stream = any(
data_stream["name"] == target_index
for data_stream in data_streams["data_streams"]
)
except (TransportError, KeyError, NotFoundError):
# If it's not a data stream, it may be a regular index
pass

if is_data_stream:
if op_type not in (None, "create"):
raise ValueError("Data streams must have 'op_type' set to 'create'")
else:
op_type = "create"

return await async_bulk(
target_client,
_change_doc_index(docs, target_index),
_change_doc_index(docs, target_index, op_type),
chunk_size=chunk_size,
**kwargs,
)
1 change: 1 addition & 0 deletions elasticsearch/_async/helpers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async def async_reindex(
target_client: Optional[AsyncElasticsearch] = ...,
chunk_size: int = ...,
scroll: str = ...,
op_type: str = ...,
scan_kwargs: Optional[Mapping[str, Any]] = ...,
bulk_kwargs: Optional[Mapping[str, Any]] = ...,
) -> Tuple[int, Union[int, List[Any]]]: ...
2 changes: 1 addition & 1 deletion elasticsearch/client/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from .utils import NamespacedClient, _make_path, query_params, SKIP_IN_PATH
from .utils import SKIP_IN_PATH, NamespacedClient, _make_path, query_params


class NodesClient(NamespacedClient):
Expand Down
2 changes: 1 addition & 1 deletion elasticsearch/client/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from .utils import SKIP_IN_PATH, NamespacedClient, query_params, _make_path
from .utils import SKIP_IN_PATH, NamespacedClient, _make_path, query_params


class SqlClient(NamespacedClient):
Expand Down
33 changes: 30 additions & 3 deletions elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from operator import methodcaller

from ..compat import Mapping, Queue, map, string_types
from ..exceptions import TransportError
from ..exceptions import NotFoundError, TransportError
from .errors import BulkIndexError, ScanError

logger = logging.getLogger("elasticsearch.helpers")
Expand Down Expand Up @@ -622,6 +622,7 @@ def reindex(
target_client=None,
chunk_size=500,
scroll="5m",
op_type=None,
scan_kwargs={},
bulk_kwargs={},
):
Expand Down Expand Up @@ -651,6 +652,9 @@ def reindex(
:arg chunk_size: number of docs in one chunk sent to es (default: 500)
:arg scroll: Specify how long a consistent view of the index should be
maintained for scrolled search
:arg op_type: Explicit operation type. Defaults to 'index'. Data streams
    require this to be set to 'create'. If not specified, the helper
    auto-detects whether target_index is a data stream.
:arg scan_kwargs: additional kwargs to be passed to
:func:`~elasticsearch.helpers.scan`
:arg bulk_kwargs: additional kwargs to be passed to
Expand All @@ -659,18 +663,41 @@ def reindex(
target_client = client if target_client is None else target_client
docs = scan(client, query=query, index=source_index, scroll=scroll, **scan_kwargs)

def _change_doc_index(hits, index):
def _change_doc_index(hits, index, op_type):
for h in hits:
h["_index"] = index
if op_type is not None:
h["_op_type"] = op_type
if "fields" in h:
h.update(h.pop("fields"))
yield h

kwargs = {"stats_only": True}
kwargs.update(bulk_kwargs)

is_data_stream = False
try:
# Verify if the target_index is data stream or index
data_streams = target_client.indices.get_data_stream(
target_index, expand_wildcards="all"
)
is_data_stream = any(
data_stream["name"] == target_index
for data_stream in data_streams["data_streams"]
)
except (TransportError, KeyError, NotFoundError):
# If it's not a data stream, it may be a regular index
pass

if is_data_stream:
if op_type not in (None, "create"):
raise ValueError("Data streams must have 'op_type' set to 'create'")
else:
op_type = "create"

return bulk(
target_client,
_change_doc_index(docs, target_index),
_change_doc_index(docs, target_index, op_type),
chunk_size=chunk_size,
**kwargs
)
1 change: 1 addition & 0 deletions elasticsearch/helpers/actions.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def reindex(
target_client: Optional[Elasticsearch] = ...,
chunk_size: int = ...,
scroll: str = ...,
op_type: str = ...,
scan_kwargs: Optional[Mapping[str, Any]] = ...,
bulk_kwargs: Optional[Mapping[str, Any]] = ...,
) -> Tuple[int, Union[int, List[Any]]]: ...
3 changes: 3 additions & 0 deletions elasticsearch/helpers/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def teardown_method(self, _):
if self.es_version() >= (7, 7):
expand_wildcards.append("hidden")

self.client.indices.delete_data_stream(
name="*", ignore=404, expand_wildcards=expand_wildcards
)
self.client.indices.delete(
index="*", ignore=404, expand_wildcards=expand_wildcards
)
Expand Down
65 changes: 65 additions & 0 deletions test_elasticsearch/test_async/test_server/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import asyncio
from datetime import datetime, timedelta, timezone

import pytest
from mock import MagicMock, patch
Expand Down Expand Up @@ -884,3 +885,67 @@ async def test_children_are_reindexed_correctly(
"_version": 1,
"found": True,
} == q


@pytest.fixture(scope="function")
async def reindex_data_stream_setup(async_client):
    """Seed ``test_index_stream`` with 100 docs and create the empty
    ``py-test-stream`` data stream used by the async reindex tests."""
    now = datetime.now(tz=timezone.utc)
    actions = []
    for doc_id in range(100):
        # Bulk API expects an action line immediately followed by the source.
        action = {"index": {"_index": "test_index_stream", "_id": doc_id}}
        source = {
            "answer": doc_id,
            "correct": doc_id == 42,
            "type": "answers" if doc_id % 2 == 0 else "questions",
            # One day apart so each doc carries a distinct timestamp.
            "@timestamp": (now - timedelta(days=doc_id)).isoformat(),
        }
        actions.extend((action, source))
    await async_client.bulk(actions, refresh=True)
    # A template with a ``data_stream`` section is required before the
    # data stream itself can be created.
    await async_client.indices.put_index_template(
        name="my-index-template",
        body={"index_patterns": ["py-*-*"], "data_stream": {}},
    )
    await async_client.indices.create_data_stream(name="py-test-stream")
    await async_client.indices.refresh()
    yield


class TestAsyncDataStreamReindex(object):
    """Tests for ``async_reindex`` when the target is a data stream."""

    @pytest.mark.parametrize("op_type", [None, "create"])
    async def test_reindex_index_datastream(
        self, op_type, async_client, reindex_data_stream_setup
    ):
        """Reindexing into a data stream succeeds both when op_type is
        auto-detected (None) and when it is given explicitly as 'create'."""
        await helpers.async_reindex(
            async_client,
            source_index="test_index_stream",
            target_index="py-test-stream",
            scan_kwargs={"q": "type:answers"},
            # refresh=True on the bulk call makes the docs searchable
            # immediately, so no separate refresh is needed here.
            bulk_kwargs={"refresh": True},
            op_type=op_type,
        )
        assert await async_client.indices.exists(index="py-test-stream")
        # Half of the 100 seeded docs (the even ids) are of type "answers".
        assert (
            50
            == (await async_client.count(index="py-test-stream", q="type:answers"))[
                "count"
            ]
        )

    async def test_reindex_index_datastream_op_type_index(
        self, async_client, reindex_data_stream_setup
    ):
        """Any op_type other than 'create' is rejected for data stream targets."""
        with pytest.raises(
            ValueError, match="Data streams must have 'op_type' set to 'create'"
        ):
            await helpers.async_reindex(
                async_client,
                source_index="test_index_stream",
                target_index="py-test-stream",
                query={"query": {"bool": {"filter": {"term": {"type": "answers"}}}}},
                op_type="_index",
            )
62 changes: 62 additions & 0 deletions test_elasticsearch/test_server/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timedelta, timezone

import pytest
from mock import patch

from elasticsearch import TransportError, helpers
Expand Down Expand Up @@ -754,3 +757,62 @@ def test_children_are_reindexed_correctly(self):
},
q,
)


@pytest.fixture(scope="function")
def reindex_data_stream_setup(sync_client):
    """Populate ``test_index_stream`` with 100 docs and create an empty
    ``py-test-stream`` data stream for the sync reindex tests.

    Mirrors the async ``reindex_data_stream_setup`` fixture.
    """
    dt = datetime.now(tz=timezone.utc)
    bulk = []
    for x in range(100):
        # Action line followed by its document source, as the bulk API expects.
        bulk.append({"index": {"_index": "test_index_stream", "_id": x}})
        bulk.append(
            {
                "answer": x,
                "correct": x == 42,
                "type": "answers" if x % 2 == 0 else "questions",
                # One day apart so each doc carries a distinct timestamp.
                "@timestamp": (dt - timedelta(days=x)).isoformat(),
            }
        )
    sync_client.bulk(bulk, refresh=True)
    # A template with a ``data_stream`` section is required before the
    # data stream itself can be created.
    sync_client.indices.put_index_template(
        name="my-index-template",
        body={
            "index_patterns": ["py-*-*"],
            "data_stream": {},
        },
    )
    sync_client.indices.create_data_stream(name="py-test-stream")
    sync_client.indices.refresh()
    # Yield instead of returning, for consistency with the async fixture and
    # so teardown steps can be added after the yield if ever needed.
    yield


class TestDataStreamReindex(object):
    """Tests for the sync ``reindex`` helper when the target is a data stream."""

    @pytest.mark.parametrize("op_type", [None, "create"])
    def test_reindex_index_datastream(
        self, op_type, sync_client, reindex_data_stream_setup
    ):
        """Data stream reindex succeeds with auto-detected or explicit 'create'."""
        helpers.reindex(
            sync_client,
            source_index="test_index_stream",
            target_index="py-test-stream",
            query={"query": {"bool": {"filter": {"term": {"type": "answers"}}}}},
            op_type=op_type,
        )
        sync_client.indices.refresh()
        assert sync_client.indices.exists(index="py-test-stream")
        # Half of the 100 seeded docs (the even ids) are of type "answers".
        answer_count = sync_client.count(index="py-test-stream", q="type:answers")[
            "count"
        ]
        assert answer_count == 50

    def test_reindex_index_datastream_op_type_index(
        self, sync_client, reindex_data_stream_setup
    ):
        """Any op_type other than 'create' is rejected for data stream targets."""
        with pytest.raises(
            ValueError, match="Data streams must have 'op_type' set to 'create'"
        ):
            helpers.reindex(
                sync_client,
                source_index="test_index_stream",
                target_index="py-test-stream",
                query={"query": {"bool": {"filter": {"term": {"type": "answers"}}}}},
                op_type="_index",
            )