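How Solr Multi-Shard Merge Queries Work
Overview
Both Solr and Elasticsearch support a cluster mode. In Solr, a query can be sent to any node in the cluster. If the query does not carry a routing key, that node takes on the merge role: it sends the query to one replica of every shard in the cluster, sorts all the results it gets back, and returns them to the client.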
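The scatter, sort, and return steps described above can be modeled in a few lines of plain Java. This is a toy sketch, not Solr code: the class `MergeNodeSketch`, the `Hit` type, and the idea of representing a shard as a map from document id to score are all assumptions made for illustration, and field retrieval is not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Toy model of the merge step; plain Java, not Solr code. */
final class MergeNodeSketch {

    /** One hit as seen by the merge node: document id plus score. */
    static final class Hit {
        final String id;
        final double score;

        Hit(String id, double score) {
            this.id = id;
            this.score = score;
        }
    }

    /**
     * Each element of shards stands for one replica of one shard,
     * modeled (hypothetically) as a map from document id to score.
     */
    static List<String> mergeQuery(List<Map<String, Double>> shards, int rows) {
        List<Hit> gathered = new ArrayList<>();
        for (Map<String, Double> shard : shards) {
            // Scatter: every shard returns at most `rows` of its best hits.
            shard.entrySet().stream()
                    .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                    .limit(rows)
                    .forEach(e -> gathered.add(new Hit(e.getKey(), e.getValue())));
        }
        // Gather: sort everything by score, highest first, and keep one page.
        gathered.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        List<String> page = new ArrayList<>();
        for (Hit h : gathered.subList(0, Math.min(rows, gathered.size()))) {
            page.add(h.id);
        }
        return page;
    }
}
```

With one shard holding a (score 1.0) and b (score 3.0), a second shard holding c (score 2.0), and rows set to 2, `mergeQuery` returns b and c in that order.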
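Details
For this test, a Collection with two shards (one replica per shard) was created in a test cluster. The query Q was: id:9922600464a707520164a75ded18001f OR id:0000000064db34070164db344a460004. Each of the two ids in the query lives on a different shard.
The query logs show that the cluster executed 5 queries in total. A single query was turned into 5 queries in order to scan every shard, and that is with only 2 shards. In experiments with n shards, a merge query results in between 1+n+1 and 1+2n queries (judging from the logs: the client's request, n candidate requests, and between 1 and n field-retrieval requests), so merge queries should be avoided in production wherever possible.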
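The range can be written out as a small calculation. The split into one top-level request, n candidate requests, and 1 to n field-retrieval requests is an interpretation of the logs in this article, not something taken from Solr documentation; the class and method names are made up for illustration.

```java
/** Back-of-the-envelope request count for one merge query; not Solr code. */
final class MergeQueryCost {

    /** 1 top-level request + n candidate requests + 1 field-retrieval request. */
    static int minRequests(int shards) {
        return 1 + shards + 1;
    }

    /** 1 top-level request + n candidate requests + n field-retrieval requests. */
    static int maxRequests(int shards) {
        return 1 + 2 * shards;
    }

    /**
     * Count when the number of shards owning documents on the returned page
     * is known (assumed to be between 1 and the number of shards).
     */
    static int requests(int shards, int shardsWithHits) {
        if (shardsWithHits < 1 || shardsWithHits > shards) {
            throw new IllegalArgumentException("shardsWithHits must be in 1.." + shards);
        }
        return 1 + shards + shardsWithHits;
    }
}
```

For the test collection, `requests(2, 2)` gives 5, matching the five log entries. With 16 shards the range is 18 to 33 requests for a single client query.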
Query log details
The final result set spans both shards.
Fetch candidate records from shard 2
15:09:31 [x:search4xxxx_shard2_replica1] INFO o.apache.solr.core.SolrCore.Request - [search4xxxx_shard2_replica1] webapp=/solr path=/select params={df=text&distrib=false&fl=id&fl=score&shards.purpose=4&start=0&fsv=true&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard2_replica1/&rows=2&version=2&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004&NOW=1533020971828&isShard=true&wt=javabin&_=1533019917767} hits=1 status=0 QTime=0
Fetch candidate records from shard 1
shards.purpose=4
15:09:31 [x:search4xxxx_shard1_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard1_replica1] webapp=/solr path=/select params={df=text&distrib=false&fl=id&fl=score&shards.purpose=4&start=0&fsv=true&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard1_replica1/&rows=2&version=2&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004&NOW=1533020971828&isShard=true&wt=javabin&_=1533019917767} hits=1 status=0 QTime=0
Retrieve field contents from shard 1
shards.purpose=64
15:09:31 [x:search4xxxx_shard1_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard1_replica1] webapp=/solr path=/select params={df=text&distrib=false&fl=id,[shard]&shards.purpose=64&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard1_replica1/&rows=2&version=2&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004&NOW=1533020971828&ids=0000000064db34070164db344a460004&isShard=true&wt=javabin&_=1533019917767} status=0 QTime=0
Retrieve field contents from shard 2
shards.purpose=64
ids=9922600464a707520164a75ded18001f
15:09:31 [x:search4xxxx_shard2_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard2_replica1] webapp=/solr path=/select params={df=text&distrib=false&fl=id,[shard]&shards.purpose=64&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard2_replica1/&rows=2&version=2&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004&NOW=1533020971828&ids=9922600464a707520164a75ded18001f&isShard=true&wt=javabin&_=1533019917767} status=0 QTime=0
Top-level query
15:09:31 [x:search4xxxx_shard2_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard2_replica1] webapp=/solr path=/select params={q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004&indent=on&fl=id,[shard]&rows=2&wt=json&_=1533019917767} hits=2 status=0 QTime=44
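The shards.purpose values in these log entries are bit flags, so a small helper makes the phases easier to spot when reading a longer log. The sketch below is plain Java with no Solr dependency. The three flag values are copied from what I understand the `ShardRequest.PURPOSE_*` constants to be (1, 4, and 64), which should be checked against the Solr version in use; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Reads shards.purpose out of a request log line; not part of Solr. */
final class ShardPurposeSketch {

    // Assumed to mirror ShardRequest.PURPOSE_PRIVATE, PURPOSE_GET_TOP_IDS, PURPOSE_GET_FIELDS.
    static final int PURPOSE_PRIVATE = 0x01;
    static final int PURPOSE_GET_TOP_IDS = 0x04;
    static final int PURPOSE_GET_FIELDS = 0x40;

    private static final Pattern PURPOSE = Pattern.compile("shards\\.purpose=(\\d+)");

    /** Returns the shards.purpose value, or -1 when the line has none (a top-level request). */
    static int purposeOf(String logLine) {
        Matcher m = PURPOSE.matcher(logLine);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    /** Names the flags this sketch knows about; other bits are ignored. */
    static List<String> describe(int purpose) {
        List<String> names = new ArrayList<>();
        if (purpose < 0) {
            names.add("TOP_LEVEL");
            return names;
        }
        if ((purpose & PURPOSE_PRIVATE) != 0) names.add("PRIVATE");
        if ((purpose & PURPOSE_GET_TOP_IDS) != 0) names.add("GET_TOP_IDS");
        if ((purpose & PURPOSE_GET_FIELDS) != 0) names.add("GET_FIELDS");
        return names;
    }
}
```

Applied to the five entries above, this labels two requests as GET_TOP_IDS (the candidate phase), two as GET_FIELDS (the field-retrieval phase), and one as the top-level request.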
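Optimizing merge queries with a Solr SearchComponent
Solr clients often fetch data by a list of ids, with the ids spread across several shards. In other words, the client does not need to page through the results; everything fits on one page. Under that premise the sorting of the result set can be skipped. Sorting takes up a large share of the time in a merge query, so this optimization can greatly improve merge query performance.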
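As a rough illustration of the idea, fetching by id list without the sorting step amounts to asking every shard for the documents it owns and concatenating the answers. The following is a toy model in plain Java, not Solr code: shards are represented (hypothetically) as maps from id to document, and `SinglePhaseFetchSketch` is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy model of a single-phase fetch by id list; not Solr code. */
final class SinglePhaseFetchSketch {

    /**
     * Each shard is modeled as a map from document id to the document's fields.
     * Every shard is asked once; whatever it owns is appended as-is.
     */
    static List<Map<String, String>> fetchByIds(
            List<Map<String, Map<String, String>>> shards, List<String> ids) {
        List<Map<String, String>> docs = new ArrayList<>();
        for (Map<String, Map<String, String>> shard : shards) {
            for (String id : ids) {
                Map<String, String> doc = shard.get(id);
                if (doc != null) {
                    docs.add(doc); // no score, no global sort, no second round trip
                }
            }
        }
        return docs;
    }
}
```

The trade-off is visible in the result order: documents come back grouped by shard rather than by relevance or by the order of the requested ids, which is acceptable only when the caller does not depend on ordering.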
This can be implemented by extending the org.apache.solr.handler.component.SearchComponent class:
Override the distributedProcess method:
    @Override
    public int distributedProcess(ResponseBuilder rb) throws IOException {
        if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS)
            return ResponseBuilder.STAGE_GET_FIELDS;
        if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
            return createSubRequests(rb);
        }
        return ResponseBuilder.STAGE_DONE;
    }
This reduces the merge query from two stages to a single stage (STAGE_GET_FIELDS), skipping the sorting step and returning the data directly. Full code:
    package com.dfire.tis.solrextend.handler.component.s4product;

    import java.io.IOException;

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.cloud.CloudDescriptor;
    import org.apache.solr.cloud.ZkController;
    import org.apache.solr.common.SolrException;
    import org.apache.solr.common.cloud.Slice;
    import org.apache.solr.common.params.CommonParams;
    import org.apache.solr.common.params.ShardParams;
    import org.apache.solr.common.params.SolrParams;
    import org.apache.solr.handler.component.ResponseBuilder;
    import org.apache.solr.handler.component.SearchComponent;
    import org.apache.solr.handler.component.ShardRequest;

    public class RealtimeGetQueryComponent extends SearchComponent {

        public static final String NAME = "RealtimeGetQuery";

        @Override
        public void prepare(ResponseBuilder rb) throws IOException {
        }

        @Override
        public void process(ResponseBuilder rb) throws IOException {
        }

        @Override
        public int distributedProcess(ResponseBuilder rb) throws IOException {
            // Jump straight to the field-retrieval stage.
            if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS)
                return ResponseBuilder.STAGE_GET_FIELDS;
            if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
                return createSubRequests(rb);
            }
            return ResponseBuilder.STAGE_DONE;
        }

        public int createSubRequests(ResponseBuilder rb) throws IOException {
            SolrParams params = rb.req.getParams();
            CloudDescriptor cloudDescriptor = rb.req.getCore().getCoreDescriptor().getCloudDescriptor();
            ZkController zkController = rb.req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
            String collection = cloudDescriptor.getCollectionName();
            // Send one non-distributed sub-request to each active shard.
            for (Slice slice : zkController.getClusterState().getActiveSlices(collection)) {
                String shard = slice.getName();
                ShardRequest sreq = new ShardRequest();
                sreq.purpose = 1;
                sreq.shards = sliceToShards(rb, collection, shard);
                sreq.actualShards = sreq.shards;
                SolrQuery squery = new SolrQuery();
                squery.set(ShardParams.SHARDS_QT, "/select");
                // Pass the client's field list through unchanged, if there is one.
                String fields = params.get(CommonParams.FL);
                if (fields != null) {
                    squery.set(CommonParams.FL, fields);
                }
                squery.set("distrib", false);
                squery.setQuery(params.get(CommonParams.Q));
                sreq.params = squery;
                rb.addRequest(this, sreq);
            }
            return ResponseBuilder.STAGE_DONE;
        }

        private String[] sliceToShards(ResponseBuilder rb, String collection, String slice) {
            String lookup = collection + '_' + slice; // seems either form may be filled in rb.slices?
            for (int i = 0; i < rb.slices.length; i++) {
                if (lookup.equals(rb.slices[i]) || slice.equals(rb.slices[i])) {
                    return new String[] { rb.shards[i] };
                }
            }
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find shard '" + lookup + "'");
        }

        @Override
        public String getDescription() {
            return NAME;
        }
    }
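To see why the overridden distributedProcess collapses two stages into one, it helps to model how a handler walks through stages. The sketch below is plain Java, not Solr code. It assumes the handler repeatedly asks each component for its next stage and moves to the smallest one, which is how I understand SearchHandler to drive distributed requests; the stage values are assumed to mirror the `ResponseBuilder.STAGE_*` constants, and `TWO_PHASE` is a simplified stand-in for the default flow rather than the real QueryComponent.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the distributed stage loop; not Solr code. */
final class StageLoopSketch {

    // Assumed to mirror ResponseBuilder.STAGE_START, STAGE_EXECUTE_QUERY, STAGE_GET_FIELDS, STAGE_DONE.
    static final int STAGE_START = 0;
    static final int STAGE_EXECUTE_QUERY = 2000;
    static final int STAGE_GET_FIELDS = 3000;
    static final int STAGE_DONE = Integer.MAX_VALUE;

    /** Hypothetical stand-in for SearchComponent.distributedProcess. */
    interface Component {
        int distributedProcess(int stage);
    }

    /** Same control flow as the override in the article, minus the sub-requests. */
    static final Component SINGLE_PHASE = stage ->
            stage < STAGE_GET_FIELDS ? STAGE_GET_FIELDS : STAGE_DONE;

    /** Simplified two-phase flow: collect candidates first, then fetch fields. */
    static final Component TWO_PHASE = stage -> {
        if (stage < STAGE_EXECUTE_QUERY) return STAGE_EXECUTE_QUERY;
        if (stage < STAGE_GET_FIELDS) return STAGE_GET_FIELDS;
        return STAGE_DONE;
    };

    /** Returns the stages visited until every component reports STAGE_DONE. */
    static List<Integer> run(List<Component> components) {
        List<Integer> visited = new ArrayList<>();
        int nextStage = STAGE_START;
        do {
            int stage = nextStage;
            visited.add(stage);
            nextStage = STAGE_DONE;
            for (Component c : components) {
                nextStage = Math.min(nextStage, c.distributedProcess(stage));
            }
        } while (nextStage != STAGE_DONE);
        return visited;
    }
}
```

Because the loop takes the minimum over all components, any other component in the same handler chain that still asks for an earlier stage would bring that stage back. That is worth checking when the custom component is registered alongside the default ones.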
Example:
solrconfig:
    <config>
      <searchComponent name="realtimeGetQueryComponent"
                       class="handler.component.s4product.RealtimeGetQueryComponent" />
      <requestHandler name="/rquery" class="solr.SearchHandler">
        <lst name="defaults">
          <str name="echoParams">explicit</str>
          <int name="rows">10</int>
          <str name="df">text</str>
        </lst>
        <arr name="last-components">
          <str>realtimeGetQueryComponent</str>
        </arr>
      </requestHandler>
    </config>
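Query code:
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocument;

    // BasicTestCase and its `client` field come from the author's test harness and are not shown here.
    public class TestRQuery extends BasicTestCase {

        public void testRquery() throws Exception {
            SolrQuery solrQuery = new SolrQuery();
            // Filter on is_valid and match a fixed list of ids with the terms query parser.
            StringBuilder idStr = new StringBuilder("is_valid:1 AND {!terms f=id}");
            idStr.append(
                    "000000006492817901649281d0380004,9992654455288a2b0155438eb12f00da,9992654461dbc2ad0161df5d152300fe,9992654355288a2b01553eaee9e400cb,9993236363d8ecba01641bf7d08c095f");
            solrQuery.setQuery(idStr.toString());
            solrQuery.setRows(5); // one row per requested id, so no paging is needed
            solrQuery.setRequestHandler("/rquery");
            System.out.println("start query");
            QueryResponse response = client.mergeQuery("search4product", solrQuery, false);
            for (SolrDocument doc : response.getResults()) {
                System.out.println(doc.getFieldValue("id"));
            }
        }
    }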
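The query string in the test is assembled by hand from a fixed id list. When the ids arrive as a collection, a small helper keeps the format in one place. This is a sketch in plain Java; `TermsQuerySketch` and `idsQuery` are hypothetical names, and the output format simply reproduces the string used in the test above.

```java
import java.util.List;

/** Builds the "filter AND {!terms f=id}id1,id2,..." string used in the test; hypothetical helper. */
final class TermsQuerySketch {

    static String idsQuery(String filter, List<String> ids) {
        if (ids.isEmpty()) {
            throw new IllegalArgumentException("ids must not be empty");
        }
        // The terms query parser takes a comma-separated list of values.
        return filter + " AND {!terms f=id}" + String.join(",", ids);
    }
}
```

The size of the id list is also a natural value for the rows parameter, since the single-page assumption only holds when rows is at least the number of requested ids.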