How Solr Multi-Shard Merge Queries Work

Overview

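    Both Solr and Elasticsearch support a cluster mode. In Solr, a query can be sent to any node. If the query carries no routing key, that node acts as the merger: it sends the query to one replica of every shard in the cluster, sorts all the results it collects, and returns them to the client.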
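The gather-and-sort step described above can be sketched in plain Java. This is a minimal model, not Solr code: the class MergeSketch, the Hit type and the method mergeTopN are hypothetical names introduced only for illustration, and scores stand in for whatever sort criterion the query uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative model only; none of these types are Solr classes.
final class MergeSketch {

    // A candidate hit as a shard might report it: id, score and originating shard.
    static final class Hit {
        final String id;
        final double score;
        final String shard;

        Hit(String id, double score, String shard) {
            this.id = id;
            this.score = score;
            this.shard = shard;
        }
    }

    // Gathers every shard's candidates, orders them by descending score,
    // and keeps at most `rows` entries, as a merging node would.
    static List<Hit> mergeTopN(List<List<Hit>> perShardHits, int rows) {
        List<Hit> all = new ArrayList<>();
        for (List<Hit> shardHits : perShardHits) {
            all.addAll(shardHits);
        }
        all.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return new ArrayList<>(all.subList(0, Math.min(rows, all.size())));
    }

    public static void main(String[] args) {
        List<Hit> shard1 = Arrays.asList(new Hit("doc-b", 1.2, "shard1"));
        List<Hit> shard2 = Arrays.asList(new Hit("doc-a", 2.5, "shard2"), new Hit("doc-c", 0.4, "shard2"));
        for (Hit h : mergeTopN(Arrays.asList(shard1, shard2), 2)) {
            System.out.println(h.id + " from " + h.shard);
        }
    }
}
```

The point of the sketch is that the merging node has to hold and sort candidates from every shard before it knows which documents belong on the requested page.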

Details

   For this test, a Collection with two shards (one replica per shard) was built in a test cluster. The query Q was: id:9922600464a707520164a75ded18001f OR id:0000000064db34070164db344a460004, where each of the two ids exists on a different shard.

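   The query log shows that the cluster executed 5 queries in total: one full-shard scan turned a single query into 5, and that is with only 2 shards. Experiments show that with n shards a cluster merge query issues between 1+n+1 and 1+2n queries (4 to 5 for n=2, counting the top-level request), so merge queries should be avoided in production wherever possible.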
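The request-count bounds can be written out as a small calculation. The breakdown in the comments (one top-level request, n candidate requests, then between 1 and n field-fetch requests) is an interpretation of the log below rather than something the article states explicitly, and the class name SubQueryCount is hypothetical.

```java
// Worked arithmetic for the 1+n+1 .. 1+2n bounds; not Solr code.
final class SubQueryCount {

    // 1 top-level request + n candidate requests + 1 field-fetch request
    // (assumed case: all requested documents live on a single shard).
    static int minRequests(int shards) {
        return 1 + shards + 1;
    }

    // 1 top-level request + n candidate requests + n field-fetch requests
    // (assumed case: every shard holds at least one requested document).
    static int maxRequests(int shards) {
        return 1 + 2 * shards;
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 4, 8, 16}) {
            System.out.println("shards=" + n + " requests=" + minRequests(n) + ".." + maxRequests(n));
        }
    }
}
```

For n=2 the range is 4 to 5, which agrees with the five requests recorded in the log, where both shards held one of the two ids.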

Query log details

The final result set spans both shards.

Querying candidate records on shard 2

15:09:31 [x:search4xxxx_shard2_replica1] INFO o.apache.solr.core.SolrCore.Request

- [search4xxxx_shard2_replica1]  webapp=/solr path=/select

params={df=text

&distrib=false

&fl=id

&fl=score

&shards.purpose=4

&start=0

&fsv=true

&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard2_replica1/

&rows=2

&version=2

&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004

&NOW=1533020971828

&isShard=true

&wt=javabin&_=1533019917767} hits=1 status=0 QTime=0

Querying candidate records on shard 1

shards.purpose=4

15:09:31 [x:search4xxxx_shard1_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard1_replica1]  webapp=/solr path=/select params={df=text&distrib=false&fl=id&fl=score&shards.purpose=4&start=0&fsv=true

&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard1_replica1/

&rows=2

&version=2

&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004

&NOW=1533020971828

&isShard=true

&wt=javabin

&_=1533019917767} hits=1 status=0 QTime=0

Fetching field contents from shard 1

shards.purpose=64

15:09:31 [x:search4xxxx_shard1_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard1_replica1]  webapp=/solr path=/select params={

df=text

&distrib=false

&fl=id,[shard]

&shards.purpose=64

&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard1_replica1/

&rows=2

&version=2

&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004

&NOW=1533020971828

&ids=0000000064db34070164db344a460004

&isShard=true

&wt=javabin&_=1533019917767} status=0 QTime=0

Fetching field contents from shard 2

shards.purpose=64

ids=9922600464a707520164a75ded18001f

15:09:31 [x:search4xxxx_shard2_replica1] INFO o.apache.solr.core.SolrCore.Request-

[search4xxxx_shard2_replica1]  webapp=/solr path=/select params={

df=text

&distrib=false

&fl=id,[shard]

&shards.purpose=64

&shard.url=http://127.0.0.1:8080/solr/search4xxxx_shard2_replica1/

&rows=2

&version=2

&q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004

&NOW=1533020971828

&ids=9922600464a707520164a75ded18001f

&isShard=true

&wt=javabin

&_=1533019917767} status=0 QTime=0

Top-level query

15:09:31 [x:search4xxxx_shard2_replica1] INFO o.apache.solr.core.SolrCore.Request- [search4xxxx_shard2_replica1] 

webapp=/solr

path=/select params={

q=id:9922600464a707520164a75ded18001f+OR+id:0000000064db34070164db344a460004

&indent=on

&fl=id,[shard]

&rows=2&wt=json

&_=1533019917767} hits=2 status=0 QTime=44
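The shards.purpose values in the log entries above (4 and 64) are bit flags. The sketch below decodes them using flag names and values recalled from org.apache.solr.handler.component.ShardRequest; treat the table as an assumption to check against your Solr version. The class ShardPurposeDecoder itself is hypothetical and covers only the lowest eight flags.

```java
import java.util.ArrayList;
import java.util.List;

// Decodes a shards.purpose bitmask into flag names (illustrative helper, not part of Solr).
final class ShardPurposeDecoder {

    // Assumed to mirror the first eight PURPOSE_* constants in ShardRequest.
    private static final int[] FLAGS = {0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80};
    private static final String[] NAMES = {
        "PURPOSE_PRIVATE", "PURPOSE_GET_TERM_DFS", "PURPOSE_GET_TOP_IDS", "PURPOSE_REFINE_TOP_IDS",
        "PURPOSE_GET_FACETS", "PURPOSE_REFINE_FACETS", "PURPOSE_GET_FIELDS", "PURPOSE_GET_HIGHLIGHTS"
    };

    // Returns the name of every flag that is set in `purpose`.
    static List<String> decode(int purpose) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < FLAGS.length; i++) {
            if ((purpose & FLAGS[i]) != 0) {
                names.add(NAMES[i]);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("4  -> " + decode(4));
        System.out.println("64 -> " + decode(64));
    }
}
```

Read this way, the two purpose=4 requests collect the top ids from each shard, and the two purpose=64 requests fetch the stored fields for the ids chosen by the merging node.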

Optimizing merge queries with a Solr SearchComponent

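Solr clients often fetch data by a list of ids (ids that are spread across several shards). In other words, the client does not need to page through the results because they all fit on one page. Under that premise the result-set sorting step can be skipped, and sorting takes up a large share of a merge query's time, so this optimization can greatly improve merge query performance.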
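The idea of skipping the sort can be illustrated with a toy model in which each shard is a plain id-to-document map. Everything here (the class SinglePhaseGather, the method fetchByIds, the map-based shards) is a hypothetical stand-in chosen for the sketch; a real shard would evaluate the query itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single-phase fetch by id list; not Solr code.
final class SinglePhaseGather {

    // Asks every shard for the requested ids and concatenates whatever each one holds.
    // No scores are compared and nothing is sorted: results arrive in shard order.
    static List<String> fetchByIds(List<Map<String, String>> shards, List<String> ids) {
        List<String> docs = new ArrayList<>();
        for (Map<String, String> shard : shards) {
            for (String id : ids) {
                String doc = shard.get(id);
                if (doc != null) {
                    docs.add(doc);
                }
            }
        }
        return docs;
    }

    public static void main(String[] args) {
        Map<String, String> shard1 = new HashMap<>();
        shard1.put("id-1", "doc one");
        Map<String, String> shard2 = new HashMap<>();
        shard2.put("id-2", "doc two");
        System.out.println(fetchByIds(Arrays.asList(shard1, shard2), Arrays.asList("id-2", "id-1")));
    }
}
```

Because the caller already knows which ids it wants and expects them all on one page, one round trip per shard is enough and the order of the combined list does not matter.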

The optimization can be implemented by extending the org.apache.solr.handler.component.SearchComponent class:

Override the distributedProcess method:

@Override
	public int distributedProcess(ResponseBuilder rb) throws IOException {
		if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS)
			return ResponseBuilder.STAGE_GET_FIELDS;
		if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
			return createSubRequests(rb);
		}
		return ResponseBuilder.STAGE_DONE;
	}

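 This override reduces a merge query that normally needs two phases to a single phase (the STAGE_GET_FIELDS stage), skipping the sorting step and returning the data directly. Full code: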

package com.dfire.tis.solrextend.handler.component.s4product;

import java.io.IOException;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.SearchComponent;
import org.apache.solr.handler.component.ShardRequest;

public class RealtimeGetQueryComponent extends SearchComponent {
	public static final String NAME = "RealtimeGetQuery";
	@Override
	public void prepare(ResponseBuilder rb) throws IOException {
	}

	@Override
	public void process(ResponseBuilder rb) throws IOException {
	}

	@Override
	public int distributedProcess(ResponseBuilder rb) throws IOException {
		if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS)
			return ResponseBuilder.STAGE_GET_FIELDS;
		if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
			return createSubRequests(rb);
		}
		return ResponseBuilder.STAGE_DONE;
	}

	// Builds one sub-request per active shard so that each shard returns its documents directly.
	public int createSubRequests(ResponseBuilder rb) throws IOException {
		SolrParams params = rb.req.getParams();
		CloudDescriptor cloudDescriptor = rb.req.getCore().getCoreDescriptor().getCloudDescriptor();
		ZkController zkController = rb.req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
		String collection = cloudDescriptor.getCollectionName();

		for (Slice slice : zkController.getClusterState().getActiveSlices(collection)) {
			String shard = slice.getName();
			ShardRequest sreq = new ShardRequest();

			// Mark the request as private to this component (PURPOSE_PRIVATE == 0x01).
			sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
			sreq.shards = sliceToShards(rb, collection, shard);
			sreq.actualShards = sreq.shards;

			SolrQuery squery = new SolrQuery();
			squery.set(ShardParams.SHARDS_QT, "/select");
			// Forward the caller's field list, if one was given.
			String fields = params.get(CommonParams.FL);
			if (fields != null) {
				squery.set(CommonParams.FL, fields);
			}
			// Each shard must answer locally instead of fanning out again.
			squery.set("distrib", false);
			squery.setQuery(params.get(CommonParams.Q));
			sreq.params = squery;

			rb.addRequest(this, sreq);
		}
		return ResponseBuilder.STAGE_DONE;
	}

	private String[] sliceToShards(ResponseBuilder rb, String collection, String slice) {
		String lookup = collection + '_' + slice; // seems either form may be filled in rb.slices?
		for (int i = 0; i < rb.slices.length; i++) {
			if (lookup.equals(rb.slices[i]) || slice.equals(rb.slices[i])) {
				return new String[] { rb.shards[i] };
			}
		}

		throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find shard '" + lookup + "'");
	}

	@Override
	public String getDescription() {
		return NAME;
	}
}
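The effect of the distributedProcess override in the class above can be seen in a stand-alone simulation of a stage loop. This is a simplified model, not Solr's SearchHandler: the stage values are assumed to match the constants in ResponseBuilder (0, 1000, 2000, 3000, Integer.MAX_VALUE), and StageLoopSketch, Component, TWO_PHASE and SINGLE_PHASE are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a distributed stage loop; not Solr code.
final class StageLoopSketch {
    // Assumed to mirror ResponseBuilder's stage constants.
    static final int STAGE_START = 0;
    static final int STAGE_PARSE_QUERY = 1000;
    static final int STAGE_EXECUTE_QUERY = 2000;
    static final int STAGE_GET_FIELDS = 3000;
    static final int STAGE_DONE = Integer.MAX_VALUE;

    // Given the current stage, a component answers with the next stage it needs.
    interface Component {
        int distributedProcess(int stage);
    }

    // Rough stand-in for the default behaviour: every stage is visited in turn.
    static final Component TWO_PHASE = stage -> {
        if (stage < STAGE_PARSE_QUERY) return STAGE_PARSE_QUERY;
        if (stage < STAGE_EXECUTE_QUERY) return STAGE_EXECUTE_QUERY;
        if (stage < STAGE_GET_FIELDS) return STAGE_GET_FIELDS;
        return STAGE_DONE;
    };

    // Same shape as the article's override: jump straight to GET_FIELDS, then finish.
    static final Component SINGLE_PHASE = stage -> {
        if (stage < STAGE_GET_FIELDS) return STAGE_GET_FIELDS;
        return STAGE_DONE;
    };

    // Runs the loop and records every stage the component passes through.
    static List<Integer> visitedStages(Component component) {
        List<Integer> visited = new ArrayList<>();
        int next = STAGE_START;
        do {
            int stage = next;
            visited.add(stage);
            next = component.distributedProcess(stage);
        } while (next != STAGE_DONE);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("two-phase:    " + visitedStages(TWO_PHASE));
        System.out.println("single-phase: " + visitedStages(SINGLE_PHASE));
    }
}
```

In this model the single-phase component never enters the query-execution stage, which is where the candidate ids would otherwise be collected and sorted.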

 Example:

    solrconfig.xml:

<config>

  <searchComponent name="realtimeGetQueryComponent" class="com.dfire.tis.solrextend.handler.component.s4product.RealtimeGetQueryComponent" />
   <requestHandler name="/rquery" class="solr.SearchHandler">
    <lst name="defaults">
      <str name="echoParams">explicit</str>
      <int name="rows">10</int>
      <str name="df">text</str>
    </lst>
    <arr name="last-components">
      <str>realtimeGetQueryComponent</str>
    </arr>
  </requestHandler>

</config>

 Query code:

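import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

// BasicTestCase and client.mergeQuery come from the author's own test harness.
public class TestRQuery extends BasicTestCase {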

	public void testRquery() throws Exception {
		SolrQuery solrQuery = new SolrQuery();
		StringBuilder idStr = new StringBuilder("is_valid:1 AND {!terms f=id}");
		idStr.append(
				"000000006492817901649281d0380004,9992654455288a2b0155438eb12f00da,9992654461dbc2ad0161df5d152300fe,9992654355288a2b01553eaee9e400cb,9993236363d8ecba01641bf7d08c095f");

		solrQuery.setQuery(idStr.toString());
		solrQuery.setRows(5);

		solrQuery.setRequestHandler("/rquery");

		System.out.println("start query");
		QueryResponse response = client.mergeQuery("search4product", solrQuery, false);
		for (SolrDocument doc : response.getResults()) {
			System.out.println(doc.getFieldValue("id"));
		}
	}
}
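The query string used in the test above can also be assembled from a list of ids instead of a hand-written literal. The helper below is a hypothetical convenience (TermsQueryBuilder and byIds are not part of SolrJ); it only reproduces the "filter AND {!terms f=field}id1,id2,..." shape that the test passes to setQuery.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that builds the same query shape as the test above.
final class TermsQueryBuilder {

    // Joins the ids with commas and prefixes them with the filter and the {!terms} local params.
    static String byIds(String filter, String field, List<String> ids) {
        return filter + " AND {!terms f=" + field + "}" + String.join(",", ids);
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList(
                "000000006492817901649281d0380004",
                "9992654455288a2b0155438eb12f00da");
        System.out.println(byIds("is_valid:1", "id", ids));
    }
}
```

Building the string this way keeps the id list in one place, so the value passed to setRows can be derived from ids.size() rather than maintained by hand.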
