// HBaseClient

package com.feng.scheduler.log;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helper that writes task log entries to HBase and reads them back.
 *
 * <p>{@link #pushLog(String, String, String, String, String)} stores each entry under the row key
 * {@code taskInstanceId + "#" + System.currentTimeMillis()}. {@link #scanLog} reads entries by
 * row-key prefix and reports the timestamp part of the last row key it saw.
 *
 * <p>All String/byte[] conversions use UTF-8 so stored data does not depend on the JVM's default
 * charset.
 *
 * @author bjzhongdegen
 */
public class HBaseClient {
	protected static final Logger LOGGER = LoggerFactory.getLogger(HBaseClient.class);

	/** Separator between the task instance id and the timestamp inside a row key. */
	private static final String CONNECTOR = "#";
	/** Charset used for every conversion between String and byte[]. */
	private static final Charset UTF_8 = Charset.forName("UTF-8");
	/** Value passed to {@code Scan.setCaching}. */
	private static final int SCAN_CACHING = 100;
	/** Page size handed to the {@code PageFilter} of every scan. */
	private static final int PAGE_SIZE = 100;
	/** Maximum size passed to the {@code HTablePool} constructor. */
	private static final int TABLE_POOL_SIZE = 30;

	private static final com.feng.scheduler.common.filemanager.Configuration commonConf;
	private static final Configuration HBASE_CONF;
	private static final HTablePool TABLE_POOL;

	/*
	 * Initialize the configuration: read the ZooKeeper settings from common-config.xml and
	 * build the shared HBase configuration and table pool.
	 */
	static {
		commonConf = new com.feng.scheduler.common.filemanager.Configuration("common-config.xml");
		Configuration conf = new Configuration();
		// Same value as hbase.zookeeper.quorum configured in hbase/conf/hbase-site.xml
		conf.set("hbase.zookeeper.quorum", commonConf.getDefault("hbase.zookeeper.quorum", "127.0.0.1"));
		// Same value as hbase.zookeeper.property.clientPort configured in hbase/conf/hbase-site.xml
		conf.set("hbase.zookeeper.property.clientPort", commonConf.getDefault("hbase.zookeeper.property.clientPort", "2181"));
		HBASE_CONF = HBaseConfiguration.create(conf);
		TABLE_POOL = new HTablePool(HBASE_CONF, TABLE_POOL_SIZE);
	}

	/**
	 * Scans {@code tableName} for rows whose key starts with {@code prifixKey} and concatenates
	 * their cell values, one per line.
	 *
	 * <p>The returned record's content is the concatenated values (empty when nothing matched or
	 * when opening the scanner failed with an {@link IOException}, which is logged, not thrown).
	 * Its next-timestamp is set from the last scanned row key when that key has a numeric part
	 * after the last {@code '#'}; otherwise it is left untouched.
	 *
	 * @param tableName name of the HBase table to scan
	 * @param prifixKey row-key prefix to match; must not be null
	 * @param startkey  row key to start from (inclusive); when null or empty the scan starts at
	 *                  {@code prifixKey}
	 * @param stopKey   row key to stop at; ignored when blank
	 * @return the collected log record, never null
	 */
	public static LogRecord scanLog(String tableName, String prifixKey, String startkey, String stopKey) {
		// Built once and reused for both the info line and the error line.
		String scanDesc = "Get prifixKey[" + prifixKey + "], startkey=" + startkey + ", stopKey=" + stopKey;
		LOGGER.info(scanDesc);

		LogRecord result = new LogRecord();
		StringBuilder content = new StringBuilder();
		HTableInterface table = null;
		ResultScanner scanner = null;
		try {
			table = TABLE_POOL.getTable(tableName);
			scanner = table.getScanner(buildScan(prifixKey, startkey, stopKey));

			boolean debug = LOGGER.isDebugEnabled();
			byte[] lastRowKey = null;
			for (Result row : scanner) {
				KeyValue[] cells = row.raw();
				for (KeyValue cell : cells) {
					if (debug) {
						LOGGER.debug("row=" + new String(cell.getRow(), UTF_8)
								+ ", timestamp=" + cell.getTimestamp()
								+ ", family=" + new String(cell.getFamily(), UTF_8)
								+ ", qualifier=" + new String(cell.getQualifier(), UTF_8));
					}
					content.append(new String(cell.getValue(), UTF_8)).append("\n");
				}
				if (cells.length > 0) {
					lastRowKey = row.getRow();
				}
			}

			if (lastRowKey != null) {
				Long nextTimestamp = getTimeStamp(new String(lastRowKey, UTF_8));
				// A malformed key must not abort the whole read; just leave the timestamp unset.
				if (nextTimestamp != null) {
					result.setNextTimestamp(nextTimestamp);
				}
			}
		} catch (IOException e) {
			LOGGER.error(scanDesc, e);
		} finally {
			// Close in finally so the scanner is released even when iteration throws.
			if (scanner != null) {
				scanner.close();
			}
			closeHTable(table);
		}
		result.setContent(content.toString());
		return result;
	}

	/**
	 * Builds the scan used by {@link #scanLog}: a page filter and a prefix filter combined with
	 * MUST_PASS_ALL, plus the start/stop rows.
	 */
	private static Scan buildScan(String prifixKey, String startkey, String stopKey) {
		List<Filter> filters = new ArrayList<Filter>();
		filters.add(new PageFilter(PAGE_SIZE));
		filters.add(new PrefixFilter(prifixKey.getBytes(UTF_8)));

		Scan scan = new Scan();
		scan.setCaching(SCAN_CACHING);
		scan.setFilter(new FilterList(Operator.MUST_PASS_ALL, filters));

		// Every row matching the prefix sorts at or after the prefix itself, so the prefix is a
		// safe start row when the caller gives none.
		String startRow = StringUtils.isEmpty(startkey) ? prifixKey : startkey;
		scan.setStartRow(startRow.getBytes(UTF_8));
		if (!StringUtils.isBlank(stopKey)) {
			scan.setStopRow(stopKey.getBytes(UTF_8));
		}
		return scan;
	}

	/**
	 * Extracts the timestamp from a row key of the form {@code <taskInstanceId>#<millis>}.
	 *
	 * <p>The timestamp is taken after the <em>last</em> separator, matching how
	 * {@link #pushLog(String, String, String, String, String)} builds keys, so a task instance id
	 * that itself contains {@code '#'} is handled.
	 *
	 * @param rowKey the row key to parse
	 * @return the timestamp, or null when the key has no separator or a non-numeric suffix
	 */
	private static Long getTimeStamp(String rowKey) {
		int separatorIndex = rowKey.lastIndexOf(CONNECTOR);
		if (separatorIndex < 0) {
			LOGGER.warn("row key without '{}' separator: {}", CONNECTOR, rowKey);
			return null;
		}
		try {
			return Long.valueOf(rowKey.substring(separatorIndex + CONNECTOR.length()));
		} catch (NumberFormatException e) {
			LOGGER.warn("row key without numeric timestamp: {}", rowKey);
			return null;
		}
	}

	/**
	 * Stores a log entry in column family {@code "f"}, qualifier {@code "content"}.
	 *
	 * @param tableName      name of the HBase table to write to
	 * @param taskInstanceId id used as the first part of the row key
	 * @param logContent     text to store
	 */
	public static void pushLog(String tableName, String taskInstanceId, String logContent) {
		pushLog(tableName, taskInstanceId, logContent, "f", "content");
	}

	/**
	 * Stores a log entry under the row key {@code taskInstanceId + "#" + currentTimeMillis}.
	 *
	 * <p>Failures are logged and never propagated to the caller.
	 *
	 * @param tableName      name of the HBase table to write to
	 * @param taskInstanceId id used as the first part of the row key
	 * @param logContent     text to store
	 * @param family         column family to write into
	 * @param qualifier      column qualifier to write into
	 */
	public static void pushLog(String tableName, String taskInstanceId, String logContent, String family, String qualifier) {
		LOGGER.info("push {} log to {}", taskInstanceId, tableName);
		HTableInterface table = null;
		try {
			table = TABLE_POOL.getTable(tableName);
			String rowKey = taskInstanceId + CONNECTOR + System.currentTimeMillis();
			Put put = new Put(rowKey.getBytes(UTF_8));
			put.add(family.getBytes(UTF_8), qualifier.getBytes(UTF_8), logContent.getBytes(UTF_8));
			table.put(put);
		} catch (Throwable e) {
			// NOTE(review): the original caught IOException and Throwable with identical handling,
			// presumably so that log shipping can never fail the calling task; kept as one catch.
			LOGGER.error("push " + taskInstanceId + " log to " + tableName + " failed.", e);
		} finally {
			closeHTable(table);
		}
	}

	/**
	 * Closes the given table, logging (not throwing) any failure.
	 *
	 * @param table the table to close; may be null, in which case nothing happens
	 */
	private static void closeHTable(HTableInterface table) {
		if (table == null) {
			return;
		}

		try {
			table.close();
		} catch (IOException e) {
			LOGGER.warn("close hbase table FAILED", e);
		}
	}

	/**
	 * Manual smoke test: scans the {@code task_logs} table for keys prefixed with {@code "9#"}
	 * and prints the collected content.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		LogRecord result = HBaseClient.scanLog("task_logs", "9#", "9#1", null);
		System.out.println(result.getContent());
	}
}

// Related recommendations