Elasticsearch Java High Level REST Client(Update By Query API)

Update By Query API

Update By Query请求

UpdateByQueryRequest可用于更新索引中的文档。

它需要在其上执行更新的现有索引(或一组索引)。

最简单的UpdateByQueryRequest形式如下所示:

UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
  • 在一组索引上创建UpdateByQueryRequest

默认情况下,版本冲突会中止UpdateByQueryRequest进程,但你可以通过在请求体中设置它为proceed来计数它们。

request.setConflicts("proceed");
  • 设置proceed当版本冲突。

你可以通过添加查询来限制文档。

request.setQuery(new TermQueryBuilder("user", "kimchy"));
  • 仅复制将字段user设置为kimchy的文档。

也可以通过设置大小来限制处理文档的数量。

request.setSize(10);
  • 只复制10条文档。

默认情况下,UpdateByQueryRequest使用1000批次,你可以使用setBatchSize更改批量大小。

request.setBatchSize(100);
  • 使用100个文档批次。

按查询更新还可以通过指定管道来使用摄取功能。

request.setPipeline("my_pipeline");

UpdateByQueryRequest还支持修改文档的script,以下示例说明了这一点。

request.setScript(
    new Script(
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));
  • setScript使用户为kimchy的所有文档上的likes字段递增。

UpdateByQueryRequest还有助于使用sliced-scroll自动并行化到_uid上的切片,使用setSlices指定要使用的切片数。

request.setSlices(2);
  • 设置要使用的切片数。

UpdateByQueryRequest使用scroll参数来控制它保持“搜索上下文”活动的时间。

request.setScroll(TimeValue.timeValueMinutes(10));
  • 设置滚动时间。

如果提供路由,则路由将复制到滚动查询,从而将进程限制为与该路由值匹配的碎片。

request.setRouting("=cat");
  • 设置路由。

可选参数

除上述选项外,还可以选择提供以下参数:

request.setTimeout(TimeValue.timeValueMinutes(2));
  • 等待查询请求更新执行作为TimeValue的超时时间。
request.setRefresh(true);
  • 通过调用查询更新后刷新索引。
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
  • 设置索引选项。

同步执行

BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);

异步执行

通过查询更新请求异步执行需要将UpdateByQueryRequest实例和ActionListener实例传递给异步方法:

client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
  • 要执行的UpdateByQueryRequest和执行完成时要使用的ActionListener

异步方法不会阻塞并立即返回,完成后,如果执行成功完成,则使用onResponse方法回调ActionListener,如果失败则使用onFailure方法。

BulkByScrollResponse的典型监听器如下所示:

ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};
  • onResponse — 执行成功完成时调用,响应作为参数提供,并包含已执行的每个操作的单个结果列表,请注意,一个或多个操作可能已失败,而其他操作已成功执行。
  • onFailure — 在整个UpdateByQueryRequest失败时调用,在这种情况下,引发异常作为参数提供,并且没有执行任何操作。

Update By Query响应

返回的BulkByScrollResponse包含有关已执行操作的信息,并允许迭代每个结果,如下所示:

TimeValue timeTaken = bulkResponse.getTook(); 
boolean timedOut = bulkResponse.isTimedOut(); 
long totalDocs = bulkResponse.getTotal(); 
long updatedDocs = bulkResponse.getUpdated(); 
long deletedDocs = bulkResponse.getDeleted(); 
long batches = bulkResponse.getBatches(); 
long noops = bulkResponse.getNoops(); 
long versionConflicts = bulkResponse.getVersionConflicts(); 
long bulkRetries = bulkResponse.getBulkRetries(); 
long searchRetries = bulkResponse.getSearchRetries(); 
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); 
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); 
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); 
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
  • 获取总耗时。
  • 检查请求是否超时。
  • 获取处理的文档总数。
  • 已更新的文档数。
  • 已删除的文档数。
  • 已执行的批次数。
  • 跳过的文档数。
  • 版本冲突数。
  • 请求必须重试批量索引操作的次数。
  • 请求必须重试搜索操作的次数。
  • 此请求限制的总时间不包括当前正在休眠的当前节流时间。
  • 任何当前节流阀休眠的剩余延迟或如果不休眠则为0。
  • 搜索阶段的失败。
  • 批量索引操作期间的失败。

相关推荐