辅助工具(Helpers)

流式批量(Streaming Bulk)

elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), retry_on_status=(429, ), span_name='helpers.streaming_bulk', *args, **kwargs)

流式批量处理从传入的可迭代对象中消费操作,并逐个操作产生结果。对于非流式用例,请使用 bulk(),它是流式批量处理的包装器,会在整个输入被消费和发送后返回批量操作的摘要信息。

如果指定了 max_retries,它还会重试任何被 429 状态码拒绝的文档。使用 retry_on_status 配置哪些状态码会被重试。为此,它会等待(通过调用会阻塞的 time.sleep)``initial_backoff`` 秒,然后每次对同一分块的后续拒绝,等待时间会按 initial_backoff * 2**retry_number 的幂次递增,最多不超过 max_backoff 秒。

Parameters:
  • client (Elasticsearch) – 要使用的 Elasticsearch 实例

  • actions (Iterable[bytes | str | Dict[str, Any]]) – 包含要执行操作的可迭代对象

  • chunk_size (int) – 发送到 es 的每个分块中的文档数(默认:500)

  • max_chunk_bytes (int) – 请求的最大字节大小(默认:100MB)

  • raise_on_error (bool) – 当出现错误时抛出包含错误的 ``BulkIndexError``(通过 .errors 获取)。默认会抛出。

  • raise_on_exception (bool) – 如果为 False,则不传播从 bulk 调用抛出的异常,仅将失败的项报告为失败。

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 对每个传入操作执行的回调,应返回包含操作行和数据行的元组(如果应省略数据行则为 None)。

  • retry_on_status (int | Collection[int]) – 会触发重试的 HTTP 状态码(如果指定为 None,则仅状态码 429 会重试)。

  • max_retries (int) – 当收到 retry_on_status(默认为 429)时,文档将被重试的最大次数,设置为 0(默认)表示不重试

  • initial_backoff (float) – 第一次重试前应等待的秒数。后续每次重试的等待时间为 initial_backoff * 2**retry_number

  • max_backoff (float) – 重试等待的最大秒数

  • yield_ok (bool) – 如果设置为 False,将在输出中跳过成功的文档

  • ignore_status (int | Collection[int]) – 要忽略的 HTTP 状态码列表

  • client

  • actions

  • chunk_size

  • max_chunk_bytes

  • raise_on_error

  • expand_action_callback

  • raise_on_exception

  • max_retries

  • initial_backoff

  • max_backoff

  • yield_ok

  • ignore_status

  • retry_on_status

  • span_name (str)

  • args (Any)

  • kwargs (Any)

Return type:

Iterable[Tuple[bool, Dict[str, Any]]]

并行批量(Parallel Bulk)

elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=104857600, queue_size=4, expand_action_callback=<function expand_action>, ignore_status=(), *args, **kwargs)

批量处理辅助函数的并行版本,可同时在多个线程中运行。

Parameters:
  • client (Elasticsearch) – 要使用的 Elasticsearch 实例

  • actions (Iterable[bytes | str | Dict[str, Any]]) – 包含操作的迭代器

  • thread_count (int) – 用于批量请求的线程池大小

  • chunk_size (int) – 发送到 es 的每个分块中的文档数(默认:500)

  • max_chunk_bytes (int) – 请求的最大字节大小(默认:100MB)

  • raise_on_error – 当出现错误时抛出包含错误的 ``BulkIndexError``(通过 .errors 获取)。默认会抛出。

  • raise_on_exception – 如果为 False,则不传播从 bulk 调用抛出的异常,仅将失败的项报告为失败。

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 对每个传入操作执行的回调,应返回包含操作行和数据行的元组(如果应省略数据行则为 None)。

  • queue_size (int) – 主线程(生成要发送的分块)和处理线程之间的任务队列大小。

  • ignore_status (int | Collection[int]) – 要忽略的 HTTP 状态码列表

  • client

  • actions

  • thread_count

  • chunk_size

  • max_chunk_bytes

  • queue_size

  • expand_action_callback

  • ignore_status

  • args (Any)

  • kwargs (Any)

Return type:

Iterable[Tuple[bool, Any]]

批量(Bulk)

elasticsearch.helpers.bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)

bulk() API 的辅助函数,提供更友好的接口 - 它消费一个操作迭代器,并将它们分块发送到 elasticsearch。它返回一个包含摘要信息的元组 - 成功执行的操作数量以及错误列表(如果 stats_only 设置为 True 则为错误数量)。请注意,默认情况下遇到错误时会抛出 BulkIndexError,因此像 stats_only 这样的选项仅在 raise_on_error 设置为 False 时适用。

收集错误时,原始文档数据会包含在错误字典中,这可能导致额外的内存使用。如果需要处理大量数据并希望忽略/收集错误,请考虑使用 streaming_bulk() 辅助函数,它只会返回错误而不会将它们存储在内存中。

Parameters:
  • client (Elasticsearch) – 要使用的 Elasticsearch 实例

  • actions (Iterable[bytes | str | Dict[str, Any]]) – 包含操作的迭代器

  • stats_only (bool) – 如果为 True,则仅报告成功/失败的操作数量,而不是成功数量和错误响应列表

  • ignore_status (int | Collection[int]) – 要忽略的 HTTP 状态码列表

  • client

  • actions

  • stats_only

  • ignore_status

  • args (Any)

  • kwargs (Any)

Return type:

Tuple[int, int | List[Dict[str, Any]]]

任何额外的关键字参数将传递给用于执行操作的 streaming_bulk(),更多接受的参数请参见 streaming_bulk()

扫描(Scan)

elasticsearch.helpers.scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)

scroll() API 之上的简单抽象 - 一个简单的迭代器, 返回底层滚动请求返回的所有命中结果。

默认情况下,scan 不会按任何预定顺序返回结果。若要在滚动时保持返回文档的标准顺序 (按分数或显式排序定义),请使用 preserve_order=True。这可能是一个昂贵的操作, 会抵消使用 scan 带来的性能优势。

Parameters:
  • client (Elasticsearch) – 使用的 Elasticsearch 实例

  • query (Any | None) – search() API 的请求体

  • scroll (str) – 指定应为滚动搜索维护索引一致视图的时间

  • raise_on_error (bool) – 遇到错误时抛出异常 (ScanError) (某些分片执行失败)。 默认情况下我们会抛出异常。

  • preserve_order (bool) – 不将 search_type 设置为 scan - 这将导致滚动分页时 保持顺序。请注意,这可能是一个非常昂贵的操作,很容易导致不可预测的结果, 请谨慎使用。

  • size (int) – 每次迭代发送的批次大小 (每个分片)。

  • request_timeout (float | None) – 每次调用 scan 的显式超时时间

  • clear_scroll (bool) – 在方法完成或出错时,通过 clear scroll API 显式删除滚动 ID, 默认为 true。

  • scroll_kwargs (MutableMapping[str, Any] | None) – 传递给 scroll() 的 额外关键字参数

  • client

  • query

  • scroll

  • raise_on_error

  • preserve_order

  • size

  • request_timeout

  • clear_scroll

  • scroll_kwargs

  • kwargs (Any)

Return type:

Iterable[Dict[str, Any]]

任何额外的关键字参数都将传递给初始的 search() 调用:

scan(client,
    query={"query": {"match": {"title": "python"}}},
    index="orders-*",
    doc_type="books"
)

重建索引(Reindex)

elasticsearch.helpers.reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})

将满足给定查询的所有文档从一个索引重新索引到另一个索引, 可能(如果指定了 target_client)在不同的集群上。 如果不指定查询,将重新索引所有文档。

2.3 版本开始,reindex() API 已作为 elasticsearch 本身的一部分提供。建议尽可能使用该 API 而不是此辅助函数。 此辅助函数主要用于向后兼容和需要更多灵活性的情况。

Note

此辅助函数不传输映射,只传输数据。

Parameters:
  • client (Elasticsearch) – 使用的 Elasticsearch 实例 (如果也指定了 target_client,则用于读取)

  • source_index (str | Collection[str]) – 从中读取文档的索引(或索引列表)

  • target_index (str) – 目标集群中要填充的索引名称

  • query (Any | None) – search() API 的请求体

  • target_client (Elasticsearch | None) – 可选,如果指定将用于写入 (从而实现集群间的重新索引)

  • chunk_size (int) – 每次发送到 es 的文档块大小 (默认: 500)

  • scroll (str) – 指定应为滚动搜索维护索引一致视图的时间

  • op_type (str | None) – 显式操作类型。默认为 ‘_index’。数据流必须设置为 ‘create’。 如果未指定,将自动检测 target_index 是否为数据流。

  • scan_kwargs (MutableMapping[str, Any]) – 传递给 scan() 的额外关键字参数

  • bulk_kwargs (MutableMapping[str, Any]) – 传递给 bulk() 的额外关键字参数

  • client

  • source_index

  • target_index

  • query

  • target_client

  • chunk_size

  • scroll

  • op_type

  • scan_kwargs

  • bulk_kwargs

Return type:

Tuple[int, int | List[Dict[str, Any]]]