首页 > 其他 > 详细

storm单词计数 本地执行

时间:2015-08-07 23:57:33      阅读:456      评论:0      收藏:0      [点我收藏+]



import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;


import org.apache.commons.io.FileUtils;


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import cn.crxy.storm.LocalStormTopology.SumBolt;


public class WordcountStormTopology {

public static class DataSourceSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;

/**
* 在本实例运行的时候被调用一次
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
/**
* 死循环调用 心跳
*/

public void nextTuple() {
//获取指定文件夹下面所有的文件
Collection<File> files = FileUtils.listFiles(new File("D:\\test"), new String[]{"txt"}, true);
for (File file : files) {
try {
//解析每一个文件的每一行
List<String> readLines = FileUtils.readLines(file);

for (String line : readLines) {
//把每一行数据发送出去
this.collector.emit(new Values(line));
}

//重命名  防止多次读
FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
} catch (IOException e) {

e.printStackTrace();
}
}
}
/**
* 声明字段名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//fields就是field的列表
declarer.declare(new Fields("line"));
}
}

public static class SpiltBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
/**
* 只会被调用一次
*/
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
/**
* 死循环,循环的获取上一级发送过来的数据(spout/bolt)
*/
public void execute(Tuple input) {
//获取tuple发来数据
String line = input.getStringByField("line");
//对每一行数据进行切割
String[] words = line.split("\t");
for (String word : words) {
//把切割的单词发送到下一个bolt
this.collector.emit(new Values(word));
}
}


public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

public static class CountBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
/**
* 只会被调用一次
*/
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
/**
* 死循环,循环的获取上一级发送过来的数据(spout/bolt)
*/
Map<String, Integer> map = new HashMap<String, Integer>();
public void execute(Tuple input) {
//获取tuple中发送来的数据
String word = input.getStringByField("word");
Integer value = map.get(word);
if(value==null){
value=0;
}
value++;
//把数据保存到一个map对象中
map.put(word, value);
//把结果写出去
System.err.println("===============================");
for (Entry<String, Integer> entry : map.entrySet()) {
System.err.println(entry);
}
}


public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}


/**
 * Wires the spout and the two bolts into a topology and submits it to an
 * in-process {@link LocalCluster}. The cluster is not shut down here, so the
 * topology keeps running until the JVM is stopped.
 *
 * @param args unused
 */
public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("1", new DataSourceSpout());
    builder.setBolt("2", new SpiltBolt()).shuffleGrouping("1");
    // CountBolt keeps its counts in a per-instance map, so all tuples for a given
    // word must reach the same instance. Grouping on the "word" field guarantees
    // that; a shuffle grouping would split one word's count across instances as
    // soon as the bolt's parallelism is raised above one.
    builder.setBolt("3", new CountBolt()).fieldsGrouping("2", new Fields("word"));

    LocalCluster localCluster = new LocalCluster();

    Config config = new Config();
    localCluster.submitTopology(WordcountStormTopology.class.getSimpleName(), config, builder.createTopology());
}


}

版权声明:本文为博主原创文章,未经博主允许不得转载。

storm单词计数 本地执行

原文:http://blog.csdn.net/u010220089/article/details/47347367

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!