package cn.spark.study.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.sql.SQLOutput; import java.util.Arrays; /** * @author: yangchun * @description: * @date: Created in 2020-05-03 21:27 */ public class WordCountLocal { public static void main(String[] args) { //第一步:创建SparkConf对象,设置Spark应用的配置信息 //setMaster()可以设置Spark应用程序要连接的机器的master机器,设置为local表示本地运行 SparkConf conf = new SparkConf().setAppName("WordCountLocal") .setMaster("local"); //第二步:创建JavaSparkContext对象 /*在Spark,SparkContext是所有Spark所有功能的一个入口,你无论是java,scala,还是python编写的 都必须有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器 (DAGSchedule,TaskSchedule),还会去Spark Master节点进行注册,等等。Spark Context是Spark中 最重要的一个对象。不同类型的Spark应用程序,SparkContext不同 Java的SparkContext,就是JavaSparkContext Spark SQL程序,SQLContext,HiveContext Spark Streaming SparkContext Scala 就是SparkContext */ JavaSparkContext sc = new JavaSparkContext(conf); /* 第三步,针对输入源创建R初始RDD,输入源中的数据会打散,分配到RDD的每个partition中,形成一个分布式数据集 SparkContext根据本地文件创建RDD的方法叫做textFile(),Java中,创建的普通RDD,都叫做JavaRDD。RDD中有元素的 概念,如果hdfs和本地文件,创建的RDD每一个元素相当于文件里面的一行 */ JavaRDD<String> lines = sc.textFile("E:\\spark\\spark.txt"); /* 第四步,对初始RDD进行transformation操作,通过创建function,并配合RDD的map、flatMap等算子 来执行function,通常,如果比较简单,则创建指定的Function匿名内部类,如果比较复杂就定义一个类实现 function接口。 先将一行拆分成单词 */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID= 1l; @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }); /** * 将每一个单词映射成一个Tuple(单词,1),mapToPair与PairFunction配合使用。第一个参数是输入,第二个,第三个是 * Tuple的组成 */ JavaPairRDD<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word,1); } }); /** * 接着对所有Tuple进行reduce操作,相当于将根据key对所有tuple进行值的累加 */ JavaPairRDD<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); /** * 最后Spark程序中光有transformation操作,是不行的必须哟action操作,不会执行,可以用foreach操作来触发程序执行 */ wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> wordCount) throws Exception { System.out.println(wordCount._1+" appeared "+wordCount._2+" times"); } }); sc.close(); } }
原文:https://www.cnblogs.com/xiaofeiyang/p/12823910.html