阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Twitter Storm如何保证消息不丢失

172次阅读
没有评论

共计 6996 个字符,预计需要花费 18 分钟才能阅读完成。

storm 保证从 spout 发出的每个 tuple 都会被完全处理。这篇文章介绍 storm 是怎么做到这个保证的,以及我们使用者怎么做才能充分利用 storm 的可靠性特点。

一个 tuple 被”完全处理”是什么意思?

就如同蝴蝶效应一样,从 spout 发射的一个 tuple 可以引起其它成千上万个 tuple 因它而产生,想想那个计算一篇文章中每个单词出现次数的 topology.

1
2
3
4
5
6
7
8
9
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
.fieldsGrouping(2, new Fields("word"));

这个 topology 从一个 Kestrel 队列读取句子,把每个句子分割成一个个单词,然后发射这一个个单词:一个源 tuple(一个句子)引起后面很多 tuple 的产生(一个个单词),这个消息流大概是这样的:

Twitter Storm 如何保证消息不丢失

统计单词出现次数的 tuple 树

在 storm 里面 一个 tuple 被完全处理的意思是:这个 tuple 以及由这个 tuple 所导致的所有的 tuple 都被成功处理。而一个 tuple 会被认为处理失败了如果这个消息在 timeout 所指定的时间内没有成功处理。 而这个 timetout 可以通过 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 来指定。

 

如果一个消息处理成功了或者失败了会发生什么?

FYI。下面这个是 spout 要实现的接口:

1
2
3
4
5
6
7
8
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context,
SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}

首先 storm 通过调用 spout 的 nextTuple 方法来获取下一个 tuple, Spout 通过 open 方法参数里面提供的 SpoutOutputCollector 来发射新 tuple 到它的其中一个输出消息流, 发射 tuple 的时候 spout 会提供一个 message-id, 后面我们通过这个 message-id 来追踪这个 tuple。举例来说,KestrelSpout 从 kestrel 队列里面读取一个消息,并且把 kestrel 提供的消息 id 作为 message-id, 看例子:

1
_collector.emit(new Values("field1","field2", 3),msgId);

接下来,这个发射的 tuple 被传送到消息处理者 bolt 那里,storm 会跟踪由此所产生的这课 tuple 树。如果 storm 检测到一个 tuple 被完全处理了,那么 storm 会以最开始的那个 message-id 作为参数去调用消息源的 ack 方法;反之 storm 会调用 spout 的 fail 方法。值得注意的一点是,storm 调用 ack 或者 fail 的 task 始终是产生这个 tuple 的那个 task。所以如果一个 spout 被分成很多个 task 来执行,消息执行的成功失败与否始终会通知最开始发出 tuple 的那个 task。

我们再以 KestrelSpout 为例来看看 spout 需要做些什么才能保证“一个消息始终被完全处理”, 当 KestrelSpout 从 Kestrel 里面读出一条消息,首先它“打开”这条消息,这意味着这条消息还在 kestrel 队列里面,不过这条消息会被标示成“处理中”直到 ack 或者 fail 被调用。处于“处理中“状态的消息不会被发给其他消息处理者了;并且如果这个 spout“断线”了,那么所有处于“处理中”状态的消息会被重新标示成“等待处理”.

Storm 的可靠性 API

作为 storm 的使用者,有两件事情要做以更好的利用 storm 的可靠性特征。首先,在你生成一个新的 tuple 的时候要通知 storm; 其次,完成处理一个 tuple 之后要通知 storm。这样 storm 就可以检测整个 tuple 树有没有完成处理,并且通知源 spout 处理结果。storm 提供了一些简洁的 api 来做这些事情。

由一个 tuple 产生一个新的 tuple 称为:anchoring。你发射一个新 tuple 的同时也就完成了一次 anchoring。看下面这个例子:这个 bolt 把一个包含一个句子的 tuple 分割成每个单词一个 tuple。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SplitSentence implements IRichBolt {
OutputCollector _collector;
 
public void prepare(Map conf,
TopologyContext context,
OutputCollector collector) {
_collector = collector;
}
 
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split("")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
 
public void cleanup() {
}
 
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

看一下这个 execute 方法,emit 的第一个参数是输入 tuple,第二个参数则是输出 tuple,这其实就是通过输入 tuple anchoring 了一个新的输出 tuple。因为这个“单词 tuple”被 anchoring 在“句子 tuple”一起,如果其中一个单词处理出错,那么这整个句子会被重新处理。作为对比,我们看看如果通过下面这行代码来发射一个新的 tuple 的话会有什么结果。

1
_collector.emit(new Values(word));

用这种方法发射会导致新发射的这个 tuple 脱离原来的 tuple 树(unanchoring), 如果这个 tuple 处理失败了,整个句子不会被重新处理。到底要 anchoring 还是要 unanchoring 则完全取决于你的业务需求。

一个输出 tuple 可以被 anchoring 到多个输入 tuple。这种方式在 stream 合并或者 stream 聚合的时候很有用。一个多入口 tuple 处理失败的话,那么它对应的所有输入 tuple 都要重新执行。看看下面演示怎么指定多个输入 tuple:

1
2
3
4
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多入口 tuple 把这个新 tuple 加到了多个 tuple 树里面去了。

我们通过 anchoring 来构造这个 tuple 树,最后一件要做的事情是在你处理完当个 tuple 的时候告诉 storm, 通过 OutputCollector 类的 ack 和 fail 方法来做,如果你回过头来看看 SplitSentence 的例子,你可以看到“句子 tuple”在所有“单词 tuple”被发出之后调用了 ack。

你可以调用OutputCollector 的 fail 方法去立即将从消息源头发出的那个 tuple 标记为 fail,比如你查询了数据库,发现一个错误,你可以马上 fail 那个输入 tuple,这样可以让这个 tuple 被快速的重新处理,因为你不需要等那个 timeout 时间来让它自动 fail。

每个你处理的 tuple,必须被 ack 或者 fail。因为 storm 追踪每个 tuple 要占用内存。所以如果你不 ack/fail 每一个 tuple,那么最终你会看到 OutOfMemory 错误。

大多数 Bolt 遵循这样的规律:读取一个 tuple;发射一些新的 tuple;在 execute 的结束的时候 ack 这个 tuple。这些 Bolt 往往是一些过滤器或者简单函数。Storm 为这类规律封装了一个 BasicBolt 类。如果用 BasicBolt 来做,上面那个 SplitSentence 可以改写成这样:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
public class SplitSentence implements IBasicBolt {
public void prepare(Map conf,
TopologyContext context) {
}
 
public void execute(Tuple tuple,
BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split("")) {
collector.emit(new Values(word));
}
}
 
public void cleanup() {
}
 
public void declareOutputFields(
OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

这个实现比之前的实现简单多了,但是功能上是一样的。发送到 BasicOutputCollector 的 tuple 会自动和输入 tuple 相关联,而在 execute 方法结束的时候那个输入 tuple 会被自动 ack 的。
作为对比,处理聚合和合并的 bolt 往往要处理一大堆的 tuple 之后才能被 ack,而这类 tuple 通常都是多输入的 tuple,所以这个已经不是 IBasicBolt 可以罩得住的了。

storm 是怎么实现高效率的可靠性的?

storm 里面有一类特殊的 task 称为:acker,他们负责跟踪 spout 发出的每一个 tuple 的 tuple 树。当 acker 发现一个 tuple 树已经处理完成了。它会发送一个消息给产生这个 tuple 的那个 task。你可以通过 Config.TOPOLOGY_ACKERS 来设置一个 topology 里面的 acker 的数量,默认值是一。如果你的 topology 里面的 tuple 比较多的话,那么把 acker 的数量设置多一点,效率会高一点。

理解 storm 的可靠性的最好的方法是来看看 tuple 和 tuple 树的生命周期,当一个 tuple 被创建,不管是 spout 还是 bolt 创建的,它会被赋予一个 64 位的 id,而 acker 就是利用这个 id 去跟踪所有的 tuple 的。每个 tuple 知道它的祖宗的 id(从 spout 发出来的那个 tuple 的 id), 每当你新发射一个 tuple,它的祖宗 id 都会传给这个新的 tuple。所以当一个 tuple 被 ack 的时候,它会发一个消息给 acker,告诉它这个 tuple 树发生了怎么样的变化。具体来说就是:它告诉 acker:我呢已经完成了,我有这些儿子 tuple, 你跟踪一下他们吧。下面这个图演示了 C 被 ack 了之后,这个 tuple 树所发生的变化。

Twitter Storm 如何保证消息不丢失

tuple ack 示例

关于 storm 怎么跟踪 tuple 还有一些细节,前面已经提到过了,你可以自己设定你的 topology 里面有多少个 acker。而这又给我们带来一个问题,当一个 tuple 需要 ack 的时候,它到底选择哪个 acker 来发送这个信息呢?

storm 使用一致性哈希来把一个 spout-tuple-id 对应到 acker,因为每一个 tuple 知道它所有的祖宗的 tuple-id,所以它自然可以算出要通知哪个 acker 来 ack。(这里所有的祖宗是指这个 tuple 所对应的所有的根 tuple。这里注意因为一个 tuple 可能存在于多个 tuple 树,所以才有所有一说)。

storm 的另一个细节是 acker 是怎么知道每一个 spout tuple 应该交给哪个 task 来处理。当一个 spout 发射一个新的 tuple,它会简单的发一个消息给一个合适的 acker,并且告诉 acker 它自己的 id(taskid),这样 storm 就有了 taskid-tupleid 的对应关系。当 acker 发现一个树完成处理了,它知道给哪个 task 发送成功的消息。

acker task 并不显式的跟踪 tuple 树。对于那些有成千上万个节点的 tuple 树,把这么多的 tuple 信息都跟踪起来会耗费太多的内存。相反,acker 用了一种不同的方式,使得对于每个 spout tuple 所需要的内存量是恒定的(20 bytes) . 这个跟踪算法是 storm 如何工作的关键,并且也是它的主要突破。

一个 acker task 存储了一个 spout-tuple-id 到一对值的一个 mapping。这个对子的第一个值是创建这个 tuple 的 taskid,这个是用来在完成处理 tuple 的时候发送消息用的。第二个值是一个 64 位的数字称作:”ack val”, ack val 是整个 tuple 树的状态的一个表示,不管这棵树多大。它只是简单地把这棵树上的所有创建的 tupleid/ack 的 tupleid 一起异或(XOR)。

当一个 acker task 发现一个 ack val 变成 0 了,它知道这棵树已经处理完成了。因为 tupleid 是随机的 64 位数字,所以,ack val 碰巧变成 0(而不是因为所有创建的 tuple 都完成了)的几率极小。算一下就知道了,就算每秒发生 10000 个 ack,那么需要 50000000 万年才可能碰到一个错误。而且就算碰到了一个错误,也只有在这个 tuple 失败的时候才会造成数据丢失。关于 Acker 的详细工作流程的分析可以看看这篇文章: Twitter Storm 源代码分析之 acker��作流程。

既然你已经理解了 storm 的可靠性算法,让我们一起过一遍所有可能的失败场景,并看看 storm 在每种情况下是怎么避免数据丢失的。

1. 由于对应的 task 挂掉了,一个 tuple 没有被 ack: storm 的超时机制在超时之后会把这个 tuple 标记为失败,从而可以重新处理。

2. Acker 挂掉了: 这种情况下由这个 acker 所跟踪的所有 spout tuple 都会超时,也就会被重新处理。

3. Spout 挂掉了: 在这种情况下给 spout 发送消息的消息源负责重新发送这些消息。比如 Kestrel 和 RabbitMQ 在一个客户端断开之后会把所有”处理中“的消息放回队列。

就像你看到的那样,storm 的可靠性机制是完全分布式的,可伸缩的并且是高度容错的。

 

调整可靠性 (Tuning Reliability)

acker task 是非常轻量级的,所以一个 topology 里面不需要很多 acker。你可以通过 Strom UI(id: -1)来跟踪它的性能。如果它的吞吐量看起来不正常,那么你就需要多加点 acker 了。

如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据,那么你可以通过不跟踪这些 tuple 树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半,因为对于每一个 tuple 都要发送一个 ack 消息。并且它需要更少的 id 来保存下游的 tuple,减少带宽占用。

有三种方法可以去掉可靠性。第一是把 Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下,storm 会在 spout 发射一个 tuple 之后马上调用 spout 的 ack 方法。也就是说这个 tuple 树不会被跟踪。

第二个方法是在 tuple 层面去掉可靠性。你可以在发射 tuple 的时候不指定 messageid 来达到不跟粽某个特定的 spout tuple 的目的。

最后一个方法是如果你对于一个 tuple 树里面的某一部分到底成不成功不是很关心,那么可以在发射这些 tuple 的时候 unanchor 它们。这样这些 tuple 就不在 tuple 树里面,也就不会被跟踪了。

推荐阅读:

Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm

安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm

Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm

Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计6996字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中