共计 2914 个字符,预计需要花费 8 分钟才能阅读完成。
这篇文章里面我们来看一下 Storm 里面的 tuple 到底是如何从一个 tuple 是怎么从一个 bolt 到另一个 bolt 上去的。
首先 Bolt 在发射一个 tuple 的时候是调用 OutputCollector 的 emit 或者 emitDirect 方法,
而这两个方法最终调用的是 clojure 代码里面的 mk-transfer-fn 方法:
1 2 3 4 5 6 | ;worker.clj(defn mk-transfer-fn [transfer-queue](fn [task ^Tuple tuple](.put ^LinkedBlockingQueuetransfer-queue [task tuple]))) |
这个方法其实只是往一个 LinkedBlockingQueue 里面放入一条新记录 (task-id, tuple)
然后这个 queue 里面的内容会被下面这段代码处理
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | ; worker.clj; 这里面的这个 socket 到底是什么东西?(async-loop(fn [^ArrayList drainer^KryoTupleSerializer serializer]; 从 transfer-queue 里面取出一个任务来; 这个任务其实就是(task, tuple)(let [felem (.take transfer-queue)](.add drainer felem)(.drainTo transfer-queue drainer))(read-locked endpoint-socket-lock; 获取从 node+port 到 socket 的映射(let [node+port->socket @node+port->socket; 获取从 task-id 到 node+port 的映射task->node+port @task->node+port](doseq [[task ^Tuple tuple] drainer]; 获取 task 对应的 socket(let [socket(node+port->socket(task->node+port task)); 序列化这个 tupleser-tuple (.serialize serializer tuple)]; 发送这个 tuple(msg/send socket task ser-tuple)))))) |
从上面代码可见,tuple 最终是被序列化之后由 msg/send 方法通过 socket 发送给指定的 task 的。注意上面代码里面的 async-loop 表示会创建一个单独的线程来执行这些代码。可以 storm 会起一个独立线程来专门发送待发送的消息的。
我们来看下这个 socket 到底是个怎么样的东西。这个 socket 是在 worker.clj 里面被初始化的,看下面的代码:
01 02 03 04 05 06 07 08 09 10 11 12 13 | ; socket(worker.clj)(swap! node+port->socketmerge(into {}(dofor[[node port :as endpoint] new-connections][endpoint(msg/connectmq-context((:node->host assignment) node)port)]))) |
从上面代码可以看出 socket 其实是 msg/connect 创建出来的。那 msg/connect 到底在做什么呢?这个方法是定义在 protocol.clj 里面的:
1 2 3 4 5 6 | (defprotocol Context(bind [context virtual-port])(connect [context host port])(send-local-task-empty [context virtual-port])(term [context])) |
这里定义的只是一个接口而已,具体的实现是在 zmq.clj 里面。zmq 是 ZeroMQ 的缩写, 可见 storm 的 supervisor 之间就是利用 zeromq 来传递 tuple 的。
zmq.clj 里面的 ZMQCOntext 实现了 Context 接口:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | (deftype ZMQContext [context linger-ms ipc?]; 实现 Context 接口Context; 从给定的 virtual-port 拉消息(bind [this virtual-port](-> context(mq/socket mq/pull)(mqvp/virtual-bind virtual-port)(ZMQConnection.))); 给给定的 host,port 推送消息(push)(connect [this host port](let [url (if ipc?(str "ipc://" port "ipc")(str "tcp://" host ":" port))](-> context(mq/socket mq/push)(mq/set-linger linger-ms)(mq/connect url)(ZMQConnection.)))); 给本地的 virtual-port 发送一条空消息(send-local-task-empty [this virtual-port](let [pusher(-> context(mq/socket mq/push)(mqvp/virtual-connect virtual-port))](mq/send pusher (mq/barr))(.close pusher)))(term [this](.term context)); 实现 ZMQContextQuery 接口ZMQContextQuery(zmq-context [this]context)) |
总结一些 Twitter Storm 对于 tuple 的处理 / 创建过程:
- Bolt 创建一个 tuple。
- Worker 把 tuple, 以及这个 tuple 要发送的地址 (task-id) 组成一个对象 (task-id, tuple) 放进待发送队列(LinkedBlockingQueue).
- 一个单独的线程(async-loop 所创建的线程)会取出发送队列里面的每个 tuple 来处理
- Worker 创建从当前 task 到目的 task 的 zeromq 连接。
- 序列化这个 tuple 并且通过这个 zeromq 的连接来发送这个 tuple。
推荐阅读:
Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm
安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm
Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm
Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm






