Worker通过worker-data方法定义了一个包含很多共享数据的映射集合,Worker中很多方法都依赖它
mk-worker
功能:
创建对应的计时器、Executor、接收线程接收消息
方法原型:
1 | (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]) |
Worker内部数据处理流程图:
功能:
该心态信息被写入到本地文件系统,Supervisor读取这些心跳信息来判断Worker状态,然后决定是否重启Worker.
实现代码:
1 | (defn do-heartbeat [worker]) |
2 | (let [conf (:conf worker) |
3 | hb (WorkerHearbeat. |
4 | (current-time-secs) |
5 | (:storm-id worker) |
6 | (:executors worker) |
7 | (:port worker))] |
8 | (.put (worker-state conf (:worker-id worker)) |
9 | LS_WORKER_HEARTBEAT |
10 | hb))) |
代码说明:
发送Worker心跳信息
功能:
Woker的心跳信息使用heartbeat-timer计时器进行持续发送,发送间隔默认为1秒(WORKER-HEARTBEAT-FREQUENCY-SECS)
调用方式:
1 | _(schedule-recurring (:hearbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) |
Executor心跳
功能:
Worker心跳信息保存到本地文件夹,Executor心态保存到zookeeper中。
do-executor-hearbeats函数用来发送一次心跳。
方法原型:
1 | (defnk do-executor-heartbeats [worker :executors nil]) |
代码说明:
发送Executor心跳信息
功能:
Worker使用:executor-hearbeat-timer计时器线程来发送Executor心跳信息,默认三秒钟发送一次。
发送过程:
1 | _(schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK_HEARBEAT-FREQUENCY-SECS) #(do-executor-hearbeats worker :executors @executors)) |
原文:http://www.cnblogs.com/jianyuan/p/4858795.html