Spout (随机发出一个单词) -----> Exclamation (加感叹号) -----> Print (加问候语)
TestWordSpout
package cn.ljh.storm.helloworld;

import org.apache.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout that emits one randomly chosen word from a fixed list on every
 * {@link #nextTuple()} call, pausing 100 ms before each emit.
 *
 * <p>Each emitted tuple has a single field named {@code "word"}.
 */
public class TestWordSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);

    /** Candidate words; one is picked at random per emitted tuple. */
    private static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};

    SpoutOutputCollector _collector;

    /** Random source, created once in {@link #open} instead of once per tuple. */
    private Random _rand;

    /**
     * Stores the collector used for emitting and creates the random source.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector that {@link #nextTuple()} emits through
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /** Sleeps 100 ms, then emits a single-value tuple holding a random word. */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        final String word = WORDS[_rand.nextInt(WORDS.length)];
        // Emitted without a message id, exactly as in the original code.
        _collector.emit(new Values(word));
    }

    /** Declares the single output field {@code "word"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
ExclamationBolt
package cn.ljh.storm.helloworld;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that appends {@code "!!!"} to the first value of each incoming tuple
 * and emits the result as a single field named {@code "word"}.
 */
public class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    /**
     * Stores the collector used by {@link #execute(Tuple)}.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    /**
     * Reads the first value of {@code tuple} as a string, appends {@code "!!!"},
     * emits it, and acks the input.
     *
     * @param tuple input tuple; its value at index 0 is read via {@code getString(0)}
     */
    @Override
    public void execute(Tuple tuple) {
        final String exclaimed = tuple.getString(0) + "!!!";
        // The input tuple is passed as the first argument so the new tuple is anchored to it.
        _collector.emit(tuple, new Values(exclaimed));
        _collector.ack(tuple);
    }

    /** Declares the single output field; its name is {@code "word"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Name of the emitted field.
        declarer.declare(new Fields("word"));
    }
}
PrintBolt
package cn.ljh.storm.helloworld;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Terminal bolt that logs the first value of each incoming tuple followed by
 * {@code " Hello World!"} and acks the tuple. It emits nothing.
 */
public class PrintBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(PrintBolt.class);

    OutputCollector _collector;

    /**
     * Stores the collector used to ack tuples in {@link #execute(Tuple)}.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to ack tuples
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    /**
     * Logs the tuple's first value with a greeting at INFO level, then acks it.
     *
     * @param tuple input tuple; its value at index 0 is read via {@code getString(0)}
     */
    @Override
    public void execute(Tuple tuple) {
        // Parameterized form; the rendered message is identical to the concatenated one.
        LOG.info("{} Hello World!", tuple.getString(0));
        _collector.ack(tuple);
    }

    /** Declares no output fields: this bolt does not emit any tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Nothing is emitted downstream.
    }
}
ExclamationTopology
package cn.ljh.storm.helloworld; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class ExclamationTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 1); //指定消息ID 为word builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word"); //指定分发策略,由spout(wordID)进行下发 builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim"); //指定分组策略 Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); //随机NAME } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test3", conf, builder.createTopology()); Utils.sleep(20000); cluster.killTopology("test3"); cluster.shutdown(); } } }
原文:https://www.cnblogs.com/tangsonghuai/p/11123846.html