HBaseClient
package com.feng.scheduler.log; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author bjzhongdegen * */ public class HBaseClient { protected static final Logger LOGGER = LoggerFactory.getLogger(HBaseClient.class); private static Configuration HBASE_CONF = null; private static HTablePool TABLE_POOL = null; private static final String CONNECTOR = "#"; private static com.feng.scheduler.common.filemanager.Configuration commonConf = null; /** * 初始化配置 */ static { commonConf = new com.feng.scheduler.common.filemanager.Configuration("common-config.xml"); Configuration conf = new Configuration(); // 与hbase/conf/hbase-site.xml中hbase.zookeeper.quorum配置的值相同 conf.set("hbase.zookeeper.quorum", commonConf.getDefault("hbase.zookeeper.quorum", "127.0.0.1")); // 与hbase/conf/hbase-site.xml中hbase.zookeeper.property.clientPort配置的值相同 conf.set("hbase.zookeeper.property.clientPort", commonConf.getDefault("hbase.zookeeper.property.clientPort", "2181")); HBASE_CONF = HBaseConfiguration.create(conf); TABLE_POOL = new HTablePool(HBASE_CONF, 30); } public static LogRecord scanLog(String tableName, String prifixKey, String startkey, String stopKey) { LOGGER.info("Get prifixKey["+prifixKey+"], startkey="+startkey+", stopKey="+stopKey); LogRecord result = new LogRecord(); StringBuffer sb = new StringBuffer(); HTableInterface table = null; try { table = TABLE_POOL.getTable(tableName); Scan s = new Scan(); s.setCaching(100); List<Filter> list = new ArrayList<Filter>(); Filter prifixFilter =new PrefixFilter(prifixKey.getBytes()); Filter pageFilter = new PageFilter(100); list.add(pageFilter); list.add(prifixFilter); Filter all = new FilterList(Operator.MUST_PASS_ALL, list); s.setFilter(all); s.setStartRow(startkey.getBytes()); if(!StringUtils.isBlank(stopKey)) { s.setStopRow(stopKey.getBytes()); } ResultScanner rs = table.getScanner(s); for (Result r : rs) { //新版本api for(Cell cell:r.rawCells()){ System.out.println("RowName:"+new String(CellUtil.cloneRow(cell))+" "); System.out.println("Timetamp:"+cell.getTimestamp()+" "); System.out.println("column Family:"+new String(CellUtil.cloneFamily(cell))+" "); System.out.println("row Name:"+new String(CellUtil.cloneQualifier(cell))+" "); System.out.println("value:"+new String(CellUtil.cloneValue(cell))+" "); } KeyValue[] kv = r.raw(); for (int i = 0; i < kv.length; i++) { sb.append(new String(kv[i].getValue())).append("\n"); if(i == kv.length - 1) { result.setNextTimestamp(getTimeStamp(new String(kv[i].getRow()))); } } } rs.close(); } catch (IOException e) { LOGGER.error("Get prifixKey["+prifixKey+"], startkey="+startkey+", stopKey="+stopKey, e); } finally { closeHTable(table); } result.setContent(sb.toString()); return result; } /** * @param string * @return */ private static Long getTimeStamp(String stopKey) { return Long.valueOf(stopKey.split(CONNECTOR)[1]); } public static void pushLog(String tableName, String taskInstanceId, String logContent) { pushLog(tableName, taskInstanceId, logContent, "f", "content"); } public static void pushLog(String tableName, String taskInstanceId, String logContent, String family, String qualifier) { LOGGER.info("push " + taskInstanceId + " log to " + tableName); HTableInterface table = null; try { table = TABLE_POOL.getTable(tableName); Put put = new Put((taskInstanceId + CONNECTOR + System.currentTimeMillis()).getBytes()); put.add(family.getBytes(), qualifier.getBytes(), logContent.getBytes()); table.put(put); } catch (IOException ioe) { LOGGER.error("push " + taskInstanceId + " log to " + tableName + " failed.", ioe); } catch (Throwable e) { LOGGER.error("push " + taskInstanceId + " log to " + tableName + " failed.", e); } finally { closeHTable(table); } } private static void closeHTable(HTableInterface table) { if(table == null) return; try { table.close(); } catch (IOException e) { LOGGER.warn("close hbase table FAILED", e); } } /** * @param args */ public static void main(String[] args) { // for(int i=0; i< 100;i ++) { // HBaseClient.pushLog("task_logs", "xxxxxx", "lllllllllllllllllllllllllll"); // } LogRecord result = HBaseClient.scanLog("task_logs", "9#", "9#1", null); System.out.println(result.getContent()); } }
相关推荐
minerd 2020-10-28
Kafka 2020-09-18
Wepe0 2020-10-30
杜倩 2020-10-29
windle 2020-10-29
mengzuchao 2020-10-22
Junzizhiai 2020-10-10
bxqybxqy 2020-09-30
风之沙城 2020-09-24
kingszelda 2020-09-22
大唐帝国前营 2020-08-18
yixu0 2020-08-17
TangCuYu 2020-08-15
xiaoboliu00 2020-08-15
songshijiazuaa 2020-08-15
xclxcl 2020-08-03
zmzmmf 2020-08-03
newfarhui 2020-08-03
likesyour 2020-08-01