什么是Trident?
package trident.test.demo2; import backtype.storm.Config; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; import java.util.Arrays; import java.util.List; import java.util.Map; /** * User: jianwl * Date: 2016/2/24 * Time: 16:12 */ public class TridentSpoutDemo implements IBatchSpout{ private static final List<String> sentences = Arrays.asList( "the cow jumped over the moon", "the man went to the store and bought some candy", "four score and seven years ago"); private boolean complete = false; @Override public void open(Map map, TopologyContext topologyContext) { // nothing do } @Override public void emitBatch(long l, TridentCollector tridentCollector) { if(complete){ return; } for(int i=0; i< sentences.size() ; i++){ tridentCollector.emit(new Values(sentences.get(i))); } complete = true; } @Override public void ack(long l) { // nothing do } @Override public void close() { // nothing do } @Override public Map getComponentConfiguration() { return new Config(); } @Override public Fields getOutputFields() { return new Fields("sentence"); } }
创建Topology
/**
 * Entry point that wires the demo spout to the {@code Split} and
 * {@code PrintFunction} operations and submits the result to a local cluster.
 */
public class TridentTopo {

    /**
     * Builds the topology and submits it to a {@code LocalCluster} under the
     * name {@code "trident-demo2"}.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        TridentTopology topology = buildTopology();

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("trident-demo2", conf, topology.build());
    }

    /**
     * Creates the topology: the "sentence" field from the spout is passed to
     * {@code Split}, which declares the "word" field; "word" is then passed to
     * {@code PrintFunction}, which declares the "field" and "count" fields.
     */
    private static TridentTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", new TridentSpoutDemo())
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .each(new Fields("word"), new PrintFunction(), new Fields("field", "count"));
        return topology;
    }
}
public class Split extends BaseFunction {
public Split() {
}
public void execute(TridentTuple tuple, TridentCollector collector) {
String[] arr$ = tuple.getString(0).split(" ");
int len$ = arr$.length;
for(int i$ = 0; i$ < len$; ++i$) {
String word = arr$[i$];
if(word.length() > 0) {
collector.emit(new Values(new Object[]{word}));
}
}
}
}
/**
 * Trident function that keeps a running occurrence count per input string
 * and prints the whole count table after every tuple. It emits nothing.
 */
public class PrintFunction extends BaseFunction {

    /** Occurrence counts keyed by input string; static, so shared by all instances in the JVM. */
    private static final Map<String, Integer> counts = new HashMap<>();

    /**
     * Increments the count for the string at position 0 of the tuple and
     * prints the current contents of the count table to standard output.
     *
     * @param tridentTuple     input tuple whose first field is read as a string
     * @param tridentCollector not used; this function emits no tuples
     */
    @Override
    public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
        String key = tridentTuple.getString(0);

        // A missing entry means this is the first occurrence of the key.
        Integer previous = counts.get(key);
        counts.put(key, previous == null ? 1 : previous + 1);

        System.out.println("result ==> " + counts);
    }
}
原文:http://www.cnblogs.com/liuyunAlex/p/5213715.html