Storm Parallelism
// Package names below are for Storm 1.0 and later; releases before 1.0 use backtype.storm instead.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        // Run the topology in local mode for 10 seconds, then shut it down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
Flow (one spout and three bolts):
sentence-spout: generates sentences
split-bolt: splits each sentence and emits its words
count-bolt: counts the words
report-bolt: outputs the results
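The flow above can be sketched without Storm at all. The following plain-Java sketch is only an illustration, not the topology itself: the class name WordCountFlowSketch and its methods are hypothetical, and ordinary method calls stand in for the streams between the components.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the topology's data flow; no Storm classes are used.
class WordCountFlowSketch {
    // sentence-spout: the sentences that get emitted.
    static final String[] SENTENCES = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };

    // split-bolt: one sentence in, one word per output "tuple".
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            words.add(word);
        }
        return words;
    }

    // count-bolt: keeps a running count per word.
    static Map<String, Long> count(String[] sentences) {
        Map<String, Long> counts = new TreeMap<>();
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // report-bolt: prints the final counts in sorted order.
    public static void main(String[] args) {
        System.out.println("--- FINAL COUNTS ---");
        count(SENTENCES).forEach((word, n) -> System.out.println(word + " : " + n));
        System.out.println("--------------");
    }
}
```

In the real topology each stage runs as its own component, possibly on several threads and machines, which is why the grouping between stages matters.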
1. Parallelism in WordCountTopology
2. Adding workers to the topology
Config config = new Config();
config.setNumWorkers(2);
3. Configuring the number of executors and tasks
builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
    .setNumTasks(4)
    .shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
    .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
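To make the numbers concrete: with the settings above, split-bolt runs 4 tasks on 2 executors, so each executor thread runs 2 task instances, while a component that does not call setNumTasks gets one task per executor. The sketch below only performs that arithmetic. ParallelismSketch and its members are hypothetical names, report-bolt is assumed to keep its default parallelism of 1, and system components such as ackers are ignored.

```java
// Hypothetical helper that only does arithmetic; it does not call Storm.
class ParallelismSketch {
    static final String[] NAMES = {"sentence-spout", "split-bolt", "count-bolt", "report-bolt"};
    // Parallelism hints (executors). report-bolt is assumed to keep the default of 1.
    static final int[] EXECUTORS = {2, 2, 4, 1};
    // Values passed to setNumTasks; 0 means setNumTasks was not called.
    static final int[] NUM_TASKS = {0, 4, 0, 0};

    // Without setNumTasks, Storm runs one task per executor.
    static int tasks(int component) {
        return NUM_TASKS[component] > 0 ? NUM_TASKS[component] : EXECUTORS[component];
    }

    static int totalExecutors() {
        int sum = 0;
        for (int e : EXECUTORS) {
            sum += e;
        }
        return sum;
    }

    static int totalTasks() {
        int sum = 0;
        for (int i = 0; i < NAMES.length; i++) {
            sum += tasks(i);
        }
        return sum;
    }

    public static void main(String[] args) {
        for (int i = 0; i < NAMES.length; i++) {
            System.out.println(NAMES[i] + ": " + EXECUTORS[i] + " executor(s), "
                    + tasks(i) + " task(s), "
                    + (tasks(i) / EXECUTORS[i]) + " task(s) per executor");
        }
        System.out.println("total executors = " + totalExecutors()
                + ", total tasks = " + totalTasks());
    }
}
```

The task counts in this example divide evenly across the executors; the sketch does not model uneven splits.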
--- FINAL COUNTS ---
a : 2726
ate : 2722
beverages : 2723
cold : 2723
cow : 2726
dog : 5445
don't : 5444
fleas : 5451
has : 2723
have : 2722
homework : 2722
i : 8175
like : 5449
man : 2722
my : 5445
the : 2727
think : 2722
--------------
Original results:
--- FINAL COUNTS ---
a : 1426
ate : 1426
beverages : 1426
cold : 1426
cow : 1426
dog : 2852
don't : 2851
fleas : 2851
has : 1426
have : 1426
homework : 1426
i : 4276
like : 2851
man : 1426
my : 2852
the : 1426
think : 1425
--------------
Storm Stream Groupings
public interface CustomStreamGrouping extends Serializable {
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    List<Integer> chooseTasks(int taskId, List<Object> values);
}
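The contract of the interface above is: prepare receives the ids of the candidate target tasks once, and chooseTasks is then called per tuple to pick which of those tasks receive it. The sketch below mirrors that contract in plain Java so it runs without Storm on the classpath. It is a simplified stand-in, not an implementation of CustomStreamGrouping: the class name HashGroupingSketch is hypothetical, prepare drops the WorkerTopologyContext and GlobalStreamId parameters, and routing on the hash of the first value is just one possible policy.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for a custom grouping; it does not implement Storm's interface.
class HashGroupingSketch {
    private List<Integer> targetTasks;

    // The real prepare() also receives WorkerTopologyContext and GlobalStreamId.
    void prepare(List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    // Picks one target task from the hash of the tuple's first value,
    // so equal values always go to the same task.
    List<Integer> chooseTasks(int taskId, List<Object> values) {
        int index = Math.floorMod(values.get(0).hashCode(), targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        HashGroupingSketch grouping = new HashGroupingSketch();
        grouping.prepare(Arrays.asList(7, 8, 9, 10)); // made-up task ids
        for (String word : new String[] {"dog", "fleas", "dog"}) {
            List<Integer> chosen = grouping.chooseTasks(1, Arrays.<Object>asList(word));
            System.out.println(word + " -> task " + chosen.get(0));
        }
    }
}
```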
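public void nextTuple() {
    // Emit each sentence exactly once, then stay idle.
    if (index < sentences.length) {
        this.collector.emit(new Values(sentences[index]));
        index++;
    }
    // waitForMillis appears to be a helper from the example project;
    // Storm's own Utils class provides sleep(long) instead.
    Utils.waitForMillis(1);
}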
--- FINAL COUNTS ---
a : 2
ate : 2
beverages : 2
cold : 2
cow : 2
dog : 4
don't : 4
fleas : 4
has : 2
have : 2
homework : 2
i : 6
like : 4
man : 2
my : 4
the : 2
think : 2
--------------
Before:
builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
    .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
After:
builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
    .shuffleGrouping(SPLIT_BOLT_ID);
--- FINAL COUNTS ---
a : 1
ate : 2
beverages : 1
cold : 1
cow : 1
dog : 2
don't : 2
fleas : 1
has : 1
have : 1
homework : 1
i : 3
like : 1
man : 1
my : 1
the : 1
think : 1
--------------
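The counts are now wrong because CountBolt is stateful: it keeps a running count for every word it receives. With shuffle grouping, tuples carrying the same word are spread across different CountBolt tasks, so each task holds only a partial count. The accuracy of the computation therefore depends on grouping tuples by their content once the component is parallelized. The bug only shows up when CountBolt's parallelism is greater than 1, which underlines the importance of testing a topology under different parallelism configurations.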
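The miscount can be reproduced without a cluster. The sketch below is a plain-Java simulation under stated assumptions, not Storm's routing code: GroupingCountSketch is a hypothetical name, round-robin stands in for shuffle grouping (which Storm randomizes), and the word's hash modulo the task count stands in for fields grouping. Each simulated task keeps its own map, just as each CountBolt task keeps its own state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of four stateful counting tasks; no Storm classes are used.
class GroupingCountSketch {
    static final String[] SENTENCES = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };

    // Routes every word to one task and returns the per-task count maps.
    static List<Map<String, Integer>> run(boolean fieldsGrouping, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        int emitted = 0;
        for (String sentence : SENTENCES) {
            for (String word : sentence.split(" ")) {
                int target = fieldsGrouping
                        ? Math.floorMod(word.hashCode(), numTasks) // same word, same task
                        : emitted % numTasks;                      // round-robin stand-in for shuffle
                tasks.get(target).merge(word, 1, Integer::sum);
                emitted++;
            }
        }
        return tasks;
    }

    // Number of tasks that hold a (partial) count for the word.
    static int tasksHolding(List<Map<String, Integer>> tasks, String word) {
        int holders = 0;
        for (Map<String, Integer> task : tasks) {
            if (task.containsKey(word)) {
                holders++;
            }
        }
        return holders;
    }

    public static void main(String[] args) {
        System.out.println("fields grouping, tasks holding \"i\": "
                + tasksHolding(run(true, 4), "i"));
        System.out.println("shuffle grouping, tasks holding \"i\": "
                + tasksHolding(run(false, 4), "i"));
    }
}
```

When a word's count is split over several tasks, the reporting stage only sees whichever partial count arrived last, which matches the kind of undercount shown in the output above.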
1. Tip
2. Guaranteed message processing
3. Spout reliability
public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    void close();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);
}
collector.emit(new Values("value1", "value2"), msgId);
4. Bolt reliability
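// Anchored emit: the new tuple is tied to the incoming tuple, so a downstream failure is reported back to the spout.
collector.emit(tuple, new Values(word));
// Unanchored emit: the new tuple is not tracked as part of the incoming tuple's tree.
collector.emit(new Values(word));
// Acknowledge the incoming tuple once it has been processed.
this.collector.ack(tuple);
// Explicitly fail the incoming tuple so that the spout's fail() is called.
this.collector.fail(tuple);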
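The effect of anchoring can be illustrated with a toy model. The sketch below is a simplified assumption for illustration only: TupleTreeSketch is a hypothetical class that counts outstanding tuples per spout message, whereas Storm's acker tracks tuple ids rather than a counter. It shows that a spout message counts as fully processed only after every anchored descendant has been acked, and that a single fail marks the whole tree as failed.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a tuple tree; Storm's real acker works on tuple ids, not on a counter.
class TupleTreeSketch {
    private final Map<String, Integer> pending = new HashMap<>();
    private final Map<String, String> outcome = new HashMap<>();

    // Spout emits a tuple with a message id: one tuple is now outstanding.
    void emitFromSpout(String rootId) {
        pending.put(rootId, 1);
    }

    // Bolt emits a tuple anchored to the tree of rootId: one more outstanding tuple.
    void emitAnchored(String rootId) {
        pending.computeIfPresent(rootId, (id, n) -> n + 1);
    }

    // Bolt acks one tuple of the tree; the tree completes when nothing is outstanding.
    void ack(String rootId) {
        Integer left = pending.computeIfPresent(rootId, (id, n) -> n - 1);
        if (left != null && left == 0) {
            pending.remove(rootId);
            outcome.put(rootId, "acked"); // here the spout's ack(msgId) would be called
        }
    }

    // Any failure fails the whole tree immediately.
    void fail(String rootId) {
        if (pending.remove(rootId) != null) {
            outcome.put(rootId, "failed"); // here the spout's fail(msgId) would be called
        }
    }

    String outcome(String rootId) {
        return outcome.getOrDefault(rootId, "pending");
    }

    public static void main(String[] args) {
        TupleTreeSketch tree = new TupleTreeSketch();
        tree.emitFromSpout("s1");          // "my dog has fleas"
        for (int i = 0; i < 4; i++) {
            tree.emitAnchored("s1");       // split bolt emits four anchored words
        }
        tree.ack("s1");                    // split bolt acks the sentence
        System.out.println("after split bolt: " + tree.outcome("s1"));
        for (int i = 0; i < 4; i++) {
            tree.ack("s1");                // count bolt acks each word
        }
        System.out.println("after count bolt: " + tree.outcome("s1"));
    }
}
```

With unanchored emits the four words would never enter the tree, so the sentence would be reported as processed as soon as the split bolt acked it, even if a word later failed downstream.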
5. Reliable word count (modified program)
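// Package names below are for Storm 1.0 and later; releases before 1.0 use backtype.storm instead.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {
    // Tuples that have been emitted but not yet acknowledged, keyed by message id.
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        // Remember the tuple so it can be replayed if it fails.
        this.pending.put(msgId, values);
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(1);
    }

    public void ack(Object msgId) {
        // Fully processed: no need to keep the tuple any longer.
        this.pending.remove(msgId);
    }

    public void fail(Object msgId) {
        // Replay the failed tuple under the same message id.
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}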
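The bookkeeping in the spout above (remember on emit, forget on ack, re-emit on fail) can be exercised on its own. The following sketch is a plain-Java illustration under stated assumptions: PendingReplaySketch is a hypothetical class, a list stands in for the output collector, and strings stand in for Values. Unlike the spout above, it skips the replay when the message id is no longer pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the spout's pending map; a list stands in for the collector.
class PendingReplaySketch {
    final ConcurrentHashMap<UUID, String> pending = new ConcurrentHashMap<>();
    final List<String> emitted = new ArrayList<>();

    // Emit a sentence and remember it until it is acknowledged.
    UUID emit(String sentence) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, sentence);
        emitted.add(sentence);
        return msgId;
    }

    // Fully processed: drop the sentence from the pending map.
    void ack(UUID msgId) {
        pending.remove(msgId);
    }

    // Failed: emit the same sentence again under the same message id.
    void fail(UUID msgId) {
        String sentence = pending.get(msgId);
        if (sentence != null) {
            emitted.add(sentence);
        }
    }

    public static void main(String[] args) {
        PendingReplaySketch spout = new PendingReplaySketch();
        UUID id = spout.emit("my dog has fleas");
        spout.fail(id); // replayed, still pending
        spout.ack(id);  // processed, no longer pending
        System.out.println("emits: " + spout.emitted.size()
                + ", still pending: " + spout.pending.size());
    }
}
```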
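// Package names below are for Storm 1.0 and later; releases before 1.0 use backtype.storm instead.
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ReliableSplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Anchor each word to the incoming sentence tuple.
            this.collector.emit(tuple, new Values(word));
        }
        // Acknowledge the sentence once all of its words have been emitted.
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}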
Original article: https://www.cnblogs.com/51python/p/10976608.html