异步辅助函数

所有辅助函数的异步版本都可以在 elasticsearch.helpers 中找到, 并且都以 async_* 作为前缀。你会注意到,这些 API 与同步版 辅助工具(Helpers) 文档中的内容是完全一致的。

所有接受迭代器或生成器的异步辅助函数,同样也支持异步迭代器和异步生成器。

Streaming Bulk

async elasticsearch.helpers.async_streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), retry_on_status=(429, ), *args, **kwargs)

流式批量处理从传入的可迭代对象中消耗操作,并逐个操作产生结果。 对于非流式用例,请使用:func:~elasticsearch.helpers.async_bulk, 它是流式批量的包装器,一旦整个输入被消耗和发送,就会返回批量操作的摘要信息。

如果指定``max_retries``,它还将重试任何被拒绝且状态码为``429``的文档。 使用``retry_on_status``配置哪些状态码将被重试。为此,它将等待 (通过调用会阻塞的asyncio.sleep)``initial_backoff``秒, 然后对于同一块的每次后续拒绝,每次等待时间加倍,直到``max_backoff``秒。

Parameters:
  • client (AsyncElasticsearch) – 使用的:class:`~elasticsearch.AsyncElasticsearch`实例

  • actions (Iterable[bytes | str | Dict[str, Any]] | AsyncIterable[bytes | str | Dict[str, Any]]) – 包含要执行的操作的可迭代对象或异步可迭代对象

  • chunk_size (int) – 发送到es的每个块中的文档数(默认:500)

  • max_chunk_bytes (int) – 请求的最大字节大小(默认:100MB)

  • raise_on_error (bool) – 当出现错误时,抛出包含最后一个块错误的``BulkIndexError`` (作为`.errors`)。默认情况下我们会抛出。

  • raise_on_exception (bool) – 如果为``False``,则不传播来自``bulk``调用的异常, 仅将失败的项报告为失败。

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 对每个传入操作执行的回调, 应返回包含操作行和数据行的元组 (如果应省略数据行,则为`None`)。

  • retry_on_status (int | Collection[int]) – 将触发重试的HTTP状态码。 (如果指定为`None`,则仅状态码429会重试)。

  • max_retries (int) – 当收到retry_on_status(默认为``429``)时, 文档将被重试的最大次数, 设置为0(默认)表示不重试

  • initial_backoff (float) – 第一次重试前应等待的秒数。 任何后续重试将是``initial_backoff * 2**retry_number``的幂

  • max_backoff (float) – 重试将等待的最大秒数

  • yield_ok (bool) – 如果设置为False,将在输出中跳过成功的文档

  • ignore_status (int | Collection[int]) – 你想忽略的HTTP状态码列表

  • client

  • actions

  • chunk_size

  • max_chunk_bytes

  • raise_on_error

  • expand_action_callback

  • raise_on_exception

  • max_retries

  • initial_backoff

  • max_backoff

  • yield_ok

  • ignore_status

  • retry_on_status

  • args (Any)

  • kwargs (Any)

Return type:

AsyncIterable[Tuple[bool, Dict[str, Any]]]

Bulk

async elasticsearch.helpers.async_bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)

用于 bulk() API 的辅助工具,提供更人性化的接口—— 它接收一个操作迭代器,并将操作分批发送到 Elasticsearch。返回一个包含摘要信息的元组—— 成功执行的操作数量以及错误列表(如果 stats_only 设为 True 则返回错误数量)。 注意默认情况下遇到错误时会抛出 BulkIndexError,因此 stats_only 等选项仅在 raise_on_error 设为 False 时生效。

收集错误时,原始文档数据会包含在错误字典中,这可能导致额外的高内存消耗。如果需要处理大量数据 并希望忽略/收集错误,请考虑使用 async_streaming_bulk() 辅助工具, 它仅返回错误而不将其存储在内存中。

Parameters:
Return type:

Tuple[int, int | List[Any]]

任何额外的关键字参数将传递给用于执行操作的 async_streaming_bulk(),更多可接受参数请参阅 async_streaming_bulk()

Scan

async elasticsearch.helpers.async_scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)

scroll() API 的简单抽象—— 一个简单迭代器,按底层滚动请求返回所有命中结果。

默认情况下 scan 不会按任何预定顺序返回结果。若要在滚动时使返回文档具有标准顺序 (按评分或显式排序定义),请使用 preserve_order=True。这可能是昂贵操作, 会抵消使用 scan 带来的性能优势。

Parameters:
  • client (AsyncElasticsearch) – 使用的 AsyncElasticsearch 实例

  • query (Any | None) – search() API 的请求体

  • scroll (str) – 指定应为滚动搜索保持索引一致视图的时间

  • raise_on_error (bool) – 遇到错误时抛出异常(ScanError)(某些分片执行失败)。默认会抛出。

  • preserve_order (bool) – 不将 search_type 设为 scan——这将使滚动分页时保持顺序。 注意这可能是极其昂贵的操作,容易导致不可预测结果,请谨慎使用。

  • size (int) – 每次迭代发送的批次大小(每分片)。

  • request_timeout (float | None) – 每次调用 scan 的显式超时

  • clear_scroll (bool) – 在方法完成或出错时,通过 clear scroll API 显式删除滚动 ID,默认为 true。

  • scroll_kwargs (MutableMapping[str, Any] | None) – 传递给 scroll() 的额外参数

  • client

  • query

  • scroll

  • raise_on_error

  • preserve_order

  • size

  • request_timeout

  • clear_scroll

  • scroll_kwargs

  • kwargs (Any)

Return type:

AsyncIterable[Dict[str, Any]]

任何额外的关键字参数将传递给初始的 search() 调用:

async_scan(
    client,
    query={"query": {"match": {"title": "python"}}},
    index="orders-*"
)

Reindex

async elasticsearch.helpers.async_reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})

将满足给定查询的所有文档从一个索引重新索引到另一个索引(如果指定了 target_client, 可能位于不同集群)。如果不指定查询,则会重新索引所有文档。

2.3 版本起,Elasticsearch 本身提供了 reindex() API。 建议尽可能使用该 API 而非此辅助工具。此辅助工具主要用于向后兼容和需要更高灵活性的场景。

Note

此辅助工具不传输映射,仅传输数据。

Parameters:
  • client (AsyncElasticsearch) – 使用的 AsyncElasticsearch 实例(如果同时指定了 target_client 则用于读取)

  • source_index (str | Collection[str]) – 读取文档的源索引(或索引列表)

  • target_index (str) – 目标集群中要填充的索引名称

  • query (Any) – search() API 的请求体

  • target_client (AsyncElasticsearch | None) – 可选,若指定将用于写入(从而实现跨集群重新索引)

  • chunk_size (int) – 每次发送到 ES 的文档块大小(默认:500)

  • scroll (str) – 指定应为滚动搜索保持索引一致视图的时间

  • op_type (str | None) – 显式操作类型。默认为 ‘_index’。数据流必须设为 ‘create’。 如果未指定,将自动检测 target_index 是否为数据流。

  • scan_kwargs (MutableMapping[str, Any]) – 传递给 async_scan() 的额外参数

  • bulk_kwargs (MutableMapping[str, Any]) – 传递给 async_bulk() 的额外参数

  • client

  • source_index

  • target_index

  • query

  • target_client

  • chunk_size

  • scroll

  • op_type

  • scan_kwargs

  • bulk_kwargs

Return type:

Tuple[int, int | List[Any]]