226 changes: 152 additions & 74 deletions elasticsearch/_async/helpers.py
@@ -17,21 +17,58 @@

import asyncio
import logging
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Callable,
    Collection,
    Dict,
    Iterable,
    List,
    MutableMapping,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

from ..exceptions import NotFoundError, TransportError
from ..helpers.actions import (
    _TYPE_BULK_ACTION,
    _TYPE_BULK_ACTION_BODY,
    _TYPE_BULK_ACTION_HEADER,
    _TYPE_BULK_ACTION_HEADER_AND_BODY,
    _ActionChunker,
    _process_bulk_chunk_error,
    _process_bulk_chunk_success,
    expand_action,
from ..helpers.errors import ScanError
from ..serializer import Serializer
from .client import AsyncElasticsearch  # noqa

logger = logging.getLogger("elasticsearch.helpers")


async def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
T = TypeVar("T")


async def _chunk_actions(
    actions: AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY],
    chunk_size: int,
    max_chunk_bytes: int,
    serializer: Serializer,
) -> AsyncIterable[
    Tuple[
        List[
            Union[
                Tuple[_TYPE_BULK_ACTION_HEADER],
                Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
            ]
        ],
        List[bytes],
    ]
]:
    """
    Split actions into chunks by number or size, serialize them into strings in
    the process.
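
For orientation, a hedged sketch of the chunk shapes the new annotations describe (hypothetical values; each yielded chunk pairs the parsed actions with their serialized byte lines):

    # one action as a (header, body) tuple; body-less actions such as
    # deletes would be one-element (header,) tuples
    bulk_data = [({"index": {"_index": "test"}}, {"field": 1})]
    # the matching newline-delimited JSON lines sent in the bulk body
    bulk_actions = [b'{"index":{"_index":"test"}}', b'{"field":1}']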
@@ -49,19 +86,24 @@ async def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):


async def _process_bulk_chunk(
    client,
    bulk_actions,
    bulk_data,
    raise_on_exception=True,
    raise_on_error=True,
    ignore_status=(),
    *args,
    **kwargs,
):
    client: AsyncElasticsearch,
    bulk_actions: List[bytes],
    bulk_data: List[
        Union[
            Tuple[_TYPE_BULK_ACTION_HEADER],
            Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
        ]
    ],
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status: Union[int, Collection[int]] = (),
    *args: Any,
    **kwargs: Any,
) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:
    """
    Send a bulk request to elasticsearch and process the output.
    """
    if not isinstance(ignore_status, (list, tuple)):
    if isinstance(ignore_status, int):
        ignore_status = (ignore_status,)
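        # a bare int such as ignore_status=404 is normalized to (404,) so
        # the later membership checks can treat it as a collection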

    try:
@@ -77,7 +119,7 @@ async def _process_bulk_chunk(
        )
    else:
        gen = _process_bulk_chunk_success(
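            # resp is an ApiResponse wrapper in the 8.x client; .raw is the
            # deserialized body dict the success processor expects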
            resp=resp,
            resp=resp.raw,
            bulk_data=bulk_data,
            ignore_status=ignore_status,
            raise_on_error=raise_on_error,
@@ -86,21 +128,25 @@ async def _process_bulk_chunk(
        yield item


def aiter(x):
def aiter(x: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterator[T]:
"""Turns an async iterable or iterable into an async iterator"""
if hasattr(x, "__anext__"):
return x
return x # type: ignore[return-value]
elif hasattr(x, "__aiter__"):
return x.__aiter__()
return x.__aiter__() # type: ignore[union-attr]

async def f():
for item in x:
async def f() -> AsyncIterable[T]:
nonlocal x
ix: Iterable[T] = x # type: ignore[assignment]
for item in ix:
yield item

return f().__aiter__()
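
A hedged behavior sketch of the wrapper, assuming an import from the private module path shown in this diff (note this helper predates and shadows Python 3.10's builtin aiter, which does not accept plain iterables):

import asyncio
from elasticsearch._async.helpers import aiter  # this module's helper

async def demo() -> None:
    # a plain iterable is wrapped into an async iterator
    assert [i async for i in aiter([1, 2, 3])] == [1, 2, 3]

asyncio.run(demo())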


async def azip(*iterables):
async def azip(
    *iterables: Union[Iterable[T], AsyncIterable[T]]
) -> AsyncIterable[Tuple[T, ...]]:
    """Zips async iterables and iterables into an async iterator
    with the same behavior as zip()
    """
@@ -113,21 +159,23 @@


async def async_streaming_bulk(
    client,
    actions,
    chunk_size=500,
    max_chunk_bytes=100 * 1024 * 1024,
    raise_on_error=True,
    expand_action_callback=expand_action,
    raise_on_exception=True,
    max_retries=0,
    initial_backoff=2,
    max_backoff=600,
    yield_ok=True,
    ignore_status=(),
    *args,
    **kwargs,
):
    client: AsyncElasticsearch,
    actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
    chunk_size: int = 500,
    max_chunk_bytes: int = 100 * 1024 * 1024,
    raise_on_error: bool = True,
    expand_action_callback: Callable[
        [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
    ] = expand_action,
    raise_on_exception: bool = True,
    max_retries: int = 0,
    initial_backoff: float = 2,
    max_backoff: float = 600,
    yield_ok: bool = True,
    ignore_status: Union[int, Collection[int]] = (),
    *args: Any,
    **kwargs: Any,
) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:

    """
    Streaming bulk consumes actions from the iterable passed in and yields
@@ -166,25 +214,44 @@ async def async_streaming_bulk(
    client = client.options()
    client._client_meta = (("h", "bp"),)

    async def map_actions():
    async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
        async for item in aiter(actions):
            yield expand_action_callback(item)

    serializer = client.transport.serializers.get_serializer("application/json")

    bulk_data: List[
        Union[
            Tuple[_TYPE_BULK_ACTION_HEADER],
            Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
        ]
    ]
    bulk_actions: List[bytes]
    async for bulk_data, bulk_actions in _chunk_actions(
        map_actions(), chunk_size, max_chunk_bytes, serializer
    ):

        for attempt in range(max_retries + 1):
            to_retry, to_retry_data = [], []
            to_retry: List[bytes] = []
            to_retry_data: List[
                Union[
                    Tuple[_TYPE_BULK_ACTION_HEADER],
                    Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
                ]
            ] = []
            if attempt:
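                # exponential backoff: with the defaults initial_backoff=2
                # and max_backoff=600 the retry sleeps run 2s, 4s, 8s, ...
                # doubling per attempt and capped at 600 seconds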
                await asyncio.sleep(
                    min(max_backoff, initial_backoff * 2 ** (attempt - 1))
                )

            try:
                async for data, (ok, info) in azip(
                data: Union[
                    Tuple[_TYPE_BULK_ACTION_HEADER],
                    Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
                ]
                ok: bool
                info: Dict[str, Any]
                async for data, (ok, info) in azip(  # type: ignore
                    bulk_data,
                    _process_bulk_chunk(
                        client,
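
For reference, a minimal usage sketch of async_streaming_bulk under the new annotations, assuming a reachable local cluster and a hypothetical "my-index":

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")
    docs = ({"_index": "my-index", "value": i} for i in range(1000))
    # yields (ok, item) pairs per document, matching the return annotation
    async for ok, item in async_streaming_bulk(client, docs, chunk_size=500):
        if not ok:
            print("failed:", item)
    await client.close()

asyncio.run(main())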
@@ -228,8 +295,13 @@ async def map_actions():


async def async_bulk(
    client, actions, stats_only=False, ignore_status=(), *args, **kwargs
):
    client: AsyncElasticsearch,
    actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
    stats_only: bool = False,
    ignore_status: Union[int, Collection[int]] = (),
    *args: Any,
    **kwargs: Any,
) -> Tuple[int, Union[int, List[Any]]]:
    """
    Helper for the :meth:`~elasticsearch.AsyncElasticsearch.bulk` api that provides
    a more human friendly interface - it consumes an iterator of actions and
@@ -266,7 +338,7 @@ async def async_bulk(
    # make streaming_bulk yield successful results so we can count them
    kwargs["yield_ok"] = True
    async for ok, item in async_streaming_bulk(
        client, actions, ignore_status=ignore_status, *args, **kwargs
        client, actions, ignore_status=ignore_status, *args, **kwargs  # type: ignore[misc]
    ):
        # go through request-response pairs and detect failures
        if not ok:
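
A hedged sketch of the summary return value the new annotation captures, reusing the hypothetical client and docs from the sketch above:

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")
    docs = ({"_index": "my-index", "value": i} for i in range(1000))
    # (success count, list of per-document errors); with stats_only=True
    # the second element is an error count instead of a list
    success, errors = await async_bulk(client, docs)
    print(f"{success} indexed, {len(errors)} failed")
    await client.close()

asyncio.run(main())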
@@ -280,17 +352,17 @@


async def async_scan(
    client,
    query=None,
    scroll="5m",
    raise_on_error=True,
    preserve_order=False,
    size=1000,
    request_timeout=None,
    clear_scroll=True,
    scroll_kwargs=None,
    **kwargs,
):
    client: AsyncElasticsearch,
    query: Optional[Any] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[MutableMapping[str, Any]] = None,
    **kwargs: Any,
) -> AsyncIterable[Dict[str, Any]]:
    """
    Simple abstraction on top of the
    :meth:`~elasticsearch.AsyncElasticsearch.scroll` api - a simple iterator that
@@ -321,22 +393,23 @@ async def async_scan(
        :meth:`~elasticsearch.AsyncElasticsearch.scroll`

    Any additional keyword arguments will be passed to the initial
    :meth:`~elasticsearch.AsyncElasticsearch.search` call::
    :meth:`~elasticsearch.AsyncElasticsearch.search` call:

        async_scan(es,
    .. code-block:: python

        async_scan(
            es,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*",
            doc_type="books"
            index="orders-*"
        )

    """
    scroll_kwargs = scroll_kwargs or {}

    if not preserve_order:
        query = query.copy() if query else {}
        query["sort"] = "_doc"

    def pop_transport_kwargs(kw):
    def pop_transport_kwargs(kw: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        # Grab options that should be propagated to every
        # API call within this helper instead of just 'search()'
        transport_kwargs = {}
@@ -367,9 +440,9 @@ def pop_transport_kwargs(kw):
    search_kwargs = kwargs.copy()
    search_kwargs["scroll"] = scroll
    search_kwargs["size"] = size
    resp = await client.search(body=query, **search_kwargs)
    resp = await client.search(body=query, **search_kwargs)  # type: ignore[call-arg]

    scroll_id = resp.raw.get("_scroll_id")
    scroll_id: Optional[str] = resp.raw.get("_scroll_id")
    scroll_transport_kwargs = pop_transport_kwargs(scroll_kwargs)
    if scroll_transport_kwargs:
        scroll_client = client.options(**scroll_transport_kwargs)
@@ -382,9 +455,10 @@ def pop_transport_kwargs(kw):
                yield hit

            # Default to 0 if the value isn't included in the response
            shards_successful = resp.raw["_shards"].get("successful", 0)
            shards_skipped = resp.raw["_shards"].get("skipped", 0)
            shards_total = resp.raw["_shards"].get("total", 0)
            shards_info: Dict[str, int] = resp.raw["_shards"]
            shards_successful = shards_info.get("successful", 0)
            shards_skipped = shards_info.get("skipped", 0)
            shards_total = shards_info.get("total", 0)

            # check if we have any errors
            if (shards_successful + shards_skipped) < shards_total:
@@ -416,17 +490,17 @@ def pop_transport_kwargs(kw):


async def async_reindex(
    client,
    source_index,
    target_index,
    query=None,
    target_client=None,
    chunk_size=500,
    scroll="5m",
    op_type=None,
    scan_kwargs={},
    bulk_kwargs={},
):
    client: AsyncElasticsearch,
    source_index: Union[str, Collection[str]],
    target_index: str,
    query: Any = None,
    target_client: Optional[AsyncElasticsearch] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    op_type: Optional[str] = None,
    scan_kwargs: MutableMapping[str, Any] = {},
    bulk_kwargs: MutableMapping[str, Any] = {},
) -> Tuple[int, Union[int, List[Any]]]:

    """
    Reindex all documents from one index that satisfy a given query
@@ -466,7 +540,11 @@ async def async_reindex(
        client, query=query, index=source_index, scroll=scroll, **scan_kwargs
    )

    async def _change_doc_index(hits, index, op_type):
    async def _change_doc_index(
        hits: AsyncIterable[Dict[str, Any]],
        index: str,
        op_type: Optional[str],
    ) -> AsyncIterable[Dict[str, Any]]:
        async for h in hits:
            h["_index"] = index
            if op_type is not None:
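
Finally, a hedged usage sketch of the annotated async_reindex, with hypothetical index names and query, assuming the helper is re-exported from elasticsearch.helpers:

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_reindex

async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")
    # copies matching docs; returns (success count, errors) like async_bulk
    copied, errors = await async_reindex(
        client,
        source_index="orders-2023",
        target_index="orders-archive",
        query={"query": {"range": {"date": {"lt": "2024-01-01"}}}},
    )
    print(f"copied {copied} documents")
    await client.close()

asyncio.run(main())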