首页 > 编程语言 > 详细

Spark本地运行的几个实例代码(Java实现)

时间:2021-03-11 19:09:42      阅读:21      评论:0      收藏:0      [点我收藏+]

Spark本地运行的几个实例代码(Java实现)

初学spark,用Java写了几个本地运行的spark实例代码,来记录一下已学的spark常用的算子的使用和处理逻辑,不涉及分布式集群。相关内容仅为自己的个人理解,如有错误还请指出。

实例一:词频数统计

问题描述

统计一个文本文件中的每个单词的出现次数,数据格式:
技术分享图片

过程分析

首先通过textFile()函数将文件读入JavaRDD,然后通过flatMap算子将每一行的数据进行分割,得到多个String,一行数据分割得到的多个String以Iterator的迭代器格式返回,返回之后的Iterator中的每一个String都会作为一个RDD。接着通过mapToPair算子给每一个word添加计数标记1(代表出现1次),该算子返回一个键值对RDD。最后通过reduceByKey算子根据相同的key对RDD进行reduce聚合操作,进行统计计数。

代码

public class SparkWordCount {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        //添加这一行则在本地运行,不添加这一行则默认在集群执行
        conf.setMaster("local");
        conf.setAppName("WordCount");
        //基本的初始化
        JavaSparkContext sc=new JavaSparkContext(conf);
        //创建String类型的RDD,并从本地文件中读取数据
        JavaRDD<String> fileRDD = sc.textFile("src/main/files/words.txt");//通过文件读入创建RDD

        //flatMap()算子用来分割操作,将原RDD中的数据分成一个个片段
        //new FlatMapFunction<String, String>中的两个String分别表示输入和输出类型
        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
            	//通过Iterator迭代器可以将分割后的多个数据元素全部返回输出
                return Arrays.asList(line.split("\\s+")).iterator();
            }
        });

        //mapToPair()算子是用来对分割后的一个个片段结果添加计数标志的,如出现次数1,该函数用来创建并返回pair类型的RDD. new PairFunction<String, String, Integer>中分别是输入类型String和输出类型<String, Integer>.
        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1); //Tuple2是spark的二元数组类型,Java中没有
            }
        });

        //reduceByKey()算子是根据key来聚合,reduce阶段.new Function2<Integer, Integer, Integer>中分别是用来聚合的两个输入类型Integer,Integer和聚合后的输出类型Integer.
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        wordCountRDD.saveAsTextFile("E:\\result7");
    }

}

运行结果

技术分享图片

实例二:统计平均年龄

问题描述

java实现spark统计100万人口的平均年龄以及每个年龄的出现次数,数据格式为“序号 年龄”
技术分享图片
数据生成代码:

//生成年龄数据,格式“序号  年龄”
    private static void makeAgeData() throws IOException {
        File newFile = new File("src/main/files/peopleAges.txt");
        if (newFile.exists()){
            System.out.println("文件已存在!");
            return;
        }
        newFile.createNewFile();
        FileWriter fw = new FileWriter(newFile,true);
        Random rand = new Random();
        for (int i=1;i<=1000000;i++){
            fw.append(i+"  "+(rand.nextInt(100)+1)+"\n");
            fw.flush();
        }
        fw.close();
    }

过程分析

首先通过textFile()函数将文件数据读入RDD中。然后使用mapToPair()算子将每一行数据中的年龄作为key并对每一个年龄添加计数标记1作为value,接着使用reduceByKey算子对相同年龄值的数据进行聚合。
求平均年龄时首先要求出年龄和,这也是reduce聚合操作。但是要注意reduce算子只能接收单个数据元素组成的RDD作为输入,不能接收pair类型的RDD,所以对源文件读出的RDD先通过map算子输出只有年龄值数据的RDD,然后进行reduce()。

代码

public class AvgAge {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("AvgAge");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
        JavaRDD<String> fileRDD = sc.textFile("src/main/files/peopleAges.txt");
        JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(String s) throws Exception {
                return new Tuple2<>(Integer.parseInt(s.split("\\s+")[1]),1);
            }
        });

		//使用reduceByKey算子对具有相同年龄值的数据进行聚合,获取每个年龄值的人数
        JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        //求平均年龄
        //先通过map算子取出每个年龄值作为一个RDD。
        //reduce()函数的输入RDD不能是pair,只能是单个数据组成的RDD
        Integer ageSum = fileRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s.split("\\s+")[1]);
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer s, Integer s2) throws Exception {
                return s+s2;
            }
        });
        System.out.println("平均年龄:"+ageSum/fileRDD.count());

        ageCountRDD.saveAsTextFile("src/main/files/ageAnalysis");

    }

运行结果

技术分享图片

案例三:统计身高最值

问题描述

统计男女人数,并分别计算出男性和女性的最高和最低身高,数据格式“序号 M/F 身高”
技术分享图片
数据生成代码

	//生成性别身高数据,格式“序号  性别(M/F) 身高”
    private static void makeHeightData() throws IOException {
        File newFile = new File("src/main/files/heightData.txt");
        if (newFile.exists()){
            System.out.println("文件已存在!");
            return;
        }
        newFile.createNewFile();
        FileWriter fw = new FileWriter(newFile,true);
        Random rand = new Random();
        for (int i=1;i<=50000;i++){
            fw.append(i+"  M  "+(rand.nextInt(100)+100)+"\n");
            fw.append(i+"  F  "+(rand.nextInt(80)+100)+"\n");
            fw.flush();
        }
        fw.close();
    }

过程分析

首先通过textFile()函数将文件数据读入RDD。然后使用filter算子分别过滤出男性和女性数据,接着用map算子分割出身高值并将其转化成Integer类型,这样才能用于数字排序,然后使用sortBy算子排序。sortBy算子可以直接对RDD中的数据排序,不用区分key还是value。

代码

public class HeightMaxMin {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("HeightAnalysis");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("src/main/files/heightData.txt");
        //使用filter算子分别过滤出男性和女性数据
        JavaRDD<String> maleRDD = fileRDD.filter(new Function<String, Boolean>() {
            @Override
            //如果这行数据符合过滤条件则返回true
            public Boolean call(String s) throws Exception {
                return s.contains("M");
            }
        });
        JavaRDD<String> femaleRDD = fileRDD.filter(new Function<String, Boolean>() {
            @Override
            //如果这行数据符合过滤条件则返回true
            public Boolean call(String s) throws Exception {
                return s.contains("F");
            }
        });

        //使用map算子分割出身高并转化为整数类型,这样才能用排序
        JavaRDD<Integer> maleHeightRDD = maleRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s.split("\\s+")[2]);
            }
        });
        JavaRDD<Integer> femaleHeightRDD = femaleRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s.split("\\s+")[2]);
            }
        });

        //使用sortBy算子排序
        JavaRDD<Integer> sortmaleHeightRDD = (JavaRDD<Integer>) maleHeightRDD.sortBy(new Function<Integer, Object>() {
            @Override
            //返回的是排序的内容,即对输入RDD中的哪一部分进行排序就输出哪一部分
            public Object call(Integer integer) throws Exception {
                return integer;
            }
        }, false,10);//false降序true升序,10是partition分区数,因为没有用集群所以也不太明白这个分区数具体指什么
        JavaRDD<Integer> sortfemaleHeightRDD = (JavaRDD<Integer>) femaleHeightRDD.sortBy(new Function<Integer, Object>() {
            @Override
            public Object call(Integer integer) throws Exception {
                return integer;
            }
        }, false,10);//第二个参数true/false是正序逆序,最后一个参数10是分区数

		//first()函数返回排名第一的数据
        System.out.println("男性: "+sortmaleHeightRDD.count()+"  "+sortmaleHeightRDD.first());
        System.out.println("女性: "+sortfemaleHeightRDD.count()+"  "+sortfemaleHeightRDD.first());

    }
}

运行结果

技术分享图片
技术分享图片

案例四:统计单词频率

问题描述

统计一段文本里出现频率最高的前k个词,注意单词不分大小写。

过程分析

首先从文件读入数据到RDD,然后使用flatMap算子对每一行的数据按照空格进行分割,并将所有的字母都转为小写,接着使用mapToPair算子对每一个单词添加计数标记1,然后使用reduceByKey算子对单词进行reduce聚合,为了根据key来排序,聚合后再使用mapToPair算子将得到的pair里面的key和value调换一下位置。然后使用sortByKey算子根据key来进行排序,最后使用take算子取出排名前5的数据。

代码

public class wordTopK {
    public static void main(String[] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wordTopK");
        JavaSparkContext sc=new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("D:\\summer_study\\ppt\\hive.txt");
        //使用flatmap算子对每一行数据按空格分隔,并将所有的字母都转为小写
        //注意这里不用map是因为map只能输出一个数据元素,而flatMap可以在输入元素后添加任意多元素来输出,
        // 比如分割后的多个元素组成Iterator来输出,但是Iterator里的每一个元素依然是独立的RDD。
        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.toLowerCase().split("\\s+")).iterator();
            }
        });

        //使用maoToPair算子给每一个word加上计数标记1
        JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });

        //使用reduceByKey算子对word进行reduce聚合,为了根据key来排序,聚合后再将得到的pair里面的key和value调换一下
        JavaPairRDD<Integer, String> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
            }
        });

        //使用sortByKey算子排序,false降序,true升序
        JavaPairRDD<Integer,String> sortWordCountRDD = wordCountRDD.sortByKey(false);

        //使用take算子取出前5
        List<Tuple2<Integer,String>> result = sortWordCountRDD.take(5);
        System.out.println("结果:"+result.toString());
    }
}

运行结果

技术分享图片

一些总结:

  1. RDD就是spark中专用的一种数据格式,代表一种抽象数据类型,spark中的数据都是存在不同类型的RDD中,如JavaRDD<String>,JavaPairRDD<String,Integer>等。
  2. textFile读文件生成的RDD可以理解成源文件中一行一行的数据,每一行的数据就是一个JavaRDD.
  3. flatMap算子和map算子的区别:map算子就是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD,注意call返回的数据只能是单个数据元素。 flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。比如分割后的多个元素组成Iterator来输出,但是Iterator里的每一个元素依然是独立的RDD,这个Iterator只能由flatMap算子输出,map算子不可以,因为它只能输出单个数据元素。

运行环境

ide:Idea
jdk:1.8.0_121
spark:2.1.1
附maven配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ffxn</groupId>
    <artifactId>ffxn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>

    </dependencies>

</project>

注:以上仅为个人学习记录,spark初学菜鸟,如有错误,敬请提出。
参考:
Spark 入门实战之最好的实例
Spark学习之路

Spark本地运行的几个实例代码(Java实现)

原文:https://www.cnblogs.com/zhenyanli/p/14519682.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!