是一个分布式, 高容错的 实时计算框架
Storm进程常驻内存, 永久运行
Storm数据不经过磁盘, 在内存中流转, 通过网络直接发送给下游
流式处理(streaming) 与 批处理(batch)
流(streaming): Storm, Flink(其实Flink也可以做批处理)
Storm | MapReduce |
---|---|
流式处理 | 批处理 |
毫秒级 | 分钟级 |
DAG模型 | Map+Reduce模型 |
常驻运行 | 反复启停 |
Topology - DAG 有向无环图
对Storm实时计算逻辑进行封装
由一系列通过数据流相互关联的Spout、Bolt锁组成的拓扑结构
生命周期: 此拓扑只要启动就会一直在集群中运行, 直到手动将其kill, 否则不会终止
(与MapReduce中的Job的区别: MR中的Job在计算机执行完成就会终止)
Tuple - 元组
Stream - 数据流
Spout - 数据源
拓扑中数据流的来源。一般会从指定外部的数据源读取元组 (Tuple) 发送到拓扑(Topology) 中。
一个Spout可以发送多个数据流(Stream)
Spout 中最核心的方法是 nextTuple, 该方法会被Storm线程不断调用、主动从数据源拉取数据, 再通过emit 方法将数据生成元组 (Tuple) 发送给之后的 Bolt 计算
Bolt - 对数据流进行处理的组件
拓扑中数据处理均由Bolt完成。对于简单的任务或者数据流转换, 单个Bolt可以简单实现; 更加复杂场景往往需要多个Bolt分多个步骤完成
一个Bolt可以发送多个数据流(Stream)
Bolt 中最核心的方法是execute方法, 该方法负责接收到一个元组 (Tuple) 数据 以及 真正实现核心的业务逻辑
简单的WorldCount实例
package com.ronnie.storm.wordCount;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
public class WordCountSpout extends BaseRichSpout {
private Random random = new Random();
SpoutOutputCollector collector;
String[] lines = {
"Well done Gaben well fucking done",
"What is going wrong with you Bro",
"You are so fucking retard",
"What the hell is it",
"hadoop spark storm flink",
"mysql oracle memcache redis mongodb"
};
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* 1. storm 会一直(死循环)调用此方法
* 2. 每次调用此方法, 往下游发输出
*
* while(flag){
* nextTuple();
* }
*/
@Override
public void nextTuple() {
int index = random.nextInt(lines.length);
String line = lines[index];
System.out.println("line: " + line);
collector.emit(new Values(line));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("liner"));
}
}
package com.ronnie.storm.wordCount;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class WordCountSplit extends BaseRichBolt {
// 提升作用域
OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
System.err.println(this + "=============================");
this.collector = collector;
}
@Override
public void execute(Tuple input) {
// 从域中获取数据, 要与之前Spout中 declareOutputFields 的域名称一致
String line = input.getStringByField("liner");
// 根据什么分离
String[] words = line.split(" ");
for (String word: words){
// Value是一个ArrayList, 其中存的对象要与后面声明的域中属性相对应
collector.emit(new Values(word,"ronnie"));
}
}
/**
* Fields中 的名称 与 前面 value 中的属性相应
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "name"));
}
}
WordCount
package com.ronnie.storm.wordCount;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class WordCount extends BaseRichBolt {
Map<String, Integer> result = new HashMap<>();
/**
* 初始化任务
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
}
/**
* 最核心方法
* 上游传tuple数据给它, 并调用此方法
* @param input
*/
@Override
public void execute(Tuple input) {
String word = input.getString(0);
Integer integer = result.get(word);
if (null == integer){
integer = 1;
} else {
integer += 1;
}
result.put(word, integer);
System.err.println(word + " : " + integer);
}
/**
* 声明输出的域类型
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
WordCountTopology
package com.ronnie.storm.wordCount;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("wcSpout", new WordCountSpout());
// setBolt 的第三个参数为并行量 setNumTasks 修改 任务数量为 4
topologyBuilder.setBolt("wcSplit", new WordCountSplit(), 2).setNumTasks(4).shuffleGrouping("wcSpout");
topologyBuilder.setBolt("wcCount", new WordCount(), 5).fieldsGrouping("wcSplit", new Fields("word"));
StormTopology topology = topologyBuilder.createTopology();
Config config = new Config();
// 修改配置文件中的worker数量为3
config.setNumWorkers(3);
// 只要参数存在
if (args.length > 0){
try {
StormSubmitter.submitTopology(args[0],config,topology);
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
} else {
// 不存在就执行本地任务
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordCount", config, topology);
}
}
}
最后可将任务打成 jar 包传送到linux系统上(已经部署好storm集群), 再通过命令行执行任务
[root@node01 storm-0.10.0] bin/storm jar /opt/ronnie/wc.jar com.ronnie.storm.wordCount.WordCountTopology wc
# 在storm目录下 bin/storm jar jar文件目录 包结构.任务类 任务参数
Nimbus
Supervisor
接收Nimbus分配的任务
启动、停止自己管理的worker进程 (当前supervisor上的work数量可通过配置文件设定)
Worker
运行具体处理运算组件的进程 (每个Worker对应执行一个Topology 的子集)
worker 任务类型:
启动 executor
Zookeeper
与Hadoop架构对比
Hadoop | Storm | |
---|---|---|
主节点 | ResourceManager | Nimbus |
从节点 | NodeManager | Supervisor |
应用程序 | Job | Topology |
工作进程 | Child | Worker |
计算模型 | Map/Reduce | Spout/Bolt |
DRPC (Distributed RPC)
分布式远程过程调用
DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式RPC功能的。
DRPC Server 负责接收 RPC 请求, 并将该请求发送到Storm中运行的Topology, 等待接收 Topology 发送的处理结果, 并将该结果返回给发送请求的客户端。
集群节点宕机
Nimbus服务器
非Nimbus服务器
进程挂了
消息的完整性
消息完整性的实现机制
Acker
基本组件
Worker - 进程
Executor - 线程
Task
设置参数
Worker进程数
Executor线程数
Task数量
Rebalance - 重平衡
Worker进程间的数据通信
ZMQ
Nettty
Nettty 是基于NIO(Not Blocked Input Output)的网络框架(是对NIO包的一种封装, 因为原生API不是很好用),更加高效。
Storm 0.9版本之后使用Netty是因为ZMQ的license和Storm的license不兼容。
Worker内部的数据通信
Fields Grouping
All Grouping
Global Grouping
原文:https://www.cnblogs.com/ronnieyuan/p/11695774.html