异步辅助函数
所有辅助函数的异步版本都可以在 elasticsearch.helpers
中找到,
并且都以 async_*
作为前缀。你会注意到,这些 API 与同步版
辅助工具(Helpers) 文档中的内容是完全一致的。
所有接受迭代器或生成器的异步辅助函数,同样也支持异步迭代器和异步生成器。
Streaming Bulk
- async elasticsearch.helpers.async_streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), retry_on_status=(429, ), *args, **kwargs)
流式批量处理从传入的可迭代对象中消耗操作,并逐个操作产生结果。 对于非流式用例,请使用:func:~elasticsearch.helpers.async_bulk, 它是流式批量的包装器,一旦整个输入被消耗和发送,就会返回批量操作的摘要信息。
如果指定``max_retries``,它还将重试任何被拒绝且状态码为``429``的文档。 使用``retry_on_status``配置哪些状态码将被重试。为此,它将等待 (通过调用会阻塞的asyncio.sleep)``initial_backoff``秒, 然后对于同一块的每次后续拒绝,每次等待时间加倍,直到``max_backoff``秒。
- Parameters:
client (AsyncElasticsearch) – 使用的:class:`~elasticsearch.AsyncElasticsearch`实例
actions (Iterable[bytes | str | Dict[str, Any]] | AsyncIterable[bytes | str | Dict[str, Any]]) – 包含要执行的操作的可迭代对象或异步可迭代对象
chunk_size (int) – 发送到es的每个块中的文档数(默认:500)
max_chunk_bytes (int) – 请求的最大字节大小(默认:100MB)
raise_on_error (bool) – 当出现错误时,抛出包含最后一个块错误的``BulkIndexError`` (作为`.errors`)。默认情况下我们会抛出。
raise_on_exception (bool) – 如果为``False``,则不传播来自``bulk``调用的异常, 仅将失败的项报告为失败。
expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – 对每个传入操作执行的回调, 应返回包含操作行和数据行的元组 (如果应省略数据行,则为`None`)。
retry_on_status (int | Collection[int]) – 将触发重试的HTTP状态码。 (如果指定为`None`,则仅状态码429会重试)。
max_retries (int) – 当收到retry_on_status(默认为``429``)时, 文档将被重试的最大次数, 设置为0(默认)表示不重试
initial_backoff (float) – 第一次重试前应等待的秒数。 任何后续重试将是``initial_backoff * 2**retry_number``的幂
max_backoff (float) – 重试将等待的最大秒数
yield_ok (bool) – 如果设置为False,将在输出中跳过成功的文档
ignore_status (int | Collection[int]) – 你想忽略的HTTP状态码列表
client
actions
chunk_size
max_chunk_bytes
raise_on_error
expand_action_callback
raise_on_exception
max_retries
initial_backoff
max_backoff
yield_ok
ignore_status
retry_on_status
args (Any)
kwargs (Any)
- Return type:
Bulk
- async elasticsearch.helpers.async_bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)
用于
bulk()
API 的辅助工具,提供更人性化的接口—— 它接收一个操作迭代器,并将操作分批发送到 Elasticsearch。返回一个包含摘要信息的元组—— 成功执行的操作数量以及错误列表(如果stats_only
设为True
则返回错误数量)。 注意默认情况下遇到错误时会抛出BulkIndexError
,因此stats_only
等选项仅在raise_on_error
设为False
时生效。收集错误时,原始文档数据会包含在错误字典中,这可能导致额外的高内存消耗。如果需要处理大量数据 并希望忽略/收集错误,请考虑使用
async_streaming_bulk()
辅助工具, 它仅返回错误而不将其存储在内存中。
- Parameters:
client (AsyncElasticsearch) – 使用的
AsyncElasticsearch
实例actions (Iterable[bytes | str | Dict[str, Any]] | AsyncIterable[bytes | str | Dict[str, Any]]) – 包含操作的迭代器
stats_only (bool) – 如果为 True 则仅报告成功/失败的操作数量,而非成功数量加错误响应列表
ignore_status (int | Collection[int]) – 需要忽略的 HTTP 状态码列表
client
actions
stats_only
ignore_status
args (Any)
kwargs (Any)
- Return type:
任何额外的关键字参数将传递给用于执行操作的
async_streaming_bulk()
,更多可接受参数请参阅async_streaming_bulk()
。
Scan
- async elasticsearch.helpers.async_scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)
对
scroll()
API 的简单抽象—— 一个简单迭代器,按底层滚动请求返回所有命中结果。默认情况下 scan 不会按任何预定顺序返回结果。若要在滚动时使返回文档具有标准顺序 (按评分或显式排序定义),请使用
preserve_order=True
。这可能是昂贵操作, 会抵消使用scan
带来的性能优势。
- Parameters:
client (AsyncElasticsearch) – 使用的
AsyncElasticsearch
实例scroll (str) – 指定应为滚动搜索保持索引一致视图的时间
raise_on_error (bool) – 遇到错误时抛出异常(
ScanError
)(某些分片执行失败)。默认会抛出。preserve_order (bool) – 不将
search_type
设为scan
——这将使滚动分页时保持顺序。 注意这可能是极其昂贵的操作,容易导致不可预测结果,请谨慎使用。size (int) – 每次迭代发送的批次大小(每分片)。
request_timeout (float | None) – 每次调用
scan
的显式超时clear_scroll (bool) – 在方法完成或出错时,通过 clear scroll API 显式删除滚动 ID,默认为 true。
scroll_kwargs (MutableMapping[str, Any] | None) – 传递给
scroll()
的额外参数client
query
scroll
raise_on_error
preserve_order
size
request_timeout
clear_scroll
scroll_kwargs
kwargs (Any)
- Return type:
AsyncIterable[Dict[str, Any]]
任何额外的关键字参数将传递给初始的
search()
调用:async_scan( client, query={"query": {"match": {"title": "python"}}}, index="orders-*" )
Reindex
- async elasticsearch.helpers.async_reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})
将满足给定查询的所有文档从一个索引重新索引到另一个索引(如果指定了 target_client, 可能位于不同集群)。如果不指定查询,则会重新索引所有文档。
自
2.3
版本起,Elasticsearch 本身提供了reindex()
API。 建议尽可能使用该 API 而非此辅助工具。此辅助工具主要用于向后兼容和需要更高灵活性的场景。Note
此辅助工具不传输映射,仅传输数据。
- Parameters:
client (AsyncElasticsearch) – 使用的
AsyncElasticsearch
实例(如果同时指定了 target_client 则用于读取)source_index (str | Collection[str]) – 读取文档的源索引(或索引列表)
target_index (str) – 目标集群中要填充的索引名称
target_client (AsyncElasticsearch | None) – 可选,若指定将用于写入(从而实现跨集群重新索引)
chunk_size (int) – 每次发送到 ES 的文档块大小(默认:500)
scroll (str) – 指定应为滚动搜索保持索引一致视图的时间
op_type (str | None) – 显式操作类型。默认为 ‘_index’。数据流必须设为 ‘create’。 如果未指定,将自动检测 target_index 是否为数据流。
scan_kwargs (MutableMapping[str, Any]) – 传递给
async_scan()
的额外参数bulk_kwargs (MutableMapping[str, Any]) – 传递给
async_bulk()
的额外参数client
source_index
target_index
query
target_client
chunk_size
scroll
op_type
scan_kwargs
bulk_kwargs
- Return type: