A look at Storm's CheckpointSpout

This article takes a look at Storm's CheckpointSpout.

TopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for (String boltId : _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try {
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for (String spoutId : _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try {
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                        boltSpecs,
                                                        new HashMap<>());

        stormTopology.set_worker_hooks(_workerHooks);

        if (!_componentToSharedMemory.isEmpty()) {
            stormTopology.set_component_to_shared_memory(_componentToSharedMemory);
            stormTopology.set_shared_memory(_sharedMemory);
        }

        return Utils.addVersions(stormTopology);
    }

    /**
     * If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
     */
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }

    private void maybeAddCheckpointInputs(ComponentCommon common) {
        if (hasStatefulBolt) {
            addCheckPointInputs(common);
        }
    }

    /**
     * If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
     * checkpoint tuples can flow through the topology.
     */
    private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
        if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
            bolt = new CheckpointTupleForwarder(bolt);
        }
        return bolt;
    }

    /**
     * For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
     * bolts, add checkpoint stream from the previous bolt to its input.
     */
    private void addCheckPointInputs(ComponentCommon component) {
        Set<GlobalStreamId> checkPointInputs = new HashSet<>();
        for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
            String sourceId = inputStream.get_componentId();
            if (_spouts.containsKey(sourceId)) {
                checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
            } else {
                checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
            }
        }
        for (GlobalStreamId streamId : checkPointInputs) {
            component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
        }
    }
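  • When TopologyBuilder runs createTopology, it calls maybeAddCheckpointSpout; if hasStatefulBolt is true, it automatically creates a CheckpointSpout and adds it under CHECKPOINT_COMPONENT_ID with a parallelism of 1
  • If hasStatefulBolt is true and a bolt is not a StatefulBoltExecutor, the bolt is wrapped in a CheckpointTupleForwarder
  • If hasStatefulBolt is true, addCheckPointInputs is also called to wire up the checkpoint inputs: a bolt fed by a spout subscribes to the checkpoint stream of the CheckpointSpout, while any other bolt subscribes to the checkpoint stream of its upstream bolt, in both cases with an all grouping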
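
The wiring rule in addCheckPointInputs can be tried out without Storm on the classpath. The following is only a minimal sketch: the class CheckpointInputsSketch and the method checkpointSources are hypothetical names introduced for illustration, and plain strings stand in for GlobalStreamId. Only the two constant values are taken from CheckpointSpout.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Storm-free sketch of the rule in TopologyBuilder.addCheckPointInputs.
final class CheckpointInputsSketch {
    // Same values as the constants declared in CheckpointSpout.
    static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";

    /**
     * Returns one "component:stream" entry per checkpoint input a bolt would receive.
     * A source that is a spout is replaced by the checkpoint spout; a source that is
     * a bolt contributes its own checkpoint stream. Duplicates collapse in the set.
     */
    static Set<String> checkpointSources(Set<String> spoutIds, List<String> inputSourceIds) {
        Set<String> result = new LinkedHashSet<>();
        for (String sourceId : inputSourceIds) {
            String component = spoutIds.contains(sourceId) ? CHECKPOINT_COMPONENT_ID : sourceId;
            result.add(component + ":" + CHECKPOINT_STREAM_ID);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> spouts = Set.of("wordSpout");
        // A root bolt that reads from a spout listens to the checkpoint spout.
        System.out.println(checkpointSources(spouts, List.of("wordSpout")));
        // A downstream bolt listens to the checkpoint stream of its upstream bolt.
        System.out.println(checkpointSources(spouts, List.of("splitBolt")));
    }
}
```

Because the result is a set, a bolt that reads from several spouts still ends up with a single subscription to the checkpoint spout, which matches the HashSet used in the original method.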

CheckpointSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckpointSpout.java

/**
 * Emits checkpoint tuples which is used to save the state of the {@link org.apache.storm.topology.IStatefulComponent} across the topology.
 * If a topology contains Stateful bolts, Checkpoint spouts are automatically added to the topology. There is only one Checkpoint task per
 * topology. Checkpoint spout stores its internal state in a {@link KeyValueState}.
 *
 * @see CheckPointState
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    private static final String TX_STATE_KEY = "__state";
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    // package access for unit test
    void open(TopologyContext context, SpoutOutputCollector collector,
              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        lastCheckpointTs = 0;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        if (topoConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
            interval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        // ensure checkpoint interval is not less than a sane low value.
        interval = Math.max(100, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    private boolean shouldCheckpoint() {
        return !recovering && !checkpointStepInProgress &&
               (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
    }

    private boolean checkpointIntervalElapsed() {
        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
    }

    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    private void emit(long txid, Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    //......
}
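  • CheckpointSpout reads the checkpoint interval from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms). The default in defaults.yaml is 1000; if the key is missing from the configuration the spout falls back to 100, which is also the lowest value it accepts
  • nextTuple first checks shouldRecover; if recovery is needed it calls handleRecovery and then startProgress. Otherwise, if a checkpoint is due it calls doCheckpoint and then startProgress. If neither applies, it sleeps for sleepInterval before checking again
  • shouldCheckpoint returns true only when the spout is not recovering and no checkpoint step is in progress, and in addition either the current state is not COMMITTED or more than checkpointInterval has elapsed since the last checkpoint. doCheckpoint then emits the next action on CHECKPOINT_STREAM_ID
  • When an ack arrives and its msgId matches the current txid, CheckpointSpout handles it through handleRecoveryAck or handleCheckpointAck (both elided above), which performs saveTxState and calls checkpointState.commit to commit the checkpoint; resetProgress is then called to reset the progress flags
  • When a fail arrives, the spout sets recovering to true (if it was not already recovering) so that recovery is triggered, and then calls resetProgress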
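
The interval handling described above is easy to check in isolation. Below is a minimal sketch under stated assumptions: CheckpointIntervalSketch is a hypothetical class, the config key is written out as the string the article gives for Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, and a missing or non-numeric value is treated as 0 (the original code casts directly and would throw on a non-numeric value).

```java
import java.util.Map;

// Hypothetical, Storm-free sketch of how CheckpointSpout derives its two intervals.
final class CheckpointIntervalSketch {
    // Key string as quoted in the article for Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL.
    static final String INTERVAL_KEY = "topology.state.checkpoint.interval.ms";
    static final int MIN_INTERVAL_MS = 100;

    /** Reads the interval from the config and clamps it to the 100 ms floor. */
    static int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        Object value = topoConf.get(INTERVAL_KEY);
        if (value instanceof Number) {
            interval = ((Number) value).intValue();
        }
        return Math.max(MIN_INTERVAL_MS, interval);
    }

    /** The spout sleeps for a tenth of the checkpoint interval when it has nothing to do. */
    static int sleepInterval(int checkpointInterval) {
        return checkpointInterval / 10;
    }

    public static void main(String[] args) {
        int fromDefaults = loadCheckpointInterval(Map.of(INTERVAL_KEY, 1000));
        int missingKey = loadCheckpointInterval(Map.of());
        int tooSmall = loadCheckpointInterval(Map.of(INTERVAL_KEY, 20));
        System.out.println(fromDefaults + " ms, sleep " + sleepInterval(fromDefaults) + " ms");
        System.out.println(missingKey + " ms, sleep " + sleepInterval(missingKey) + " ms");
        System.out.println(tooSmall + " ms, sleep " + sleepInterval(tooSmall) + " ms");
    }
}
```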

CheckPointState

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckPointState.java

/**
     * Get the next state based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next checkpoint state based on this state.
     */
    public CheckPointState nextState(boolean recovering) {
        CheckPointState nextState;
        switch (state) {
            case PREPARING:
                nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
                break;
            case COMMITTING:
                nextState = new CheckPointState(txid, COMMITTED);
                break;
            case COMMITTED:
                nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return nextState;
    }

    /**
     * Get the next action to perform based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next action to perform based on this state
     */
    public Action nextAction(boolean recovering) {
        Action action;
        switch (state) {
            case PREPARING:
                action = recovering ? Action.ROLLBACK : Action.PREPARE;
                break;
            case COMMITTING:
                action = Action.COMMIT;
                break;
            case COMMITTED:
                action = recovering ? Action.INITSTATE : Action.PREPARE;
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return action;
    }
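  • CheckPointState provides nextState to move between states, and nextAction to return the next action for the current state. In the normal (non-recovering) path the cycle is COMMITTED -> PREPARING (txid + 1) -> COMMITTING -> COMMITTED; while recovering, a PREPARING state is rolled back to COMMITTED at txid - 1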
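
To make the transitions concrete, here is a small Storm-free restatement of the two switch statements above that can be run on its own. It is a sketch, not Storm's class: CheckpointStateSketch and its nested Tx type are hypothetical names, while the transition rules are copied from the excerpt.

```java
// Hypothetical, Storm-free restatement of CheckPointState.nextState / nextAction.
final class CheckpointStateSketch {
    enum State { PREPARING, COMMITTING, COMMITTED }
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    static final class Tx {
        final long txid;
        final State state;

        Tx(long txid, State state) {
            this.txid = txid;
            this.state = state;
        }

        /** Same transition table as the excerpt above. */
        Tx nextState(boolean recovering) {
            switch (state) {
                case PREPARING:
                    return recovering ? new Tx(txid - 1, State.COMMITTED) : new Tx(txid, State.COMMITTING);
                case COMMITTING:
                    return new Tx(txid, State.COMMITTED);
                default: // COMMITTED
                    return recovering ? this : new Tx(txid + 1, State.PREPARING);
            }
        }

        /** Same action table as the excerpt above. */
        Action nextAction(boolean recovering) {
            switch (state) {
                case PREPARING:
                    return recovering ? Action.ROLLBACK : Action.PREPARE;
                case COMMITTING:
                    return Action.COMMIT;
                default: // COMMITTED
                    return recovering ? Action.INITSTATE : Action.PREPARE;
            }
        }

        @Override
        public String toString() {
            return "txid=" + txid + ", state=" + state;
        }
    }

    public static void main(String[] args) {
        // Walk one full checkpoint cycle in the non-recovering path.
        Tx tx = new Tx(0, State.COMMITTED);
        for (int i = 0; i < 3; i++) {
            tx = tx.nextState(false);
            System.out.println(tx + ", action to emit=" + tx.nextAction(false));
        }
        // A crash while PREPARING leads to a rollback to the previous txid.
        Tx preparing = new Tx(5, State.PREPARING);
        System.out.println(preparing.nextAction(true) + " -> " + preparing.nextState(true));
    }
}
```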

BaseStatefulBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java

public void execute(Tuple input) {
        if (CheckpointSpout.isCheckpoint(input)) {
            processCheckpoint(input);
        } else {
            handleTuple(input);
        }
    }

    /**
     * Invokes handleCheckpoint once checkpoint tuple is received on all input checkpoint streams to this component.
     */
    private void processCheckpoint(Tuple input) {
        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
        if (shouldProcessTransaction(action, txid)) {
            LOG.debug("Processing action {}, txid {}", action, txid);
            try {
                if (txid >= lastTxid) {
                    handleCheckpoint(input, action, txid);
                    if (action == ROLLBACK) {
                        lastTxid = txid - 1;
                    } else {
                        lastTxid = txid;
                    }
                } else {
                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                    collector.ack(input);
                }
            } catch (Throwable th) {
                LOG.error("Got error while processing checkpoint tuple", th);
                collector.fail(input);
                collector.reportError(th);
            }
        } else {
            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
                      "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
            collector.ack(input);
        }
    }

    /**
     * Checks if check points have been received from all tasks across all input streams to this component
     */
    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
        TransactionRequest request = new TransactionRequest(action, txid);
        Integer count;
        if ((count = transactionRequestCount.get(request)) == null) {
            transactionRequestCount.put(request, 1);
            count = 1;
        } else {
            transactionRequestCount.put(request, ++count);
        }
        if (count == checkPointInputTaskCount) {
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }
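  • The execute method of BaseStatefulBoltExecutor first uses CheckpointSpout.isCheckpoint(input) to decide whether the tuple arrived on the checkpoint stream; if so it calls processCheckpoint, otherwise handleTuple
  • processCheckpoint first calls shouldProcessTransaction to check whether every task on every input checkpoint stream has sent it the checkpoint tuple for this action and txid; until then it simply acks the tuple and waits
  • Once all have arrived, if txid is greater than or equal to lastTxid it calls handleCheckpoint, which is implemented by subclasses (after a ROLLBACK, lastTxid is set to txid - 1); an older txid is ignored and the tuple is acked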
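
The gating logic can be sketched in a few lines outside Storm. This is a minimal sketch under stated assumptions: CheckpointBarrierSketch and its Outcome enum are hypothetical, a string key stands in for TransactionRequest, and the initial value of lastTxid is an assumption of the sketch because the excerpt does not show the field's declaration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free sketch of the gating in BaseStatefulBoltExecutor.processCheckpoint.
final class CheckpointBarrierSketch {
    enum Outcome { WAITING, HANDLED, IGNORED_OLD }

    private final int checkPointInputTaskCount;
    private final Map<String, Integer> transactionRequestCount = new HashMap<>();
    // Assumption of this sketch: start below every real txid so the first checkpoint is accepted.
    private long lastTxid = Long.MIN_VALUE;

    CheckpointBarrierSketch(int checkPointInputTaskCount) {
        this.checkPointInputTaskCount = checkPointInputTaskCount;
    }

    /** Counts one checkpoint tuple; true once every input task has sent this (action, txid). */
    boolean shouldProcessTransaction(String action, long txid) {
        String request = action + "#" + txid; // stand-in for TransactionRequest
        int count = transactionRequestCount.merge(request, 1, Integer::sum);
        if (count == checkPointInputTaskCount) {
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    /** Mirrors the branches of processCheckpoint without touching a real collector. */
    Outcome onCheckpoint(String action, long txid) {
        if (!shouldProcessTransaction(action, txid)) {
            return Outcome.WAITING;
        }
        if (txid < lastTxid) {
            return Outcome.IGNORED_OLD;
        }
        lastTxid = "ROLLBACK".equals(action) ? txid - 1 : txid;
        return Outcome.HANDLED;
    }

    long lastTxid() {
        return lastTxid;
    }

    public static void main(String[] args) {
        CheckpointBarrierSketch bolt = new CheckpointBarrierSketch(2); // two upstream tasks
        System.out.println(bolt.onCheckpoint("PREPARE", 5)); // first upstream task
        System.out.println(bolt.onCheckpoint("PREPARE", 5)); // second upstream task
        System.out.println("lastTxid=" + bolt.lastTxid());
    }
}
```

The effect is that of a barrier: a bolt with several upstream tasks acts on a checkpoint only after the same (action, txid) pair has arrived from all of them.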

StatefulBoltExecutor.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java

public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
    //......

    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
        if (action == PREPARE) {
            if (boltInitialized) {
                bolt.prePrepare(txid);
                state.prepareCommit(txid);
                preparedTuples.addAll(collector.ackedTuples());
            } else {
                /*
                 * May be the task restarted in the middle and the state needs be initialized.
                 * Fail fast and trigger recovery.
                 */
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                collector.fail(checkpointTuple);
                return;
            }
        } else if (action == COMMIT) {
            bolt.preCommit(txid);
            state.commit(txid);
            ack(preparedTuples);
        } else if (action == ROLLBACK) {
            bolt.preRollback();
            state.rollback();
            fail(preparedTuples);
            fail(collector.ackedTuples());
        } else if (action == INITSTATE) {
            if (!boltInitialized) {
                bolt.initState((T) state);
                boltInitialized = true;
                LOG.debug("{} pending tuples to process", pendingTuples.size());
                for (Tuple tuple : pendingTuples) {
                    doExecute(tuple);
                }
                pendingTuples.clear();
            } else {
                /*
                 * If a worker crashes, the states of all workers are rolled back and an initState message is sent across
                 * the topology so that crashed workers can initialize their state.
                 * The bolts that have their state already initialized need not be re-initialized.
                 */
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
                          checkpointTuple, action, txid);
            }
        }
        collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.delegate.ack(checkpointTuple);
    }

    //......
}
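  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint
  • The method handles each action in turn. For PREPARE it calls the bolt's prePrepare and the state's prepareCommit (or fails the checkpoint tuple if the bolt state has not been initialized yet); for COMMIT it calls the bolt's preCommit and the state's commit, then acks the prepared tuples; for ROLLBACK it calls the bolt's preRollback and the state's rollback, then fails the prepared and pending acked tuples; for INITSTATE, if the bolt has not been initialized, it calls the bolt's initState and replays the pending tuples
  • After the action has been handled, the checkpoint tuple is emitted downstream and then acked via collector.delegate.ack(checkpointTuple)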
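
One detail worth seeing in isolation is how acks for ordinary tuples are deferred until the state is committed. The following is a sketch of that idea only: DeferredAckSketch is a hypothetical class, strings stand in for tuples, and it assumes that collector.ackedTuples() drains the collector's buffer and that the ack and fail helpers empty preparedTuples afterwards, neither of which is shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the deferred-ack bookkeeping in StatefulBoltExecutor.handleCheckpoint.
final class DeferredAckSketch {
    // Tuples the user bolt has acked since the last PREPARE (stand-in for collector.ackedTuples()).
    private final List<String> ackedSinceLastPrepare = new ArrayList<>();
    // Tuples covered by a prepared but not yet committed checkpoint.
    private final List<String> preparedTuples = new ArrayList<>();
    // What would really be acked or failed towards the upstream spout.
    final List<String> reallyAcked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    /** The wrapped bolt acks a tuple; the ack is only recorded, not sent. */
    void boltAcks(String tuple) {
        ackedSinceLastPrepare.add(tuple);
    }

    void onAction(String action) {
        switch (action) {
            case "PREPARE":
                // Tuples acked so far become part of the checkpoint being prepared.
                preparedTuples.addAll(ackedSinceLastPrepare);
                ackedSinceLastPrepare.clear();
                break;
            case "COMMIT":
                // State is durable, so the held acks can be released.
                reallyAcked.addAll(preparedTuples);
                preparedTuples.clear();
                break;
            case "ROLLBACK":
                // State was rolled back, so everything held is failed and will be replayed.
                failed.addAll(preparedTuples);
                failed.addAll(ackedSinceLastPrepare);
                preparedTuples.clear();
                ackedSinceLastPrepare.clear();
                break;
            default:
                throw new IllegalArgumentException("Unsupported action " + action);
        }
    }

    public static void main(String[] args) {
        DeferredAckSketch bolt = new DeferredAckSketch();
        bolt.boltAcks("t1");
        bolt.boltAcks("t2");
        bolt.onAction("PREPARE");
        bolt.boltAcks("t3"); // arrives after PREPARE, belongs to the next checkpoint
        bolt.onAction("COMMIT");
        System.out.println("acked=" + bolt.reallyAcked + ", failed=" + bolt.failed);
    }
}
```

Holding the acks until COMMIT is what ties the at-least-once guarantee to the persisted state: a tuple is acknowledged upstream only after the state that reflects it has been committed.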

CheckpointTupleForwarder.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java

/**
 * Wraps {@link IRichBolt} and forwards checkpoint tuples in a stateful topology.
 * <p>
 * When a storm topology contains one or more {@link IStatefulBolt} all non-stateful bolts are wrapped in {@link CheckpointTupleForwarder}
 * so that the checkpoint tuples can flow through the entire topology DAG.
 * </p>
 */
public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
    //......
    /**
     * Forwards the checkpoint tuple downstream.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action          the action (prepare, commit, rollback or initstate)
     * @param txid            the transaction id.
     */
    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.ack(checkpointTuple);
    }

    //......
}
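  • CheckpointTupleForwarder wraps non-stateful bolts so that checkpoint tuples can flow through the entire topology DAG; its handleCheckpoint simply re-emits the checkpoint tuple downstream and acks it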

Summary

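  • If a topology contains an IStatefulBolt (IStatefulBolt gives a bolt an abstraction for reading and writing state, persists that state through the checkpoint mechanism, and relies on the ack mechanism to provide at-least-once semantics), TopologyBuilder automatically adds a CheckpointSpout. Bolts that are not StatefulBoltExecutor instances are wrapped in CheckpointTupleForwarder, so that checkpoint tuples pass through the whole topology DAG
  • In nextTuple, CheckpointSpout first checks whether recovery is needed and then whether a checkpoint is due; if neither, it sleeps for a while. sleepInterval is checkpointInterval / 10, and checkpointInterval is read from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (default 1000, minimum 100). Note that this value does not mean a checkpoint is started strictly every checkpointInterval: the behavior is closer to fixed-delay than to fixed-rate scheduling, in that no new checkpoint is started while the current one has not finished
  • Both recovery and checkpointing emit tuples on CHECKPOINT_STREAM_ID. BaseStatefulBoltExecutor handles checkpoint tuples in its execute method and hands non-checkpoint tuples to the abstract handleTuple method for subclasses to implement. The concrete handleCheckpoint is also implemented by subclasses; BaseStatefulBoltExecutor only enforces the preconditions, namely that the checkpoint tuple has been received from every task of every input stream and that txid >= lastTxid
  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint, handling the PREPARE, COMMIT, ROLLBACK and INITSTATE actions (similar to a three-phase commit protocol), then forwarding the checkpoint tuple downstream and acking it
  • When CheckpointSpout emits a checkpoint tuple it uses the txid as the msgId, so the tuple is tracked reliably. Once the checkpoint tuple has been acked throughout the topology DAG, the spout receives the ack and calls checkpointState.commit to commit the checkpoint; on a fail it resets the progress flags and switches to recovery. Normally Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms, default 1000, i.e. 1 second) is smaller than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 seconds). If checkpointInterval is set too large and a worker crashes in between, the recovered state will be stale, which defeats the purpose of checkpointing.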
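
The pacing described in the summary (no new checkpoint while one is in flight) can be simulated with an explicit clock. This is a rough sketch, not the spout itself: CheckpointPacingSketch is hypothetical, time is passed in as a parameter instead of calling System.currentTimeMillis(), and the bodies of startProgress, resetProgress and handleCheckpointAck, which the excerpt elides, are modelled by assumption as "set the in-progress flag", "clear the flag" and "advance to the next state".

```java
// Hypothetical simulation of CheckpointSpout pacing in the non-recovering path.
final class CheckpointPacingSketch {
    enum State { PREPARING, COMMITTING, COMMITTED }

    private final long checkpointInterval;
    private State state = State.COMMITTED;
    private boolean stepInProgress = false;
    private long lastCheckpointTs = 0;

    CheckpointPacingSketch(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    /** Same condition as shouldCheckpoint, minus the recovering flag. */
    boolean shouldCheckpoint(long now) {
        return !stepInProgress
            && (state != State.COMMITTED || (now - lastCheckpointTs) > checkpointInterval);
    }

    /** Models doCheckpoint followed by startProgress; returns the action that would be emitted. */
    String doCheckpoint(long now) {
        if (state == State.COMMITTED) {
            state = State.PREPARING;
            lastCheckpointTs = now; // the interval is measured from the start of a round
        }
        stepInProgress = true; // assumption: this is what startProgress does
        return state == State.PREPARING ? "PREPARE" : "COMMIT";
    }

    /** Models a matching ack: advance the state (assumed handleCheckpointAck) and reset progress. */
    void ack() {
        state = (state == State.PREPARING) ? State.COMMITTING : State.COMMITTED;
        stepInProgress = false;
    }

    public static void main(String[] args) {
        CheckpointPacingSketch spout = new CheckpointPacingSketch(1000);
        System.out.println("t=500 due? " + spout.shouldCheckpoint(500));
        System.out.println("t=1001 emit " + spout.doCheckpoint(1001));
        System.out.println("t=5000 due while PREPARE is in flight? " + spout.shouldCheckpoint(5000));
        spout.ack();
        System.out.println("t=5000 emit " + spout.doCheckpoint(5000));
        spout.ack();
        System.out.println("t=5000 next round due? " + spout.shouldCheckpoint(5000));
    }
}
```

In this model a slow PREPARE or COMMIT round simply delays the next round rather than letting rounds pile up, which is the behavior the summary contrasts with a fixed-rate schedule.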
