HBase compaction source code analysis
No internet access at work, so I'm writing this note up after getting home. Rough life.
The main steps all live in the Store.compact method under HRegion:
Store.compact(final List<StoreFile> filesToCompact, final boolean majorCompaction, final long maxId)
1. From filesToCompact, create the StoreFileScanner corresponding to each HFile.
2. Create a StoreScanner, which wraps the generated StoreFileScanners and is responsible for iterating over all files being compacted in order; its main method is next.
2.1 Create a ScanQueryMatcher, which decides whether a KeyValue should be filtered out as deleted.
2.2 Create a KeyValueHeap that holds the StoreFileScanners and keeps them heap-ordered by each HFile's start rowkey (each HFile is itself sorted).
2.3 Maintain a current StoreFileScanner.
3. Call StoreScanner's next method:
3.1 Take the current StoreFileScanner's start rowkey and poll the KeyValue off it.
3.2 Use ScanQueryMatcher to decide which branch each emitted KeyValue takes.
3.3 Call KeyValueHeap's next method to update the current StoreFileScanner: if current.startrowkey > heap.peek().startrowkey, push current back into the heap and take the heap head as the new current, so the next call always returns the globally smallest KeyValue. (KeyValueHeap is backed by a PriorityQueue; see the sketch after this list.)
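To make steps 2.2 and 3.3 concrete, here is a minimal, self-contained sketch of the same merge pattern KeyValueHeap uses: a PriorityQueue of scanners ordered by their current key, plus a separately held current scanner that is only pushed back when it falls behind the heap head. SimpleScanner and SimpleKeyValueHeap are made-up names for illustration, not HBase classes, and String stands in for KeyValue:

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for StoreFileScanner: yields sorted keys.
class SimpleScanner {
  private final Iterator<String> it;
  private String current;
  SimpleScanner(List<String> sortedKeys) {
    this.it = sortedKeys.iterator();
    this.current = it.hasNext() ? it.next() : null;
  }
  String peek() { return current; }        // current key, null when exhausted
  String next() {                          // return current key and advance
    String result = current;
    current = it.hasNext() ? it.next() : null;
    return result;
  }
}

// Minimal analogue of KeyValueHeap: merge N sorted scanners into one sorted stream.
class SimpleKeyValueHeap {
  private final PriorityQueue<SimpleScanner> heap =
      new PriorityQueue<>(Comparator.comparing(SimpleScanner::peek));
  private SimpleScanner current;

  SimpleKeyValueHeap(List<SimpleScanner> scanners) {
    for (SimpleScanner s : scanners) {
      if (s.peek() != null) heap.add(s);
    }
    current = heap.poll(); // scanner positioned at the globally smallest key
  }

  String next() {
    if (current == null) return null;
    String kv = current.next();
    // Restore the invariant: current must hold the globally smallest key.
    if (current.peek() == null) {
      current = heap.poll();               // current is exhausted, replace it
    } else if (heap.peek() != null && heap.peek().peek().compareTo(current.peek()) < 0) {
      heap.add(current);                   // current fell behind, push it back
      current = heap.poll();
    }
    return kv;
  }
}

// e.g. merging ["a","d"] and ["b","c"] yields a, b, c, d.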
Also pasting the StoreScanner next method that drives ScanQueryMatcher; this is where each KV is either written to the new tmp file or skipped:
StoreScanner class:

public boolean next(List<Cell> outResult, int limit) throws IOException {
  lock.lock();
  try {
    if (checkReseek()) {
      return true;
    }
    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }
    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }
    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length,
        matcher.row, matcher.rowOffset, matcher.rowLength)) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }
    KeyValue kv;
    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator = store != null ? store.getComparator() : null;
    int count = 0;
    LOOP: while ((kv = this.heap.peek()) != null) {
      if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevKV, kv, comparator);
      prevKV = kv;
      // ask ScanQueryMatcher to decide what to do with this KV
      ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
      switch (qcode) {
        case INCLUDE:
          // ... (the original note cuts off here)
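The switch is where the matcher's verdict gets acted on. Since the snippet above cuts off at case INCLUDE:, here is a rough sketch (simplified, not the actual source) of how the branches behave during a compaction:

switch (qcode) {
  case INCLUDE:
    // the KV survives: collect it, so it ends up in the new tmp HFile
    outResult.add(kv);
    this.heap.next();   // consume the KV and advance the heap
    break;
  case SKIP:
    // the KV is dropped (deleted, expired, or shadowed by newer versions)
    this.heap.next();
    break;
  case SEEK_NEXT_ROW:
  case SEEK_NEXT_COL:
    // nothing more is needed from this row/column: the heap is reseeked
    // forward instead of stepping KV by KV
    break;
  case DONE:
    return true;        // this batch of results is complete
  default:
    throw new RuntimeException("unexpected MatchCode: " + qcode);
}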
The delete-handling logic (inside ScanQueryMatcher.match):

/*
 * The delete logic is pretty complicated now.
 * This is corroborated by the following:
 * 1. The store might be instructed to keep deleted rows around.
 * 2. A scan can optionally see past a delete marker now.
 * 3. If deleted rows are kept, we have to find out when we can
 *    remove the delete markers.
 * 4. Family delete markers are always first (regardless of their TS)
 * 5. Delete markers should not be counted as version
 * 6. Delete markers affect puts of the *same* TS
 * 7. Delete marker need to be version counted together with puts
 *    they affect
 */
byte type = bytes[initialOffset + keyLength - 1];
if (kv.isDelete()) {
  if (!keepDeletedCells) {
    // first ignore delete markers if the scanner can do so, and the
    // range does not include the marker
    //
    // during flushes and compactions also ignore delete markers newer
    // than the readpoint of any open scanner, this prevents deleted
    // rows that could still be seen by a scanner from being collected
    boolean includeDeleteMarker = seePastDeleteMarkers ?
        tr.withinTimeRange(timestamp) :
        tr.withinOrAfterTimeRange(timestamp);
    if (includeDeleteMarker && kv.getMvccVersion() <= maxReadPointToTrackVersions) {
      this.deletes.add(bytes, offset, qualLength, timestamp, type);
    }
    // Can't early out now, because DelFam come before any other keys
  }
  if (retainDeletesInOutput
      || (!isUserScan && (EnvironmentEdgeManager.currentTimeMillis() - timestamp)
          <= timeToPurgeDeletes)
      || kv.getMvccVersion() > maxReadPointToTrackVersions) { // minor compaction
    // always include or it is not time yet to check whether it is OK
    // to purge deletes or not
    if (!isUserScan) {
      // if this is not a user scan (compaction), we can filter this deletemarker right here
      // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
      return MatchCode.INCLUDE;
    }
  } else if (keepDeletedCells) {
    if (timestamp < earliestPutTs) {
      // keeping delete rows, but there are no puts older than
      // this delete in the store files.
      return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
    }
    // else: fall through and do version counting on the
    // delete markers
  } else { // major compaction
    return MatchCode.SKIP;
  }
  // note the following next else if...
  // delete marker are not subject to other delete markers
} else if (!this.deletes.isEmpty()) {
  DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength, timestamp);
  switch (deleteResult) {
    case FAMILY_DELETED:
    case COLUMN_DELETED:
      return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
    case VERSION_DELETED:
    case FAMILY_VERSION_DELETED:
      return MatchCode.SKIP;
    case NOT_DELETED:
      break;
    default:
      throw new RuntimeException("UNEXPECTED");
  }
}
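A concrete example of the two branches flagged above: say a store holds Put(row, cf:q, ts=5) and a DeleteColumn marker (row, cf:q, ts=7). In a minor compaction, retainDeletesInOutput is true, so the marker takes the first branch and comes back as INCLUDE: it has to be copied into the new HFile, because HFiles outside this compaction may still hold puts that it masks. In a major compaction with keepDeletedCells off, the marker falls through to the final else and comes back as SKIP: every StoreFile participates, the masked put is dropped via the deletes tracker, so the marker itself can be purged (once timeToPurgeDeletes has passed).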
The enum of scan types being performed:

/**
 * Enum to distinguish general scan types.
 */
@InterfaceAudience.Private
public enum ScanType {
  COMPACT_DROP_DELETES,
  COMPACT_RETAIN_DELETES,
  USER_SCAN
}
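Reading the names: USER_SCAN is a client read, COMPACT_RETAIN_DELETES is a minor compaction (delete markers must survive), COMPACT_DROP_DELETES is a major compaction (markers can finally be purged). A hypothetical one-liner capturing that choice (scanTypeFor is a made-up helper, not HBase API):

static ScanType scanTypeFor(boolean majorCompaction) {
  // majors see every StoreFile, so masked puts and their markers can both go;
  // minors must keep markers for puts living in files outside the compaction
  return majorCompaction ? ScanType.COMPACT_DROP_DELETES
                         : ScanType.COMPACT_RETAIN_DELETES;
}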
How deleted data is handled:

/*
 * The following three booleans define how we deal with deletes.
 * There are three different aspects:
 * 1. Whether to keep delete markers. This is used in compactions.
 *    Minor compactions always keep delete markers.
 * 2. Whether to keep deleted rows. This is also used in compactions,
 *    if the store is set to keep deleted rows. This implies keeping
 *    the delete markers as well.
 *    In this case deleted rows are subject to the normal max version
 *    and TTL/min version rules just like "normal" rows.
 * 3. Whether a scan can do time travel queries even before deleted
 *    marker to reach deleted rows.
 */
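Mapping these three aspects back onto the fields used in the match logic above (my reading of the snippet, worth double-checking against the source): aspect 1 corresponds to retainDeletesInOutput, set for minor compactions so markers are written back out; aspect 2 corresponds to keepDeletedCells, the column-family setting that keeps deleted rows around under the normal max-version/TTL rules; aspect 3 corresponds to seePastDeleteMarkers, which lets a time-range scan use withinTimeRange rather than withinOrAfterTimeRange and thus reach rows older than a delete marker.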