辅助工具(Helpers)
流式批量(Streaming Bulk)
- elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), retry_on_status=(429, ), span_name='helpers.streaming_bulk', *args, **kwargs)
流式批量处理从传入的可迭代对象中消费操作,并逐个操作产生结果。对于非流式用例,请使用
bulk()
,它是流式批量处理的包装器,会在整个输入被消费和发送后返回批量操作的摘要信息。如果指定了
max_retries
,它还会重试任何被429
状态码拒绝的文档。使用retry_on_status
配置哪些状态码会被重试。为此,它会等待(通过调用会阻塞的 time.sleep)``initial_backoff`` 秒,然后每次对同一分块的后续拒绝,等待时间会按initial_backoff * 2**retry_number
的幂次递增,最多不超过max_backoff
秒。- Parameters:
client (Elasticsearch) – 要使用的
Elasticsearch
实例actions (Iterable[bytes | str | Dict[str, Any]]) – 包含要执行操作的可迭代对象
chunk_size (int) – 发送到 es 的每个分块中的文档数(默认:500)
max_chunk_bytes (int) – 请求的最大字节大小(默认:100MB)
raise_on_error (bool) – 当出现错误时抛出包含错误的 ``BulkIndexError``(通过 .errors 获取)。默认会抛出。
raise_on_exception (bool) – 如果为
False
,则不传播从bulk
调用抛出的异常,仅将失败的项报告为失败。expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 对每个传入操作执行的回调,应返回包含操作行和数据行的元组(如果应省略数据行则为 None)。
retry_on_status (int | Collection[int]) – 会触发重试的 HTTP 状态码(如果指定为 None,则仅状态码 429 会重试)。
max_retries (int) – 当收到 retry_on_status(默认为
429
)时,文档将被重试的最大次数,设置为 0(默认)表示不重试initial_backoff (float) – 第一次重试前应等待的秒数。后续每次重试的等待时间为
initial_backoff * 2**retry_number
max_backoff (float) – 重试等待的最大秒数
yield_ok (bool) – 如果设置为 False,将在输出中跳过成功的文档
ignore_status (int | Collection[int]) – 要忽略的 HTTP 状态码列表
client
actions
chunk_size
max_chunk_bytes
raise_on_error
expand_action_callback
raise_on_exception
max_retries
initial_backoff
max_backoff
yield_ok
ignore_status
retry_on_status
span_name (str)
args (Any)
kwargs (Any)
- Return type:
并行批量(Parallel Bulk)
- elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=104857600, queue_size=4, expand_action_callback=<function expand_action>, ignore_status=(), *args, **kwargs)
批量处理辅助函数的并行版本,可同时在多个线程中运行。
- Parameters:
client (Elasticsearch) – 要使用的
Elasticsearch
实例thread_count (int) – 用于批量请求的线程池大小
chunk_size (int) – 发送到 es 的每个分块中的文档数(默认:500)
max_chunk_bytes (int) – 请求的最大字节大小(默认:100MB)
raise_on_error – 当出现错误时抛出包含错误的 ``BulkIndexError``(通过 .errors 获取)。默认会抛出。
raise_on_exception – 如果为
False
,则不传播从bulk
调用抛出的异常,仅将失败的项报告为失败。expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 对每个传入操作执行的回调,应返回包含操作行和数据行的元组(如果应省略数据行则为 None)。
queue_size (int) – 主线程(生成要发送的分块)和处理线程之间的任务队列大小。
ignore_status (int | Collection[int]) – 要忽略的 HTTP 状态码列表
client
actions
thread_count
chunk_size
max_chunk_bytes
queue_size
expand_action_callback
ignore_status
args (Any)
kwargs (Any)
- Return type:
批量(Bulk)
- elasticsearch.helpers.bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)
bulk()
API 的辅助函数,提供更友好的接口 - 它消费一个操作迭代器,并将它们分块发送到 elasticsearch。它返回一个包含摘要信息的元组 - 成功执行的操作数量以及错误列表(如果stats_only
设置为True
则为错误数量)。请注意,默认情况下遇到错误时会抛出BulkIndexError
,因此像stats_only
这样的选项仅在raise_on_error
设置为False
时适用。收集错误时,原始文档数据会包含在错误字典中,这可能导致额外的内存使用。如果需要处理大量数据并希望忽略/收集错误,请考虑使用
streaming_bulk()
辅助函数,它只会返回错误而不会将它们存储在内存中。- Parameters:
client (Elasticsearch) – 要使用的
Elasticsearch
实例stats_only (bool) – 如果为 True,则仅报告成功/失败的操作数量,而不是成功数量和错误响应列表
ignore_status (int | Collection[int]) – 要忽略的 HTTP 状态码列表
client
actions
stats_only
ignore_status
args (Any)
kwargs (Any)
- Return type:
任何额外的关键字参数将传递给用于执行操作的
streaming_bulk()
,更多接受的参数请参见streaming_bulk()
。
扫描(Scan)
- elasticsearch.helpers.scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)
在
scroll()
API 之上的简单抽象 - 一个简单的迭代器, 返回底层滚动请求返回的所有命中结果。默认情况下,scan 不会按任何预定顺序返回结果。若要在滚动时保持返回文档的标准顺序 (按分数或显式排序定义),请使用
preserve_order=True
。这可能是一个昂贵的操作, 会抵消使用scan
带来的性能优势。- Parameters:
client (Elasticsearch) – 使用的
Elasticsearch
实例scroll (str) – 指定应为滚动搜索维护索引一致视图的时间
raise_on_error (bool) – 遇到错误时抛出异常 (
ScanError
) (某些分片执行失败)。 默认情况下我们会抛出异常。preserve_order (bool) – 不将
search_type
设置为scan
- 这将导致滚动分页时 保持顺序。请注意,这可能是一个非常昂贵的操作,很容易导致不可预测的结果, 请谨慎使用。size (int) – 每次迭代发送的批次大小 (每个分片)。
request_timeout (float | None) – 每次调用
scan
的显式超时时间clear_scroll (bool) – 在方法完成或出错时,通过 clear scroll API 显式删除滚动 ID, 默认为 true。
scroll_kwargs (MutableMapping[str, Any] | None) – 传递给
scroll()
的 额外关键字参数client
query
scroll
raise_on_error
preserve_order
size
request_timeout
clear_scroll
scroll_kwargs
kwargs (Any)
- Return type:
任何额外的关键字参数都将传递给初始的
search()
调用:scan(client, query={"query": {"match": {"title": "python"}}}, index="orders-*", doc_type="books" )
重建索引(Reindex)
- elasticsearch.helpers.reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})
将满足给定查询的所有文档从一个索引重新索引到另一个索引, 可能(如果指定了 target_client)在不同的集群上。 如果不指定查询,将重新索引所有文档。
从
2.3
版本开始,reindex()
API 已作为 elasticsearch 本身的一部分提供。建议尽可能使用该 API 而不是此辅助函数。 此辅助函数主要用于向后兼容和需要更多灵活性的情况。Note
此辅助函数不传输映射,只传输数据。
- Parameters:
client (Elasticsearch) – 使用的
Elasticsearch
实例 (如果也指定了 target_client,则用于读取)source_index (str | Collection[str]) – 从中读取文档的索引(或索引列表)
target_index (str) – 目标集群中要填充的索引名称
target_client (Elasticsearch | None) – 可选,如果指定将用于写入 (从而实现集群间的重新索引)
chunk_size (int) – 每次发送到 es 的文档块大小 (默认: 500)
scroll (str) – 指定应为滚动搜索维护索引一致视图的时间
op_type (str | None) – 显式操作类型。默认为 ‘_index’。数据流必须设置为 ‘create’。 如果未指定,将自动检测 target_index 是否为数据流。
scan_kwargs (MutableMapping[str, Any]) – 传递给
scan()
的额外关键字参数bulk_kwargs (MutableMapping[str, Any]) – 传递给
bulk()
的额外关键字参数client
source_index
target_index
query
target_client
chunk_size
scroll
op_type
scan_kwargs
bulk_kwargs
- Return type: