共计 4853 个字符,预计需要花费 13 分钟才能阅读完成。
概述
我们知道 storm 一个很重要的特性是它能够保证你发出的每条消息都会被完整处理,完整处理的意思是指:
一个 tuple 被完全处理的意思是:这个 tuple 以及由这个 tuple 所导致的所有的 tuple 都被成功处理。而一个 tuple 会被认为处理失败了如果这个消息在 timeout 所指定的时间内没有成功处理。
也就是说对于任何一个 spout-tuple 以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看 Twitter Storm 如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm 里面有个专门的 acker 来跟踪所有 tuple 的完成情况。这篇文章就来讨论 acker 的详细工作流程。
源代码列表
这篇文章涉及到的源代码主要包括:
- backtype.storm.daemon.acker
- backtype.storm.daemon.task
- backtype.storm.task.OutputCollectorImpl
算法简介
acker 对于 tuple 的跟踪算法是 storm 的主要突破之一,这个算法使得对于任意大的一个 tuple 树,它只需要恒定的 20 字节就可以进行跟踪了。原理很简单:acker 对于每个 spout-tuple 保存一个 ack-val 的校验值,它的初始值是 0,然后每发射一个 tuple/ack 一个 tuple,那么 tuple 的 id 都要跟这个校验值异或一下,并且把得到的值更新为 ack-val 的新值。那么假设每个发射出去的 tuple 都被 ack 了,那么最后 ack-val 一定是 0(因为一个数字跟自己异或得到的值是 0)。
进入正题
那么下面我们从源代码层面来看看 哪些组件在哪些时候会给 acker 发送什么样的消息来共同完成这个算法的。acker 对消息进行处理的主要是下面这块代码:
01 02 03 04 05 06 07 08 09 10 11 | (let [id (.getValue tuple 0)^TimeCacheMap pending @pendingcurr (.get pending id)curr (condp = (.getSourceStreamId tuple)ACKER-INIT-STREAM-ID (-> curr(update-ack id)(assoc :spout-task (.getValue tuple 1)))ACKER-ACK-STREAM-ID (update-ackcurr (.getValue tuple 1))ACKER-FAIL-STREAM-ID (assoc curr :failed true))]...) |
Spout 创建一个新的 tuple 的时候给 acker 发送消息
消息格式 (看上面代码的第 1 行和第 7 行对于tuple.getValue() 的调用)
1 | (spout-tuple-id, task-id) |
消息的 streamId 是__ack_init(ACKER-INIT-STREAM-ID)
这是告诉 acker, 一个新的 spout-tuple 出来了,你跟踪一下,它是由 id 为 task-id 的 task 创建的 (这个 task-id 在后面会用来通知这个 task:你的 tuple 处理成功了 / 失败了)。处理完这个消息之后,acker 会在它的 pending 这个 map(类型为 TimeCacheMap) 里面添加这样一条记录:
1 | {spout-tuple-id {:spout-task task-id :val ack-val)} |
这就是 acker 对 spout-tuple 进行跟踪的核心数据结构,对于每个 spout-tuple 所产生的 tuple 树的跟踪都只需要保存上面这条记录。acker 后面会检查:val 什么时候变成 0,变成 0,说明这个 spout-tuple 产生的 tuple 都处理完成了。
Bolt 发射一个新 tuple 的时候会给 acker 发送消息么?
任何一个 bolt 在发射一个新的 tuple 的时候, 是不会直接通知 acker 的,如果这样做的话那么每发射一个消息会有三条消息了:
- Bolt 创建这个 tuple 的时候,把它发给下一个 bolt 的消息
Bolt 创建这个 tuple 的时候,发送给 acker 的消息- ack tuple 的时候发送的 ack 消息
事实上 storm 里面只有第一条和第三条消息,它把第二条消息省掉了,怎么做到的呢?storm 这点做得挺巧妙的,bolt 在发射一个新的 bolt 的时候会把这个新 tuple 跟它的父 tuple 的关系保存起来。然后在 ack 每个 tuple 的时候,storm 会把要 ack 的 tuple 的 id, 以及这个 tuple 新创建的所有的 tuple 的 id 的异或值发送给 acker。这样就给每个 tuple 省掉了一个消息(具体看下一节)。
Tuple 被 ack 的时候给 acker 发送消息
每个 tuple 在被 ack 的时候,会给 acker 发送一个消息,消息格式是:
1 | (spout-tuple-id, tmp-ack-val) |
消息的 streamId 是__ack_ack(ACKER-ACK-STREAM-ID)
注意,这里的 tmp-ack-val 是要 ack 的 tuple 的 id 与由它新创建的所有的 tuple 的 id 异或的结果:
1 | tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ...) |
我们可以从 task.clj 里面的 send-ack 方法看出这一点:
01 02 03 04 05 06 07 08 09 10 11 12 13 | (defn- send-ack [^TopologyContext topology-context^Tuple input-tuple^List generated-ids send-fn](let [ack-val (bit-xor-vals generated-ids)](doseq [[anchor id] (.. input-tuplegetMessageIdgetAnchorsToIds)](send-fn (Tuple. topology-context[anchor (bit-xor ack-val id)](.getThisTaskId topology-context)ACKER-ACK-STREAM-ID))))) |
这里面的 generated-ids 参数就是这个 input-tuple 的所有子 tuple 的 id,从代码可以看出 storm 会给这个 tuple 的每一个 spout-tuple 发送一个 ack 消息。
为什么说这里的 generated-ids 是 input-tuple 的子 tuple 呢?这个 send-ack 是被 OutputCollectorImpl 里面的 ack 方法调用的:
1 2 3 4 5 6 7 | public void ack(Tuple input) {List generated = getExistingOutput(input);// don't just do this directly in case// there was no output_pendingAcks.remove(input);_collector.ack(input, generated);} |
generated 是由 getExistingOutput(input) 方法计算出来的,我们再来看看这个方法的定义:
1 2 3 4 5 6 7 8 9 | private List getExistingOutput(Tuple anchor) {if(_pendingAcks.containsKey(anchor)) {return _pendingAcks.get(anchor);} else {List ret = new ArrayList();_pendingAcks.put(anchor, ret);return ret;}} |
_pendingAcks里面存的是什么东西呢?
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | private Tuple anchorTuple(Collection< Tuple > anchors,String streamId,List< Object > tuple) {// The simple algorithm in this function is the key// to Storm. It is what enables Storm to guarantee// message processing.// 这个 map 存的东西是 spout-tuple-id 到 ack-val 的映射Map< Long, Long > anchorsToIds= new HashMap<Long, Long>();// anchors 其实就是它的所有父亲:spout-tupleif(anchors!=null) {for(Tuple anchor: anchors) {long newId = MessageId.generateId();// 告诉每一个父亲,你们又多了一个儿子了。getExistingOutput(anchor).add(newId);for(long root: anchor.getMessageId().getAnchorsToIds().keySet()) {Long curr = anchorsToIds.get(root);if(curr == null) curr = 0L;// 更新 spout-tuple-id 的 ack-valanchorsToIds.put(root, curr ^ newId);}}}return new Tuple(_context, tuple,_context.getThisTaskId(),streamId,MessageId.makeId(anchorsToIds));} |
从上面代码里面的红色部分我们可以看出,_pendingAcks里面维护的其实就是 tuple 到自己儿子的对应关系。
Tuple 处理失败的时候会给 acker 发送失败消息
acker 会忽略这种消息的消息内容(消息的 streamId 为ACKER-FAIL-STREAM-ID), 直接将对应的 spout-tuple 标记为失败(最上面代码第 9 行)
最后 Acker 发消息通知 spout-tuple 对应的 Worker
最后,acker 会根据上面这些消息的处理结果来通知这个 spout-tuple 对应的 task:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | (when (and curr(:spout-task curr))(cond (= 0 (:val curr));; ack-val == 0 说明这个 tuple 的所有子孙都;; 处理成功了(都发送 ack 消息了);; 那么发送成功消息通知创建这个 spout-tuple 的 task.(do(.remove pending id)(acker-emit-direct @output-collector(:spout-task curr)ACKER-ACK-STREAM-ID[id]));; 如果这个 spout-tuple 处理失败了;; 发送失败消息给创建这个 spout-tuple 的 task(:failed curr)(do(.remove pending id)(acker-emit-direct @output-collector(:spout-task curr)ACKER-FAIL-STREAM-ID[id])))) |
推荐阅读:
Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm
安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm
Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm
Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm






