首页 > 其他 > 详细

updateStateByKey--word count

时间:2015-09-04 22:46:22      阅读:311      评论:0      收藏:0      [点我收藏+]

http://blog.selfup.cn/619.html

private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
    StreamingExamples.setStreamingLogLevels();
 
    JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
          "JavaNetworkWordCount", new Duration(10000));
    jssc.checkpoint(".");//使用updateStateByKey()函数需要设置checkpoint
    //打开本地的端口9999
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    //按行输入,以空格分隔
    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)));
    //每个单词形成pair,如(word,1)
    JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
    //统计并更新每个单词的历史出现次数
    JavaPairDStream<String, Integer> counts = pairs.updateStateByKey((values, state) -> {
        Integer newSum = state.or(0);
        for(Integer i :values) {
            newSum += i;
        }
        return Optional.of(newSum);
    });
    counts.print();
    jssc.start();
    jssc.awaitTermination();
}




updateStateByKey--word count

原文:http://my.oschina.net/forrest420/blog/501172

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!