流处理
package com.shujia.flink.core //导入隐式转换 import org.apache.flink.streaming.api.scala._ object Demo1StreamWordCount { def main(args: Array[String]): Unit = { /** * 构建flink环境 * */ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 //并行度由数据量决定 // env.setParallelism(3) //读取socket,构建DS //nc -lk 8888 val lineDS: DataStream[String] = env.socketTextStream("master", 8888) //1、将单词切分 val wordDS: DataStream[String] = lineDS.flatMap(_.split(",")) //2、转换成kv格式 val kvDS: DataStream[(String, Int)] = wordDS.map((_, 1)) //3、按照key进行分组,底层也是hash分区 keyBy会产生shuffle val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1) //4、对value进行聚合 //sum 可以指定列名,也可指定下标 val countDS: DataStream[(String, Int)] = keyByDS.sum(1) //打印结果 countDS.print() //启动flink程序 env.execute() } }
批处理
package com.shujia.flink.core import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode object Demo2BatchWordCount { def main(args: Array[String]): Unit = { //创建flink batch环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment ///1、读取数据 //DataSet 相当于rdd val linesDS: DataSet[String] = env.readTextFile("data/words.txt") //将单词拆分 val countDS: AggregateDataSet[(String, Int)] = linesDS .flatMap(_.split(",")) .map((_, 1)) .groupBy(0) .sum(1) // countDS.print() //保存数据 countDS.writeAsText("data/count", WriteMode.OVERWRITE) //启动 env.execute() /** * 批处理:如果打印结果,不需要启动,因为里面封装了 * * 如果保存结果,需要启动 * */ } }
原文:https://www.cnblogs.com/lipinbigdata/p/15100639.html