Storm中文官方文档翻译计划(2) ——消息处理保证

Storm中文官方文档翻译计划(2)

——消息处理保证通

Strom保证来自spout的每一个消息都会被完全处理。本文描述Storm是如何做到这个保证的,以及作为用户需要干些什么从而受益于Storm的可靠性能力。

啥玩意(Whatdoesitmeanforamessagetobe"fullyprocessed")

来自于spout的元组可以触发基于该元组的成千上万的元组被创建。例如,考虑一下,单词计数拓扑:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
                                               22133,
                                               "sentence_queue",
                                               new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

这个拓扑从Kestrel队列读取句子,将其拆分成逐个单词,然后发射每个单词之前遇到的次数。来自spout的元组触发了许多基于该元组的元组被创建:句子中的每一个单词有一个元组,每个单词的计数也有一个元组。消息树看起来大概是这样的:

当元组树(tupletree)耗尽或者树中的每一个消息都被处理了时,来自于spout的元组则被Storm认为完全处理了。当树中的消息在指定的超时时间内没有被完全处理时,则这个元组被认为处理失败。这个超时时间可以为特定的拓扑进行设置,使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置选项即可,默认为30秒。

背后的故事(Whathappensifamessageisfullyprocessedorfailstobefullyprocessed?)

为了理解这个问题,让我们来瞧瞧来自于spout的元组的生命周期。例如,下面是spout需要实现的接口(详细信息请看Javadoc):

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,Storm通过调用Spout的nextTuple方法来从spout中请求一个元组。spout使用open方法中提供的SpoutOutputCollector来发射一个元组到其中某个输出流。当发射一个元组时,spout提供了一个消息id,用于在之后标识这个元组。例如,KestrelSpout从kestrel队列中读取消息,并且将Kestrel提供的id作为消息id来发射消息。发射一个消息到SpoutOutputCollector看起来像这样:

_collector.emit(new Values("field1", "field2", 3) , msgId);

接下来,元组被发往消费bolt,Storm负责追踪创建的消息树。如果Storm检测到元组被完全处理了,Storm就会使用spout提供给Storm的消息id,在源头spout上调用ack方法。同样地,如果元组超时了,Storm就会在spout上调用fail方法。注意,元组只会被创建它的同一个spout任务确认(acked)或失败(failed)。所以,如果spout作为很多任务在集群上执行时,元组不会被不同的任务确认或失败(这里的失败是动词,下同,译者注),而只能是创建它的那个。

让我们继续使用KestrelSpout这个例子,来看看spout需要做什么来保证消息处理。当KestrelSpout从Kestrel队列中取得一个消息,就”打开”了这个消息。这意思就是说,消息还没有真正从队列中取出,而是处于”等待(pending)”状态,等待消息被处理的确认信息。当处于等待状态时,消息不会被发送给其他的队列消费者。另外,如果客户端断开了连接,为这个客户端等待的所有消息将会被重新放入队列。当消息被打开了,Kestrel为客户端提供了消息的数据以及消息的唯一id。KestrelSpout使用这个唯一id作为发送元组到SpoutOutputCollector的消息id。之后的某个时间,当KestrelSpout上的ack或者fail被调用时,KestrelSpout发送一个确认或失败消息给Kestrel,使用这个消息id来从队列中取出这个消息或者将其放回队列。

何方神圣(WhatisStorm'sreliabilityAPI)

作为用户,你可以干两件事情来从Storm的可靠性能力中获益。第一,每当你在元组树中创建一个新的链接时,你需告知Storm。第二,当你完成处理某个元组时,也需要告知Storm。通过干这两件事情,Storm就能够检测到元组树什么时候被完全处理,能够准确地对spout的元组进行确认或者失败。StormAPI提供了一种简单的方式来做这两件事情。

在元组树中指定一个链接被称为锚定(anchoring)。锚定在你发射一个新的元组时同时完成了。让我们使用下面的bolt作为例子。这个bolt拆解一个包含句子的元组为逐个单词的元组:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        } 

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

每一个单词元组通过指定输入元组为emit方法的第一参数而被锚定。因为单词元组被锚定了,如果单词元组在往下游执行的过程中失败,处于树根的spout元组会被稍后重放(replayed)。为了对比,我们一起看看,如果单词元组像这样发射会发生什么:

_collector.emit(newValues(word));

按照这样的方式发射单词元组将导致其没有被锚定。如果元组在往下游的处理过程中失败,根元组不会被重放。取决于在你的拓扑中你想要的容错保证,但通常发射未被锚定的元组是合适的。

一个输出元组可以被锚定到多个输入元组。这在进行流连接和流聚合时非常有用。一个处理失败的多锚定(multi-anchored)元组会导致spout的多个元组被重放。多锚定通过指定一个元组列表而不是单个元组来实现。例如:

List<Tuple>anchors=newArrayList<Tuple>();

anchors.add(tuple1);

anchors.add(tuple2);

_collector.emit(anchors,newValues(1,2,3));

多锚定将输出元组添加到多个元组树。注意,也有可能多锚定打破树结构而创建了元组的有向无环图(DAG),像这样:

Storm的实现既可以处理有向无环图,也可以处理树(之前的版本只能处理树,但元组树这个名字就这样固定下来了)。

锚定就是如何指定元组树。Storm可靠性API的下一部分,也就是最后一部分(功能),就是指定你什么时候完成了处理元组树中的一个元组。这是通过使用OutputCollector的ack和fail方法实现的。如果你回过头来看看SplitSentence这个例子,你会看到在所有的单词元组被发射之后输入元组被确认了。

你可以使用OutputCollector的fail方法来立即失败处于元组树根部的spout元组。例如,你的应用程序可以选择捕获一个数据库客户端的异常,然后显式地失败这个输入元组。通过显式失败元组,相比于你等待这个元组超时,spout元组会被更快的重放。

你处理的每一个元组必须被确认或者失败。Storm使用内存来追踪每一个元组,所以如果你不确认或者失败每一个元组,任务最终会耗尽内存。

许多bolt都遵循着相同的模式来读取输入元组,基于它发射元组,然后在execute方法的结尾确认这个元组。这些bolts按照过滤器和简单功能进行分类。Storm有一个BasicBolt的接口为你封装了这个模式。SplitSentence的例子可以作为一个BasicBolt写成下面这样:

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

个实现比之前的实现更简单,并且在语义上是等价的。元组被发射到BasicOutputCollector自动被锚定到输入元组了,当execute方法完成时,输入元组也为你自动确认了。

作为对比,执行聚合和连接的bolt可能会延迟确认元组,直到其计算出一批元组的结果出来以后。聚合和连接也通常多锚定其输出流。这些事情超出了IbasicBolt的简单模式范畴。

肿么办(HowdoImakemyapplicationsworkcorrectlygiventhattuplescanbereplayed)

在软件设计领域,答案永远是“具体问题具体分析(itdepends)”。Storm0.7.0引入了”事务性拓扑”特性,使得你可以在大多数计算中获取精确一次消息送达的语义的完全容错能力。请持续关注本系列后续文章(译者注)。

真奇妙(HowdoesStormimplementreliabilityinanefficientway)

一个Storm拓扑有一个特殊的”acker”任务集合,为每一个spout元组追踪元组的有向无环图。当一个acker发现一个DAG完成了,就会发送一个消息到创建这个spout元组的spout任务来确认这个消息。你可以在拓扑的配置中使用Config.TOPOLOGY_ACKERS选项来设置拓扑的acker任务数量。Storm默认Config.TOPOLOGY_ACKERS为一个任务。你需要为处理大量消息的拓扑增加这个值。

理解Storm可靠性实现的最佳途径是关注元组和元组有向无环图的生命周期。当拓扑中一个元组被创建,无论是在spout中还是在bolt中,都会被赋予一个随机的64位id。这个id被acker用来为每一个spout元组追踪其有向无环图。

每一个元组知道其所在的元组树中的全部spout元组的id。当你在bolt中发射一个新的元组时,spout元组的id就会从元组的锚点中被拷贝到这个新的元组中。当一个元组被确认,就会发送一个消息给合适的acker任务,这个任务知道元组树是如何变化的相关信息。具体来说,就是告诉acker”我已经在树中完成了这个spout元组,这个是树中被锚定给我的新元组”。

例如,如果元组D和E是基于元组C被创建,下面就是当C被确认时元组树时如何变化的:

因为在C被从树中删除的同时,D和E被添加进去,所以树不会很快地结束。

还有一个关于Storm如何追踪元组树的更多细节。正如已经提到的,你可以拥有任意数量的acker任务在拓扑中。那么问题来了:当一个元组在拓扑中被确认,它是如何知道应该发送信息给哪一个acker任务呢?

Storm使用模哈希(modhashing)来映射spout元组id到acker任务。因为每个元组都携带着其所在的所有树中的spout元组id(一个元组可能存在于多个树中,译者注),它们知道应该与哪一个acker任务进行通信。

Storm的另一个细节是,acker任务是如何知道每一个spout元组该由哪一个acker进行追踪。当一个spout任务发射一个新元组,它只是发送消息给一个合适的acker,来告诉它这个spout元组的任务id。然后当一个acker发现一个树完成了,就知道发送完成消息给哪一个任务id。

acker任务并不显式完全的追踪元组树。在一个拥有成千上万(或者更多)个节点的大型元组树中,追踪所有的元组树可能会超过acker内存上限。取而代之的是,acker采取了一种不同的策略,每个spout元组只需要固定数量的空间(大约20字节)。这个追踪算法是Storm运转的关键,也是其主要的突破之一。

一个acker任务存储了从一个spout元组id到一对值的映射。第一个值是创建这个spout元组的任务id,后面用于发送完成消息。第二个值是一个64位的数字,被称为”ackval”。ackval是整个元组树的状态表示,无论这个树有多大或者多小。它只是简单地对树中所有创建的或者/以及确认的元组id进行异或。

当一个acker任务发现一个ackval变成了0,它就知道元组树完成了。因为元组id是64位的随机数字,所以ackval碰巧变成0的可能性是极低的。如果你用数学算一算,以每秒10K次确认的频率,也需要50,000,000年才能遇到一个错误。即便如此,也只是在拓扑中的元组碰巧失败的情况下才会导致数据丢失。

现在,你理解了可靠性算法,让我们一起来过一下所有可能失败的情形,看看每种情况下Storm是如何避免数据丢失的:

  • 由于任务死掉了导致元组没有被确认:在这种情况下,处于失败元祖所在树的根部的spout元组会超时并被重放。
  • acker任务死掉:在这种情况下,这个acker追踪的所有spout元组会超时并被重放。
  • spout任务死掉:在这种情况下,与spout对话的源头来负责重放消息。例如,像Kestrel和RabbitMQ这样的队列会在客户端断开时将所有顶戴的消息重新防火到队列中。

正如你所见,Storm的可靠性机制完全是分布式的,可伸缩的和容错的。

可靠性调优(Tuningreliability)

acker任务是轻量级的,所以你在拓扑中不需要太多。你可以通过StormUI(组件id是__acker)来追踪其性能。如果吞吐量看起来不正常,不就需要添加更多的acker任务。

如果可靠性对你来说并不重要——也就是说你不在意在失败情形下的元组丢失——那么你可以通过不追踪spout元组的元组树来改善性能。不追踪元组树将会减少一半的消息传输量,因为正常情况下,元组树中的每一个元组都对应有一个确认消息。另外,下游的元组也只需要保存更少的id从而节省了带宽使用。

有三种移除可靠性的方式。第一种是设置Config.TOPOLOGY_ACKERS为0。这种方式下,Storm会在spout发射一个元组后立即在spout上调用ack方法。

第二种移除可靠性的方式是通过消息本身。你可以通过在SpoutOutputCollector.emit方法中忽略消息id来关闭对某个spout元组的追踪。

最后,如果你对下游的部分元组处理失败不是很在意,你可以作为非锚定的元组来发射它们。因为它们没有被锚定到任何spout元组,所以如果它们没有被确认也不会导致任何spout元组失败。

===============================================================================

大家好,我是阮威。华中科技大学,计算机软件专业硕士。毕业后加入腾讯,先后在腾讯电子商务部和无线游戏产品部工作,现供职于欢聚时代负责基础产品相关工作。IT男,至今。欢迎大家收听我的公众账号"技术与人生"。