阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Twitter Storm源代码分析之acker工作流程

431次阅读
没有评论

共计 4853 个字符,预计需要花费 13 分钟才能阅读完成。

概述

我们知道 storm 一个很重要的特性是它能够保证你发出的每条消息都会被完整处理,完整处理的意思是指:

一个 tuple 被完全处理的意思是:这个 tuple 以及由这个 tuple 所导致的所有的 tuple 都被成功处理。而一个 tuple 会被认为处理失败了如果这个消息在 timeout 所指定的时间内没有成功处理。

也就是说对于任何一个 spout-tuple 以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看 Twitter Storm 如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm 里面有个专门的 acker 来跟踪所有 tuple 的完成情况。这篇文章就来讨论 acker 的详细工作流程。

源代码列表

这篇文章涉及到的源代码主要包括:

  1. backtype.storm.daemon.acker
  2. backtype.storm.daemon.task
  3. backtype.storm.task.OutputCollectorImpl

算法简介

acker 对于 tuple 的跟踪算法是 storm 的主要突破之一,这个算法使得对于任意大的一个 tuple 树,它只需要恒定的 20 字节就可以进行跟踪了。原理很简单:acker 对于每个 spout-tuple 保存一个 ack-val 的校验值,它的初始值是 0,然后每发射一个 tuple/ack 一个 tuple,那么 tuple 的 id 都要跟这个校验值异或一下,并且把得到的值更新为 ack-val 的新值。那么假设每个发射出去的 tuple 都被 ack 了,那么最后 ack-val 一定是 0(因为一个数字跟自己异或得到的值是 0)。

进入正题

那么下面我们从源代码层面来看看 哪些组件在哪些时候会给 acker 发送什么样的消息来共同完成这个算法的。acker 对消息进行处理的主要是下面这块代码:

 
01
02
03
04
05
06
07
08
09
10
11
(let [id (.getValue tuple 0)
^TimeCacheMap pending @pending
curr (.get pending id)
curr (condp = (.getSourceStreamId tuple)
ACKER-INIT-STREAM-ID (-> curr
(update-ack id)
(assoc :spout-task (.getValue tuple 1)))
ACKER-ACK-STREAM-ID (update-ack
curr (.getValue tuple 1))
ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
...)

Spout 创建一个新的 tuple 的时候给 acker 发送消息

消息格式 (看上面代码的第 1 行和第 7 行对于tuple.getValue() 的调用)

 
1
(spout-tuple-id, task-id)

消息的 streamId 是__ack_init(ACKER-INIT-STREAM-ID)

这是告诉 acker, 一个新的 spout-tuple 出来了,你跟踪一下,它是由 id 为 task-id 的 task 创建的 (这个 task-id 在后面会用来通知这个 task:你的 tuple 处理成功了 / 失败了)。处理完这个消息之后,acker 会在它的 pending 这个 map(类型为 TimeCacheMap) 里面添加这样一条记录:

 
1
{spout-tuple-id {:spout-task task-id :val ack-val)}

这就是 acker 对 spout-tuple 进行跟踪的核心数据结构,对于每个 spout-tuple 所产生的 tuple 树的跟踪都只需要保存上面这条记录。acker 后面会检查:val 什么时候变成 0,变成 0,说明这个 spout-tuple 产生的 tuple 都处理完成了。

Bolt 发射一个新 tuple 的时候会给 acker 发送消息么?

任何一个 bolt 在发射一个新的 tuple 的时候, 是不会直接通知 acker 的,如果这样做的话那么每发射一个消息会有三条消息了:

  1. Bolt 创建这个 tuple 的时候,把它发给下一个 bolt 的消息
  2. Bolt 创建这个 tuple 的时候,发送给 acker 的消息
  3. ack tuple 的时候发送的 ack 消息

事实上 storm 里面只有第一条和第三条消息,它把第二条消息省掉了,怎么做到的呢?storm 这点做得挺巧妙的,bolt 在发射一个新的 bolt 的时候会把这个新 tuple 跟它的父 tuple 的关系保存起来。然后在 ack 每个 tuple 的时候,storm 会把要 ack 的 tuple 的 id, 以及这个 tuple 新创建的所有的 tuple 的 id 的异或值发送给 acker。这样就给每个 tuple 省掉了一个消息(具体看下一节)。

Tuple 被 ack 的时候给 acker 发送消息

每个 tuple 在被 ack 的时候,会给 acker 发送一个消息,消息格式是:

 
1
(spout-tuple-id, tmp-ack-val)

消息的 streamId 是__ack_ack(ACKER-ACK-STREAM-ID)

注意,这里的 tmp-ack-val 是要 ack 的 tuple 的 id 与由它新创建的所有的 tuple 的 id 异或的结果:

 
1
tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ...)

我们可以从 task.clj 里面的 send-ack 方法看出这一点:

 
01
02
03
04
05
06
07
08
09
10
11
12
13
(defn- send-ack [^TopologyContext topology-context
^Tuple input-tuple
^List generated-ids send-fn]
(let [ack-val (bit-xor-vals generated-ids)]
(doseq [
[anchor id] (.. input-tuple
getMessageId
getAnchorsToIds)]
(send-fn (Tuple. topology-context
[anchor (bit-xor ack-val id)]
(.getThisTaskId topology-context)
ACKER-ACK-STREAM-ID))
)))

这里面的 generated-ids 参数就是这个 input-tuple 的所有子 tuple 的 id,从代码可以看出 storm 会给这个 tuple 的每一个 spout-tuple 发送一个 ack 消息。

为什么说这里的 generated-ids 是 input-tuple 的子 tuple 呢?这个 send-ack 是被 OutputCollectorImpl 里面的 ack 方法调用的:

 
1
2
3
4
5
6
7
public void ack(Tuple input) {
List generated = getExistingOutput(input);
// don't just do this directly in case
// there was no output
_pendingAcks.remove(input);
_collector.ack(input, generated);
}

generated 是由 getExistingOutput(input) 方法计算出来的,我们再来看看这个方法的定义:

 
1
2
3
4
5
6
7
8
9
private List getExistingOutput(Tuple anchor) {
if(_pendingAcks.containsKey(anchor)) {
return _pendingAcks.get(anchor);
} else {
List ret = new ArrayList();
_pendingAcks.put(anchor, ret);
return ret;
}
}

_pendingAcks里面存的是什么东西呢?

 
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private Tuple anchorTuple(Collection< Tuple > anchors,
String streamId,
List< Object > tuple) {
// The simple algorithm in this function is the key
// to Storm. It is what enables Storm to guarantee
// message processing.
// 这个 map 存的东西是 spout-tuple-id 到 ack-val 的映射
Map< Long, Long > anchorsToIds
= new HashMap<Long, Long>();
// anchors 其实就是它的所有父亲:spout-tuple
if(anchors!=null) {
for(Tuple anchor: anchors) {
long newId = MessageId.generateId();
// 告诉每一个父亲,你们又多了一个儿子了。
getExistingOutput(anchor).add(newId);
for(long root: anchor.getMessageId()
.getAnchorsToIds().keySet()) {
Long curr = anchorsToIds.get(root);
if(curr == null) curr = 0L;
 
// 更新 spout-tuple-id 的 ack-val
anchorsToIds.put(root, curr ^ newId);
}
}
}
return new Tuple(_context, tuple,
_context.getThisTaskId(),
streamId,
MessageId.makeId(anchorsToIds));
}

从上面代码里面的红色部分我们可以看出,_pendingAcks里面维护的其实就是 tuple 到自己儿子的对应关系。

Tuple 处理失败的时候会给 acker 发送失败消息

acker 会忽略这种消息的消息内容(消息的 streamId 为ACKER-FAIL-STREAM-ID), 直接将对应的 spout-tuple 标记为失败(最上面代码第 9 行)

最后 Acker 发消息通知 spout-tuple 对应的 Worker

最后,acker 会根据上面这些消息的处理结果来通知这个 spout-tuple 对应的 task:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(when (and curr
(:spout-task curr))
(cond (= 0 (:val curr))
;; ack-val == 0 说明这个 tuple 的所有子孙都
;; 处理成功了(都发送 ack 消息了)
;; 那么发送成功消息通知创建这个 spout-tuple 的 task.
(do
(.remove pending id)
(acker-emit-direct @output-collector
(:spout-task curr)
ACKER-ACK-STREAM-ID
[id]
))
;; 如果这个 spout-tuple 处理失败了
;; 发送失败消息给创建这个 spout-tuple 的 task
(:failed curr)
(do
(.remove pending id)
(acker-emit-direct @output-collector
(:spout-task curr)
ACKER-FAIL-STREAM-ID
[id]
))
))

推荐阅读:

Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm

安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm

Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm

Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计4853字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19350
评论数
4
阅读量
7960669
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face 免费服务器 +Docker 快速部署 HertzBeat 监控平台 ...
告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

  告别 Notion 焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁” 引言 在数字笔记工...
免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

免费无广告!这款跨平台AI RSS阅读器,拯救你的信息焦虑

  免费无广告!这款跨平台 AI RSS 阅读器,拯救你的信息焦虑 在算法推荐主导信息流的时代,我们...
如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的 Nano Banana Pro?附赠邪修的用法 前言 大家好,我是星哥,今天来介绍谷歌的 ...
300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

300元就能买到的”小钢炮”?惠普7L四盘位小主机解析

  300 元就能买到的 ” 小钢炮 ”?惠普 7L 四盘位小主机解析 最近...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

零成本上线!用 Hugging Face 免费服务器 +Docker 快速部署 HertzBeat 监控平台 ...
安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装Black群晖DSM7.2系统安装教程(在Vmware虚拟机中、实体机均可)!

安装 Black 群晖 DSM7.2 系统安装教程(在 Vmware 虚拟机中、实体机均可)! 前言 大家好,...
国产开源公众号AI知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率

国产开源公众号AI知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率

国产开源公众号 AI 知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率 大家好,我是星哥,...
仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

还在忍受动辄数百兆的“全家桶”监控软件?后台偷占资源、界面杂乱冗余,想查个 CPU 温度都要层层点选? 今天给...
支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare 也瘫了连监控都挂,根因藏在哪? 最近两天的互联网堪称“故障...