ElasticSearch如何一次查询出全部数据——基于Scroll
Elasticsearch 查询结果默认只显示10条,可以通过设置from及size来达到分页的效果(详见附3),但是 from + size <= 10,000,因为index.max_result_window 默认值是10,000,而 from+ size 必须小于index.max_result_window 。因此只能用Scroll取出所有的结果,
- Scroll相当于传统数据库的游标,具体代码片段如下:
SearchResponse scrollResp = client.prepareSearch(availableIndices) .setTypes(type) .setScroll(new TimeValue(60000)) .setQuery(boolQueryBuilder) .setSize(SEARCH_HITS_SIZE).get(); //max of SEARCH_HITS_SIZE hits will be returned for each scroll //Scroll until no hits are returned do { for (SearchHit hit : scrollResp.getHits().getHits()) { tmpJsonList.add( (JSONObject) JSONValue.parse(hit.getSourceAsString())); } } jsonList.addAll(tmpJsonList); tmpJsonList.clear(); scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while (scrollResp.getHits().getHits().length != 0);
setScroll()里传入的时间,表示一次处理setSize()里的数据超时时间。即处理一个分页最长不超过的时间,上面的代码表示1分钟。scrollResp.getScrollId()每次回生成一个ScrollID,如下图:
- 用from + size循环读取的代码片段如下:
int index = 0; { tmpJsonList.clear(); srb.setFrom(Math.multiplyExact(index, SEARCH_HITS_SIZE)); index++; MultiSearchResponse.Item[] items = sr.get().getResponses(); for (MultiSearchResponse.Item item : items) { SearchResponse response = item.getResponse(); SearchHit[] hits = response.getHits().getHits(); if (hits.length != 0) { for (SearchHit hit : hits) { tmpJsonList.add((JSONObject) JSONValue.parse(hit.getSourceAsString()); } } } jsonList.addAll(tmpJsonList); } } while (tmpJsonList.size() > 0);
其中:SEARCH_HITS_SIZE = 1000, srb是多条件组合查询,前置代码如下:
queryBuilders.forEach(query -> { boolQueryBuilder.must(query); }); MultiSearchRequestBuilder sr = client.prepareMultiSearch(); SearchRequestBuilder srb = client.prepareSearch().setTypes(type).setIndices(availableIndices).setQuery(boolQueryBuilder).setSize(SEARCH_HITS_SIZE); sr.add(srb);
查询条件的构造代码片段如下(用QueryBuilders根据需要选择term, range, match等):
StringUtil.isEmpty(l7p)) { queryBuilders.add(QueryBuilders.termQuery(Event.FIELD_L7P, l7p)); } if (!StringUtil.isEmpty(startTime) && StringUtil.isEmpty(endTime)) { queryBuilders.add(QueryBuilders.rangeQuery(Event.FIELD_START_TIME).from(startTime)); }
附:
1)using scroll in java https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-search-scrolling.html
2)scroll https://www.elastic.co/guide/en/elasticsearch/reference/5.1/search-request-scroll.html
3) from and size https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#request-body-search-from-size
*****************************************************************************************************
精力有限,想法太多,专注做好一件事就行
- 我只是一个程序猿。5年内把代码写好,技术博客字字推敲,坚持零拷贝和原创
- 写博客的意义在于打磨文笔,训练逻辑条理性,加深对知识的系统性理解;如果恰好又对别人有点帮助,那真是一件令人开心的事
*****************************************************************************************************
相关推荐
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。