共计 6996 个字符,预计需要花费 18 分钟才能阅读完成。
storm 保证从 spout 发出的每个 tuple 都会被完全处理。这篇文章介绍 storm 是怎么做到这个保证的,以及我们使用者怎么做才能充分利用 storm 的可靠性特点。
一个 tuple 被”完全处理”是什么意思?
就如同蝴蝶效应一样,从 spout 发射的一个 tuple 可以引起其它成千上万个 tuple 因它而产生,想想那个计算一篇文章中每个单词出现次数的 topology.
1 2 3 4 5 6 7 8 9 | TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( 1 , new KestrelSpout( "kestrel.backtype.com" ,
22133 ,
"sentence_queue" ,
new StringScheme())); builder.setBolt( 2 , new SplitSentence(), 10 )
.shuffleGrouping( 1 ); builder.setBolt( 3 , new WordCount(), 20 )
.fieldsGrouping( 2 , new Fields( "word" )); |
这个 topology 从一个 Kestrel 队列读取句子,把每个句子分割成一个个单词,然后发射这一个个单词:一个源 tuple(一个句子)引起后面很多 tuple 的产生(一个个单词),这个消息流大概是这样的:

统计单词出现次数的 tuple 树
在 storm 里面 一个 tuple 被完全处理的意思是:这个 tuple 以及由这个 tuple 所导致的所有的 tuple 都被成功处理。而一个 tuple 会被认为处理失败了如果这个消息在 timeout 所指定的时间内没有成功处理。 而这个 timetout 可以通过 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 来指定。
如果一个消息处理成功了或者失败了会发生什么?
FYI。下面这个是 spout 要实现的接口:
1 2 3 4 5 6 7 8 | public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context,
SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId); } |
首先 storm 通过调用 spout 的 nextTuple 方法来获取下一个 tuple, Spout 通过 open 方法参数里面提供的 SpoutOutputCollector 来发射新 tuple 到它的其中一个输出消息流, 发射 tuple 的时候 spout 会提供一个 message-id, 后面我们通过这个 message-id 来追踪这个 tuple。举例来说,KestrelSpout 从 kestrel 队列里面读取一个消息,并且把 kestrel 提供的消息 id 作为 message-id, 看例子:
1 | _collector.emit( new Values( "field1" , "field2" , 3 ),msgId); |
接下来,这个发射的 tuple 被传送到消息处理者 bolt 那里,storm 会跟踪由此所产生的这课 tuple 树。如果 storm 检测到一个 tuple 被完全处理了,那么 storm 会以最开始的那个 message-id 作为参数去调用消息源的 ack 方法;反之 storm 会调用 spout 的 fail 方法。值得注意的一点是,storm 调用 ack 或者 fail 的 task 始终是产生这个 tuple 的那个 task。所以如果一个 spout 被分成很多个 task 来执行,消息执行的成功失败与否始终会通知最开始发出 tuple 的那个 task。
我们再以 KestrelSpout 为例来看看 spout 需要做些什么才能保证“一个消息始终被完全处理”, 当 KestrelSpout 从 Kestrel 里面读出一条消息,首先它“打开”这条消息,这意味着这条消息还在 kestrel 队列里面,不过这条消息会被标示成“处理中”直到 ack 或者 fail 被调用。处于“处理中“状态的消息不会被发给其他消息处理者了;并且如果这个 spout“断线”了,那么所有处于“处理中”状态的消息会被重新标示成“等待处理”.
Storm 的可靠性 API
作为 storm 的使用者,有两件事情要做以更好的利用 storm 的可靠性特征。首先,在你生成一个新的 tuple 的时候要通知 storm; 其次,完成处理一个 tuple 之后要通知 storm。这样 storm 就可以检测整个 tuple 树有没有完成处理,并且通知源 spout 处理结果。storm 提供了一些简洁的 api 来做这些事情。
由一个 tuple 产生一个新的 tuple 称为:anchoring。你发射一个新 tuple 的同时也就完成了一次 anchoring。看下面这个例子:这个 bolt 把一个包含一个句子的 tuple 分割成每个单词一个 tuple。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class SplitSentence implements IRichBolt {
OutputCollector _collector;
public void prepare(Map conf,
TopologyContext context,
OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString( 0 );
for (String word: sentence.split( "" )) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "word" ));
}
} |
看一下这个 execute 方法,emit 的第一个参数是输入 tuple,第二个参数则是输出 tuple,这其实就是通过输入 tuple anchoring 了一个新的输出 tuple。因为这个“单词 tuple”被 anchoring 在“句子 tuple”一起,如果其中一个单词处理出错,那么这整个句子会被重新处理。作为对比,我们看看如果通过下面这行代码来发射一个新的 tuple 的话会有什么结果。
1 | _collector.emit( new Values(word)); |
用这种方法发射会导致新发射的这个 tuple 脱离原来的 tuple 树(unanchoring), 如果这个 tuple 处理失败了,整个句子不会被重新处理。到底要 anchoring 还是要 unanchoring 则完全取决于你的业务需求。
一个输出 tuple 可以被 anchoring 到多个输入 tuple。这种方式在 stream 合并或者 stream 聚合的时候很有用。一个多入口 tuple 处理失败的话,那么它对应的所有输入 tuple 都要重新执行。看看下面演示怎么指定多个输入 tuple:
1 2 3 4 | List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add(tuple1); anchors.add(tuple2); _collector.emit(anchors, new Values( 1 , 2 , 3 )); |
多入口 tuple 把这个新 tuple 加到了多个 tuple 树里面去了。
我们通过 anchoring 来构造这个 tuple 树,最后一件要做的事情是在你处理完当个 tuple 的时候告诉 storm, 通过 OutputCollector 类的 ack 和 fail 方法来做,如果你回过头来看看 SplitSentence
的例子,你可以看到“句子 tuple”在所有“单词 tuple”被发出之后调用了 ack。
你可以调用OutputCollector
的 fail 方法去立即将从消息源头发出的那个 tuple 标记为 fail,比如你查询了数据库,发现一个错误,你可以马上 fail 那个输入 tuple,这样可以让这个 tuple 被快速的重新处理,因为你不需要等那个 timeout 时间来让它自动 fail。
每个你处理的 tuple,必须被 ack 或者 fail。因为 storm 追踪每个 tuple 要占用内存。所以如果你不 ack/fail 每一个 tuple,那么最终你会看到 OutOfMemory 错误。
大多数 Bolt 遵循这样的规律:读取一个 tuple;发射一些新的 tuple;在 execute 的结束的时候 ack 这个 tuple。这些 Bolt 往往是一些过滤器或者简单函数。Storm 为这类规律封装了一个 BasicBolt 类。如果用 BasicBolt 来做,上面那个 SplitSentence 可以改写成这样:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 | public class SplitSentence implements IBasicBolt {
public void prepare(Map conf,
TopologyContext context) {
}
public void execute(Tuple tuple,
BasicOutputCollector collector) {
String sentence = tuple.getString( 0 );
for (String word: sentence.split( "" )) {
collector.emit( new Values(word));
}
}
public void cleanup() {
}
public void declareOutputFields(
OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "word" ));
}
} |
这个实现比之前的实现简单多了,但是功能上是一样的。发送到 BasicOutputCollector 的 tuple 会自动和输入 tuple 相关联,而在 execute 方法结束的时候那个输入 tuple 会被自动 ack 的。
作为对比,处理聚合和合并的 bolt 往往要处理一大堆的 tuple 之后才能被 ack,而这类 tuple 通常都是多输入的 tuple,所以这个已经不是 IBasicBolt 可以罩得住的了。
storm 是怎么实现高效率的可靠性的?
storm 里面有一类特殊的 task 称为:acker,他们负责跟踪 spout 发出的每一个 tuple 的 tuple 树。当 acker 发现一个 tuple 树已经处理完成了。它会发送一个消息给产生这个 tuple 的那个 task。你可以通过 Config.TOPOLOGY_ACKERS 来设置一个 topology 里面的 acker 的数量,默认值是一。如果你的 topology 里面的 tuple 比较多的话,那么把 acker 的数量设置多一点,效率会高一点。
理解 storm 的可靠性的最好的方法是来看看 tuple 和 tuple 树的生命周期,当一个 tuple 被创建,不管是 spout 还是 bolt 创建的,它会被赋予一个 64 位的 id,而 acker 就是利用这个 id 去跟踪所有的 tuple 的。每个 tuple 知道它的祖宗的 id(从 spout 发出来的那个 tuple 的 id), 每当你新发射一个 tuple,它的祖宗 id 都会传给这个新的 tuple。所以当一个 tuple 被 ack 的时候,它会发一个消息给 acker,告诉它这个 tuple 树发生了怎么样的变化。具体来说就是:它告诉 acker:我呢已经完成了,我有这些儿子 tuple, 你跟踪一下他们吧。下面这个图演示了 C 被 ack 了之后,这个 tuple 树所发生的变化。

tuple ack 示例
关于 storm 怎么跟踪 tuple 还有一些细节,前面已经提到过了,你可以自己设定你的 topology 里面有多少个 acker。而这又给我们带来一个问题,当一个 tuple 需要 ack 的时候,它到底选择哪个 acker 来发送这个信息呢?
storm 使用一致性哈希来把一个 spout-tuple-id 对应到 acker,因为每一个 tuple 知道它所有的祖宗的 tuple-id,所以它自然可以算出要通知哪个 acker 来 ack。(这里所有的祖宗是指这个 tuple 所对应的所有的根 tuple。这里注意因为一个 tuple 可能存在于多个 tuple 树,所以才有所有一说)。
storm 的另一个细节是 acker 是怎么知道每一个 spout tuple 应该交给哪个 task 来处理。当一个 spout 发射一个新的 tuple,它会简单的发一个消息给一个合适的 acker,并且告诉 acker 它自己的 id(taskid),这样 storm 就有了 taskid-tupleid 的对应关系。当 acker 发现一个树完成处理了,它知道给哪个 task 发送成功的消息。
acker task 并不显式的跟踪 tuple 树。对于那些有成千上万个节点的 tuple 树,把这么多的 tuple 信息都跟踪起来会耗费太多的内存。相反,acker 用了一种不同的方式,使得对于每个 spout tuple 所需要的内存量是恒定的(20 bytes) . 这个跟踪算法是 storm 如何工作的关键,并且也是它的主要突破。
一个 acker task 存储了一个 spout-tuple-id 到一对值的一个 mapping。这个对子的第一个值是创建这个 tuple 的 taskid,这个是用来在完成处理 tuple 的时候发送消息用的。第二个值是一个 64 位的数字称作:”ack val”, ack val 是整个 tuple 树的状态的一个表示,不管这棵树多大。它只是简单地把这棵树上的所有创建的 tupleid/ack 的 tupleid 一起异或(XOR)。
当一个 acker task 发现一个 ack val 变成 0 了,它知道这棵树已经处理完成了。因为 tupleid 是随机的 64 位数字,所以,ack val 碰巧变成 0(而不是因为所有创建的 tuple 都完成了)的几率极小。算一下就知道了,就算每秒发生 10000 个 ack,那么需要 50000000 万年才可能碰到一个错误。而且就算碰到了一个错误,也只有在这个 tuple 失败的时候才会造成数据丢失。关于 Acker 的详细工作流程的分析可以看看这篇文章: Twitter Storm 源代码分析之 acker��作流程。
既然你已经理解了 storm 的可靠性算法,让我们一起过一遍所有可能的失败场景,并看看 storm 在每种情况下是怎么避免数据丢失的。
1. 由于对应的 task 挂掉了,一个 tuple 没有被 ack: storm 的超时机制在超时之后会把这个 tuple 标记为失败,从而可以重新处理。
2. Acker 挂掉了: 这种情况下由这个 acker 所跟踪的所有 spout tuple 都会超时,也就会被重新处理。
3. Spout 挂掉了: 在这种情况下给 spout 发送消息的消息源负责重新发送这些消息。比如 Kestrel 和 RabbitMQ 在一个客户端断开之后会把所有”处理中“的消息放回队列。
就像你看到的那样,storm 的可靠性机制是完全分布式的,可伸缩的并且是高度容错的。
调整可靠性 (Tuning Reliability)
acker task 是非常轻量级的,所以一个 topology 里面不需要很多 acker。你可以通过 Strom UI(id: -1)来跟踪它的性能。如果它的吞吐量看起来不正常,那么你就需要多加点 acker 了。
如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据,那么你可以通过不跟踪这些 tuple 树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半,因为对于每一个 tuple 都要发送一个 ack 消息。并且它需要更少的 id 来保存下游的 tuple,减少带宽占用。
有三种方法可以去掉可靠性。第一是把 Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下,storm 会在 spout 发射一个 tuple 之后马上调用 spout 的 ack 方法。也就是说这个 tuple 树不会被跟踪。
第二个方法是在 tuple 层面去掉可靠性。你可以在发射 tuple 的时候不指定 messageid 来达到不跟粽某个特定的 spout tuple 的目的。
最后一个方法是如果你对于一个 tuple 树里面的某一部分到底成不成功不是很关心,那么可以在发射这些 tuple 的时候 unanchor 它们。这样这些 tuple 就不在 tuple 树里面,也就不会被跟踪了。
推荐阅读:
Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm
安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm
Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm
Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm
