HBase compaction source code analysis

No internet access at work, so I'm catching up on these notes at home. Rough.

The main steps all live in the compact method of Store, under HRegion:

Store.compact(final List<StoreFile> filesToCompact, 
                               final boolean majorCompaction, final long maxId)

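1. From filesToCompact, create a StoreFileScanner for each HFile.

2. Create a StoreScanner, which walks all the files being compacted in order. It wraps the StoreFileScanners created in step 1; its main method is next.

                   2.1 Create a ScanQueryMatcher, which decides whether a KeyValue is kept, filtered out, or dropped as deleted.

                   2.2 Create a KeyValueHeap to hold the StoreFileScanners. The heap orders the scanners by the key each one currently points at, which at the start is each HFile's start row key; every HFile is already sorted internally.

                   2.3 Maintain a current StoreFileScanner.

3. Call StoreScanner's next method.

                   3.1 Take the KeyValue at the head of the current StoreFileScanner (the heap.peek() call in the code that follows).

                   3.2 Pass that KeyValue through ScanQueryMatcher, which picks the branch to take for it.

                   3.3 Call KeyValueHeap's next method to advance current. If the head key of current is now greater than the head key of heap.peek(), current goes back into the heap and the top of the heap becomes the new current, so the next call again returns the smallest remaining KeyValue. (KeyValueHeap is built on a PriorityQueue.)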
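To make steps 2.2 through 3.3 concrete, here is a minimal sketch of the same merge idea in plain Java. It is not HBase code: FileCursor is a hypothetical stand-in for StoreFileScanner, keys are plain strings instead of KeyValues, and nothing is filtered. It only shows how a PriorityQueue plus a "current" cursor yields one globally sorted stream from several sorted files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class MergeSketch {
    /** Hypothetical stand-in for a StoreFileScanner: a cursor over one sorted file. */
    static final class FileCursor {
        private final Iterator<String> it;
        private String head; // key this cursor points at, or null once exhausted

        FileCursor(List<String> sortedKeys) {
            this.it = sortedKeys.iterator();
            this.head = it.hasNext() ? it.next() : null;
        }

        String peek() { return head; }

        String next() {
            String out = head;
            head = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    /** Merges already-sorted files into one sorted list. */
    static List<String> merge(List<List<String>> files) {
        // Cursors are ordered by the key they currently point at.
        PriorityQueue<FileCursor> heap =
            new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
        for (List<String> file : files) {
            FileCursor cursor = new FileCursor(file);
            if (cursor.peek() != null) heap.add(cursor);
        }

        List<String> out = new ArrayList<>();
        FileCursor current = heap.poll(); // null if there is nothing to merge
        while (current != null) {
            out.add(current.next());
            if (current.peek() == null) {
                // current is exhausted: the heap top takes over
                current = heap.poll();
            } else if (!heap.isEmpty()
                    && current.peek().compareTo(heap.peek().peek()) > 0) {
                // another cursor now holds a smaller key: swap it in
                heap.add(current);
                current = heap.poll();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
            Arrays.asList("a", "d", "g"),
            Arrays.asList("b", "e"),
            Arrays.asList("c", "f", "h"));
        System.out.println(merge(files)); // prints [a, b, c, d, e, f, g, h]
    }
}
```

Keeping current outside the heap means a cursor that stays the smallest for many keys in a row costs no heap operations, which is the point of step 2.3.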

Pasting in StoreScanner's next method, which calls ScanQueryMatcher's match method. This is where each kv is either written to the new tmp file or skipped.

StoreScanner class
public boolean next(List<Cell> outResult, int limit) throws IOException {
    lock.lock();
    try {
    if (checkReseek()) {
      return true;
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }

    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
        matcher.rowOffset, matcher.rowLength)) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    KeyValue kv;

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    int count = 0;
    LOOP: while((kv = this.heap.peek()) != null) {
      if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevKV, kv, comparator);
      prevKV = kv;

      ScanQueryMatcher.MatchCode qcode = matcher.match(kv); // use ScanQueryMatcher to judge this kv, checking whether ...
      switch(qcode) {
        case INCLUDE:
The logic for deleted data (inside the ScanQueryMatcher match method)
     /*
     * The delete logic is pretty complicated now.
     * This is corroborated by the following:
     * 1. The store might be instructed to keep deleted rows around.
     * 2. A scan can optionally see past a delete marker now.
     * 3. If deleted rows are kept, we have to find out when we can
     *    remove the delete markers.
     * 4. Family delete markers are always first (regardless of their TS)
     * 5. Delete markers should not be counted as version
     * 6. Delete markers affect puts of the *same* TS
     * 7. Delete marker need to be version counted together with puts
     *    they affect
     */
 byte type = bytes[initialOffset + keyLength - 1];
    if (kv.isDelete()) {
      if (!keepDeletedCells) {
        // first ignore delete markers if the scanner can do so, and the
        // range does not include the marker
        //
        // during flushes and compactions also ignore delete markers newer
        // than the readpoint of any open scanner, this prevents deleted
        // rows that could still be seen by a scanner from being collected
        boolean includeDeleteMarker = seePastDeleteMarkers ?
            tr.withinTimeRange(timestamp) :
            tr.withinOrAfterTimeRange(timestamp);
        if (includeDeleteMarker
            && kv.getMvccVersion() <= maxReadPointToTrackVersions) {
          this.deletes.add(bytes, offset, qualLength, timestamp, type);
        }
        // Can't early out now, because DelFam come before any other keys
      }
      if (retainDeletesInOutput
          || (!isUserScan && (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes)
          || kv.getMvccVersion() > maxReadPointToTrackVersions) {//minor compact
        // always include or it is not time yet to check whether it is OK
        // to purge deletes or not
        if (!isUserScan) {
          // if this is not a user scan (compaction), we can filter this deletemarker right here
          // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
          return MatchCode.INCLUDE;
        }
      } else if (keepDeletedCells) {
        if (timestamp < earliestPutTs) {
          // keeping delete rows, but there are no puts older than
          // this delete in the store files.
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        }
        // else: fall through and do version counting on the
        // delete markers
      } else {//major compact
        return MatchCode.SKIP;
      }
      // note the following next else if...
      // delete marker are not subject to other delete markers
    } else if (!this.deletes.isEmpty()) {
      DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
          timestamp);
      switch (deleteResult) {
        case FAMILY_DELETED:
        case COLUMN_DELETED:
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        case VERSION_DELETED:
        case FAMILY_VERSION_DELETED:
          return MatchCode.SKIP;
        case NOT_DELETED:
          break;
        default:
          throw new RuntimeException("UNEXPECTED");
        }
    }
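The deletes.add(...) and deletes.isDeleted(...) calls above are easier to follow with a toy model. The sketch below is my own simplification, not the HBase tracker: the class and method names are hypothetical, it uses maps instead of relying on sorted input, and it leaves out family-level markers. It assumes the usual HBase semantics that a column delete marker masks every version with a timestamp at or below its own, while a version delete marker masks only the exact timestamp.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DeleteTrackerSketch {
    enum Result { COLUMN_DELETED, VERSION_DELETED, NOT_DELETED }

    // qualifier -> newest timestamp covered by a whole-column delete marker
    private final Map<String, Long> columnDeletes = new HashMap<>();
    // "qualifier@timestamp" entries covered by a single-version delete marker
    private final Set<String> versionDeletes = new HashSet<>();

    void addColumnDelete(String qualifier, long ts) {
        columnDeletes.merge(qualifier, ts, Long::max);
    }

    void addVersionDelete(String qualifier, long ts) {
        versionDeletes.add(qualifier + "@" + ts);
    }

    boolean isEmpty() {
        return columnDeletes.isEmpty() && versionDeletes.isEmpty();
    }

    /** Reports whether a put at (qualifier, ts) is masked by a tracked marker. */
    Result isDeleted(String qualifier, long ts) {
        Long columnTs = columnDeletes.get(qualifier);
        if (columnTs != null && ts <= columnTs) {
            return Result.COLUMN_DELETED; // caller can jump to the next column
        }
        if (versionDeletes.contains(qualifier + "@" + ts)) {
            return Result.VERSION_DELETED; // caller skips just this one kv
        }
        return Result.NOT_DELETED;
    }

    public static void main(String[] args) {
        DeleteTrackerSketch deletes = new DeleteTrackerSketch();
        deletes.addColumnDelete("q1", 100L);
        deletes.addVersionDelete("q2", 50L);
        System.out.println(deletes.isDeleted("q1", 90L));
        System.out.println(deletes.isDeleted("q1", 101L));
        System.out.println(deletes.isDeleted("q2", 50L));
        System.out.println(deletes.isDeleted("q2", 49L));
    }
}
```

This lines up with the switch in the excerpt: a column-level hit lets the matcher move on to the next column, while a version-level hit only skips the single kv.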
/**
 * The type of scan being performed.
 * Enum to distinguish general scan types.
 */
@InterfaceAudience.Private
public enum ScanType {
  COMPACT_DROP_DELETES,
  COMPACT_RETAIN_DELETES,
  USER_SCAN
}
  
How deleted data is handled
  /*
   * The following three booleans define how we deal with deletes.
   * There are three different aspects:
   * 1. Whether to keep delete markers. This is used in compactions.
   *    Minor compactions always keep delete markers.
   * 2. Whether to keep deleted rows. This is also used in compactions,
   *    if the store is set to keep deleted rows. This implies keeping
   *    the delete markers as well.
   *    In this case deleted rows are subject to the normal max version
   *    and TTL/min version rules just like "normal" rows.
   * 3. Whether a scan can do time travel queries even before deleted
   *    marker to reach deleted rows.
   */
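Putting the ScanType enum and these flags together, the fate of a delete marker can be modeled roughly as follows. This is a simplified sketch, not the real matcher: the mapping from ScanType to retainDeletesInOutput and isUserScan is my assumption, the Decision enum and method name are hypothetical, and the MVCC read point and earliestPutTs checks from the excerpt are left out.

```java
class DeleteMarkerSketch {
    enum ScanType { COMPACT_DROP_DELETES, COMPACT_RETAIN_DELETES, USER_SCAN }

    /** Hypothetical outcome type; the real code returns a MatchCode. */
    enum Decision { INCLUDE, SKIP, CHECK_VERSIONS }

    static Decision onDeleteMarker(ScanType type,
                                   boolean keepDeletedCells,
                                   long markerAgeMs,
                                   long timeToPurgeDeletesMs) {
        // Assumed mapping from scan type to the flags used in the excerpt.
        boolean isUserScan = type == ScanType.USER_SCAN;
        boolean retainDeletesInOutput = type == ScanType.COMPACT_RETAIN_DELETES;

        if (retainDeletesInOutput
                || (!isUserScan && markerAgeMs <= timeToPurgeDeletesMs)) {
            // Minor compaction, or a marker too young to purge:
            // write the marker to the output file.
            return Decision.INCLUDE;
        } else if (keepDeletedCells) {
            // Deleted rows are kept, so the marker goes through
            // normal version counting instead of being dropped.
            return Decision.CHECK_VERSIONS;
        } else {
            // Major compaction (or a plain user scan): drop the marker.
            return Decision.SKIP;
        }
    }

    public static void main(String[] args) {
        System.out.println(onDeleteMarker(ScanType.COMPACT_RETAIN_DELETES, false, 10_000L, 0L));
        System.out.println(onDeleteMarker(ScanType.COMPACT_DROP_DELETES, false, 10_000L, 0L));
        System.out.println(onDeleteMarker(ScanType.COMPACT_DROP_DELETES, true, 10_000L, 0L));
    }
}
```

Under these assumptions a minor compaction always carries delete markers forward, and only a major compaction with an expired purge window and no keep-deleted-cells setting actually removes them.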
 
