hbase的CoprocessorProtocol及一个简单的通用扩展实现V2
hbase中的CoprocessorProtocol机制.
CoprocessorProtocol的原理比较简单,近似于一个mapreduce框架。由client将scan分解为面向多个region的请求,并行发送请求到多个region,然后client做一个reduce的操作,得到最后的结果。
先看一个例子,使用hbase的AggregationClient可以做到简单的面向单个column的统计。
/**
 * Runs the max, min, sum and rowCount aggregations of {@code AggregationClient}
 * over a single column and checks each result against the expected fixture value.
 */
@Test
public void testAggregationClient() throws Throwable {
    final long expectedMax = 100;
    final long expectedMin = 20;
    final long expectedSum = 120;
    final long expectedRows = 4;

    LongColumnInterpreter interpreter = new LongColumnInterpreter();
    AggregationClient client = new AggregationClient(CommonConfig.getConfiguration());

    // The scan is restricted to exactly one column.
    Scan singleColumnScan = new Scan();
    singleColumnScan.addColumn(ColumnFamilyName, QName1);

    Long max = client.max(TableNameBytes, interpreter, singleColumnScan);
    Assert.assertTrue(max.longValue() == expectedMax);

    Long min = client.min(TableNameBytes, interpreter, singleColumnScan);
    Assert.assertTrue(min.longValue() == expectedMin);

    Long sum = client.sum(TableNameBytes, interpreter, singleColumnScan);
    Assert.assertTrue(sum.longValue() == expectedSum);

    Long count = client.rowCount(TableNameBytes, interpreter, singleColumnScan);
    Assert.assertTrue(count.longValue() == expectedRows);
}
看下hbase的源码,下面是AggregateImplementation中getMax的实现。
@Override public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan) throws IOException { T temp; T max = null; InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) .getRegion().getScanner(scan); List<KeyValue> results = new ArrayList<KeyValue>(); byte[] colFamily = scan.getFamilies()[0]; byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst(); // qualifier can be null. try { boolean hasMoreRows = false; do { hasMoreRows = scanner.next(results); for (KeyValue kv : results) { temp = ci.getValue(colFamily, qualifier, kv); max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max; } results.clear(); } while (hasMoreRows); } finally { scanner.close(); } log.info("Maximum from this region is " + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() .getRegionNameAsString() + ": " + max); return max; }
这里由于
// Only the first family of the scan, and the first qualifier registered under that family,
// are read here; any further columns added to the scan are not looked at.
byte[] colFamily = scan.getFamilies()[0]; byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
所以,hbase自带的Aggregate函数,只能面向单列进行统计。
当我们想对多列进行Aggregate,并同时进行countRow时,有以下选择。
1. scan出所有的row,由程序自己进行Aggregate和count。
2. 使用AggregationClient,调用多次,得到所有的结果。由于是多次调用,各次结果之间可能存在一致性问题。
3. 自己扩展CoprocessorProtocol。
首先我们可以写一个protocol的通用框架。
定义protocol接口。
/**
 * Generic coprocessor endpoint protocol: a region applies a caller-supplied
 * {@link KeyValueListHandler} to the rows selected by a scan and returns the handler's result.
 */
public interface CommonCoprocessorProtocol extends CoprocessorProtocol {

    /** Version number of this protocol. */
    long VERSION = 345L;

    /**
     * Runs the handler over the rows matched by the scan.
     *
     * @param handler logic applied to the scanned key-values
     * @param scan    scan selecting the rows to process
     * @param <T>     type of the value produced by the handler
     * @return the value produced by the handler
     * @throws IOException if processing fails
     */
    <T> T handle(KeyValueListHandler<T> handler, Scan scan) throws IOException;
}
定义该protocol的实现。
public class CommonEndpointImpl extends BaseEndpointCoprocessor implements CommonCoprocessorProtocol { protected static Log log = LogFactory.getLog(CommonEndpointImpl.class); @Override public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode) throws IOException { if (CommonCoprocessorProtocol.class.getName().equals(protocol)) { return new ProtocolSignature(CommonCoprocessorProtocol.VERSION, null); } throw new IOException("Unknown protocol: " + protocol); } @Override public <T> T handle(KeyValueListHandler<T> handler, Scan scan) throws IOException { InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) .getRegion().getScanner(scan); List<KeyValue> results = new ArrayList<KeyValue>(); T t = handler.getInitValue(); try { boolean hasMoreRows = false; do { hasMoreRows = scanner.next(results); t = handler.handle(results, t); results.clear(); } while (hasMoreRows); } finally { scanner.close(); } return t; } }
定义一个KeyValueListHandler。
/**
 * Fold step applied to the key-values produced by a scan. Implementations are
 * {@code Writable}.
 *
 * @param <T> type of the accumulated value
 */
public interface KeyValueListHandler<T> extends Writable {

    /**
     * @return the value the fold starts from
     */
    T getInitValue();

    /**
     * Folds one batch of key-values into the accumulated value.
     *
     * @param keyValues key-values of the current batch
     * @param t         value accumulated so far
     * @return the new accumulated value
     */
    T handle(List<KeyValue> keyValues, T t);
}
定义一个reduce。
/**
 * Combines partial results of type {@code T} into one final result of type {@code R}.
 *
 * @param <T> type of a partial result
 * @param <R> type of the combined result
 */
public interface ClientReducer<T, R> {

    /**
     * @return the value the reduction starts from
     */
    R getInitValue();

    /**
     * Merges one partial result into the combined result.
     *
     * @param r combined result so far
     * @param t partial result to merge
     * @return the new combined result
     */
    R reduce(R r, T t);
}
定义一个client。
/**
 * Client for {@link CommonCoprocessorProtocol}: sends a handler to the regions covered by a
 * scan and reduces the per-region results into one value.
 */
public class CpClient {

    private HTableInterface table;

    /**
     * @param table table the coprocessor calls are executed against; it is closed at the
     *              end of {@link #call}
     */
    public CpClient(HTableInterface table) {
        this.table = table;
    }

    /**
     * Executes the handler on the regions between the scan's start and stop row and reduces
     * the returned values.
     *
     * @param handler logic executed on the region side
     * @param reducer logic merging the per-region results on the client side
     * @param scan    scan passed to the regions; its start/stop rows select the regions
     * @return the reduced result
     * @throws Throwable if the coprocessor execution fails
     */
    public <T, R> R call(final KeyValueListHandler<T> handler,
            final ClientReducer<T, R> reducer, final Scan scan) throws Throwable {

        final ReducingCallback<T, R> callback = new ReducingCallback<T, R>(reducer);

        try {
            Batch.Call<CommonCoprocessorProtocol, T> regionCall =
                    new Batch.Call<CommonCoprocessorProtocol, T>() {
                        @Override
                        public T call(CommonCoprocessorProtocol instance) throws IOException {
                            return instance.handle(handler, scan);
                        }
                    };
            table.coprocessorExec(CommonCoprocessorProtocol.class,
                    scan.getStartRow(), scan.getStopRow(), regionCall, callback);
        } finally {
            // The table is closed whether or not the execution succeeded.
            table.close();
        }

        return callback.current();
    }

    /** Callback that folds each region's result into a running value via the reducer. */
    private static final class ReducingCallback<T, R> implements Batch.Callback<T> {

        private final ClientReducer<T, R> reducer;
        private R accumulated;

        ReducingCallback(ClientReducer<T, R> reducer) {
            this.reducer = reducer;
            this.accumulated = reducer.getInitValue();
        }

        R current() {
            return accumulated;
        }

        @Override
        public synchronized void update(byte[] region, byte[] row, T result) {
            accumulated = reducer.reduce(accumulated, result);
        }
    }
}
这样,我们就有了一个protocol的通用框架。
假设我们要同时得到多个列的sum和结果的count,我们通过实现这些接口和定义一些request和result类来实现。
public class AggrRequest implements Writable { private List<byte[]> families = new ArrayList<byte[]>(); private List<byte[]> qualifiers = new ArrayList<byte[]>(); public AggrRequest() { } public void add(String family, String qualifier) { if (family != null && qualifier != null) { this.families.add(Bytes.toBytes(family)); this.qualifiers.add(Bytes.toBytes(qualifier)); } } public int getColumnSize() { return families.size(); } public byte[] getFamily(int index) { return families.get(index); } public byte[] getQualifer(int index) { return qualifiers.get(index); } @Override public void readFields(DataInput dataInput) throws IOException { int size = dataInput.readInt(); for (int i = 0; i < size; i++) { families.add(Bytes.toBytes(dataInput.readUTF())); } for (int i = 0; i < size; i++) { qualifiers.add(Bytes.toBytes(dataInput.readUTF())); } } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(getColumnSize()); for (byte[] b : families) { dataOutput.writeUTF(Bytes.toString(b)); } for (byte[] b : qualifiers) { dataOutput.writeUTF(Bytes.toString(b)); } } } public class AggrResult implements Writable { private AggrRequest aggrRequest; private long[] sum; private long count; public AggrResult() { } public AggrResult(AggrRequest aggrRequest) { this.aggrRequest = aggrRequest; sum = new long[aggrRequest.getColumnSize()]; } public int getColumnSize() { return aggrRequest.getColumnSize(); } public byte[] getFamily(int index) { return aggrRequest.getFamily(index); } public byte[] getQualifer(int index) { return aggrRequest.getQualifer(index); } public long getSum(int index) { return sum[index]; } public void setSum(int index, long value) { sum[index] = value; } // getter and setter. 
public long getCount() { return count; } public void setCount(long count) { this.count = count; } @Override public void readFields(DataInput dataInput) throws IOException { int columnSize = dataInput.readInt(); sum = new long[columnSize]; for (int i = 0; i < columnSize; i++) { sum[i] = dataInput.readLong(); } count = dataInput.readLong(); aggrRequest = new AggrRequest(); aggrRequest.readFields(dataInput); } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(aggrRequest.getColumnSize()); for (long v : sum) { dataOutput.writeLong(v); } dataOutput.writeLong(count); aggrRequest.write(dataOutput); } } public class AggrHandler implements KeyValueListHandler<AggrResult> { private AggrRequest aggrRequest; public AggrHandler() { } public AggrHandler(AggrRequest aggrRequest) { this.aggrRequest = aggrRequest; } @Override public void readFields(DataInput dataInput) throws IOException { aggrRequest = new AggrRequest(); aggrRequest.readFields(dataInput); } @Override public void write(DataOutput dataOutput) throws IOException { aggrRequest.write(dataOutput); } @Override public AggrResult getInitValue() { AggrResult aggrResult = new AggrResult(aggrRequest); return aggrResult; } @Override public AggrResult handle(List<KeyValue> keyValues, AggrResult t) { if (keyValues.isEmpty()) { return t; } t.setCount(t.getCount() + 1); int columnSize = t.getColumnSize(); for (int i = 0; i < columnSize; i++) { byte[] family = t.getFamily(i); byte[] qualifer = t.getQualifer(i); for (KeyValue kv : keyValues) { if (kv != null) { if (Bytes.equals(qualifer, 0, qualifer.length, kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()) && Bytes.equals(family, 0, family.length, kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength())) { if (kv.getValueLength() == Bytes.SIZEOF_LONG) { long tem = Bytes.toLong(kv.getBuffer(), kv.getValueOffset()); t.setSum(i, t.getSum(i) + tem); } } } } } return t; } } public class AggrReducer implements 
ClientReducer<AggrResult, AggrResult> { @Override public AggrResult getInitValue() { return null; } @Override public AggrResult reduce(AggrResult r, AggrResult t) { if (r == null) return t; if (t == null) return r; r.setCount(r.getCount() + t.getCount()); int columnSize = r.getColumnSize(); for (int i = 0; i < columnSize; i++) { r.setSum(i, r.getSum(i) + t.getSum(i)); } return r; } }
有了CoprocessorProtocol,可以扩展出来很多的功能,这个机制还是很强大的。
代码见https://github.com/zhang-xzhi/simplehbase
并且有测试代码。