阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Twitter Storm源代码分析之Tuple是如何发送的

132次阅读
没有评论

共计 2914 个字符,预计需要花费 8 分钟才能阅读完成。

这篇文章里面我们来看一下 Storm 里面的 tuple 到底是如何从一个 tuple 是怎么从一个 bolt 到另一个 bolt 上去的。

首先 Bolt 在发射一个 tuple 的时候是调用 OutputCollector 的 emit 或者 emitDirect 方法,
而这两个方法最终调用的是 clojure 代码里面的 mk-transfer-fn 方法:

 
1
2
3
4
5
6
;worker.clj
(defn mk-transfer-fn [transfer-queue]
(fn [task ^Tuple tuple]
(.put ^LinkedBlockingQueue
transfer-queue [task tuple])
))

这个方法其实只是往一个 LinkedBlockingQueue 里面放入一条新记录 (task-id, tuple)
然后这个 queue 里面的内容会被下面这段代码处理

 
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
; worker.clj
; 这里面的这个 socket 到底是什么东西?
(async-loop
(fn [^ArrayList drainer
^KryoTupleSerializer serializer]
; 从 transfer-queue 里面取出一个任务来
; 这个任务其实就是(task, tuple)
(let [felem (.take transfer-queue)]
(.add drainer felem)
(.drainTo transfer-queue drainer))
(read-locked endpoint-socket-lock
; 获取从 node+port 到 socket 的映射
(let [node+port->socket @node+port->socket
; 获取从 task-id 到 node+port 的映射
task->node+port @task->node+port]
(doseq [[task ^Tuple tuple] drainer]
; 获取 task 对应的 socket
(let [socket
(node+port->socket
(task->node+port task))
; 序列化这个 tuple
ser-tuple (.serialize serializer tuple)]
; 发送这个 tuple
(msg/send socket task ser-tuple)
))
))

从上面代码可见,tuple 最终是被序列化之后由 msg/send 方法通过 socket 发送给指定的 task 的。注意上面代码里面的 async-loop 表示会创建一个单独的线程来执行这些代码。可以 storm 会起一个独立线程来专门发送待发送的消息的。

我们来看下这个 socket 到底是个怎么样的东西。这个 socket 是在 worker.clj 里面被初始化的,看下面的代码:

 
01
02
03
04
05
06
07
08
09
10
11
12
13
; socket(worker.clj)
(swap! node+port->socket
merge
(into {}
(dofor
[[node port :as endpoint] new-connections]
[endpoint
(msg/connect
mq-context
((:node->host assignment) node)
port)
]
)))

从上面代码可以看出 socket 其实是 msg/connect 创建出来的。那 msg/connect 到底在做什么呢?这个方法是定义在 protocol.clj 里面的:

 
1
2
3
4
5
6
(defprotocol Context
(bind [context virtual-port])
(connect [context host port])
(send-local-task-empty [context virtual-port])
(term [context])
)

这里定义的只是一个接口而已,具体的实现是在 zmq.clj 里面。zmq 是 ZeroMQ 的缩写, 可见 storm 的 supervisor 之间就是利用 zeromq 来传递 tuple 的。

zmq.clj 里面的 ZMQCOntext 实现了 Context 接口:

 
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
(deftype ZMQContext [context linger-ms ipc?]
; 实现 Context 接口
Context
; 从给定的 virtual-port 拉消息
(bind [this virtual-port]
(-> context
(mq/socket mq/pull)
(mqvp/virtual-bind virtual-port)
(ZMQConnection.)
))
; 给给定的 host,port 推送消息(push)
(connect [this host port]
(let [url (if ipc?
(str "ipc://" port "ipc")
(str "tcp://" host ":" port))]
(-> context
(mq/socket mq/push)
(mq/set-linger linger-ms)
(mq/connect url)
(ZMQConnection.))))
; 给本地的 virtual-port 发送一条空消息
(send-local-task-empty [this virtual-port]
(let [pusher
(-> context
(mq/socket mq/push)
(mqvp/virtual-connect virtual-port))]
(mq/send pusher (mq/barr))
(.close pusher)))
(term [this]
(.term context))
; 实现 ZMQContextQuery 接口
ZMQContextQuery
(zmq-context [this]
context))

总结一些 Twitter Storm 对于 tuple 的处理 / 创建过程:

  1. Bolt 创建一个 tuple。
  2. Worker 把 tuple, 以及这个 tuple 要发送的地址 (task-id) 组成一个对象 (task-id, tuple) 放进待发送队列(LinkedBlockingQueue).
  3. 一个单独的线程(async-loop 所创建的线程)会取出发送队列里面的每个 tuple 来处理
      • Worker 创建从当前 task 到目的 task 的 zeromq 连接。
      • 序列化这个 tuple 并且通过这个 zeromq 的连接来发送这个 tuple。

推荐阅读:

Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm

安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm

Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm

Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计2914字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中