storm的ack和fail

为了保证数据能正确的被处理,对于spout产生的每一个tuple,storm都会进行跟踪,这里面涉及到ack/fail的处理,如果一个tuple处理成功,会调用spout的ack方法,如果失败,会调用fail方法.而在处理tuple的每一个bolt都会通过OutputCollector来告知storm,当前bolt处理是否成功.为了了解OutputCollector的ack/fail与Spout的ack/fail之间的关系,我调试跟踪了一下storm代码.

IBasicBolt实现类不关心ack/fail,spout的ack/fail完全由后面的bolt的ack/fail来决定.其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用.相当于忽略了该bolt的ack/fail行为.所以IBasicBolt用来做filter或者简单的计算比较合适.

可以参考BasicBoltExecutor代码里面的实现就可以明白了:

public void execute(Tuple input) {
        _collector.setContext(input);
        try {
            _bolt.execute(input, _collector);
            _collector.getOutputter().ack(input);
        } catch(FailedException e) {
            LOG.warn("Failed to process tuple", e);
            _collector.getOutputter().fail(input);
        }
    }

在IRichBolt实现类中,如果OutputCollector.emit(oldTuple,newTuple)这样调用来发射tuple(在storm中称之为anchoring),那么后面的bolt的ack/fail会影响spout的ack/fail,如果collector.emit(newTuple)这样来发射tuple(在storm称之为unanchoring),则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail.所以某个bolt后面的bolt的成功失败对你来说不关心,你可以直接通过这种方式来忽略

中间的某个boltfail了,不会影响后面的bolt执行,但是会立即触发spout的fail.相当于短路了,后面bolt虽然也执行了,但是ack/fail对spout已经无意义了.也就是说,只要bolt集合中的任何一个fail了,会立即触发spout的fail方法.而ack方法需要所有的bolt调用为ack才能触发.

另外一点,storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况,对于成功/失败该如何处理,必须由应用自己来决定,因为storm内部也没有保存失败的具体数据,但是也有办法知道失败记录,因为spout的ack/fail方法会附带一个msgId对象,我们可以在最初发射tuple的时候将将msgId设置为tuple,然后在ack/fail中对该tuple进行处理.

这里有个问题,就是每个bolt执行完之后要显式的调用ack/fail,否则会出现tuple不释放导致oom.不知道storm在最初设计的时候,为什么不将bolt的ack设置为默认调用

参考文档:https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

相关推荐