1、RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。
2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)
3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。
4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。
5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)
进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。
Spark Core提供了三种创建RDD的方式:
1.使用程序中的集合创建RDD(主要用于测试)
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10); JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
2.使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)
SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate(); JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();
3.使用HDFS文件创建RDD(生产环境的常用方式)
SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate(); JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
使用HDFS文件创建RDD对比使用本地文件创建RDD,需要修改的,只有两个地方:
第一,将SparkSession对象的master("local")方法去掉
第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件
Spark支持两种RDD操作:transformation和action。
transformation操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。
常用的transformation介绍:
map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。
filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。
groupByKey:根据key进行分组,每个key对应一个Iterable<value>。
reduceByKey:对每个key对应的value进行reduce操作。
sortByKey:对每个key对应的value进行排序操作。
join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。
cogroup:同join,但是每个key对应的Iterable<value>都会传入自定义函数进行处理。
sparkRDD算子:
map与flatmap的区别:扁平化
map函数:会对每一条输入进行指定func操作,然后为每一条输入返回一个对象
flatmap函数:先进行map映射,然后在flatten(进行扁平化操作)
reducebykey算子:
首先会触发shuffle,会进行两次聚合操作
1,按照key将数据放到一起(本地聚合--Shuffle Write)
2,将相同key的数据聚合(全局聚合--Shuffle Reader)
action操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。
常用的action介绍:
reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。
collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。
count:获取RDD元素总数。
take(n):获取RDD中前n个元素。
saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。
countByKey:对每个key对应的值进行count计数。
foreach:遍历RDD中的每个元素。
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。但是cache()或者persist()的使用是有规则的,必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以。
如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的,而且会报错,大量的文件会丢失。
val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()
Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。
通用的持久化级别的选择建议:
1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。
2、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。
3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。
Spark提供了两种共享变量:Broadcast Variable(广播变量)和Accumulator(累加变量)。
BroadcastVariable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。广播变量是只读的。
val factor = 3 val broadcastVars = sc.broadcast(factor); val numberList = Array(1,2,3,4,5) val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //广播变量读值broadcastVars.value
Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
val numberList = Array(1,2,3,4,5) val numberRDD = sc.parallelize(numberList,1) val sum = sc.accumulator(0) numberRDD.foreach{m => sum += m}
1、对文本文件内的每个单词都统计出其出现的次数。
2、按照每个单词出现次数的数量,降序排序。
scala编写:
package cn.spark.study.core import org.apache.spark.sql.SparkSession object WordCount { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("WordCount").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\\spark.txt") val words = lines.flatMap{line => line.split(" ")} val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _) val countWord = wordCounts.map{word =>(word._2,word._1)} val sortedCountWord = countWord.sortByKey(false) val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)} sortedWordCount.foreach(s=> { println("word \""+s._1+ "\" appears "+s._2+" times.") }) spark.stop() } }
1、按照文件中的第一列排序。
2、如果第一列相同,则按照第二列排序。
scala编写:
class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{ override def compare(that: SecondSortKey): Int = { if(this.first - that.first !=0){ this.first-that.first }else{ this.second-that.second } } } object SecondSort { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\\sort.txt") val pairs = lines.map{line => ( new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line )} val sortedParis = pairs.sortByKey() val sortedLines = sortedParis.map(pairs => pairs._2) sortedLines.foreach(s => println(s)) spark.stop() } }
对每个班级内的学生成绩,取出前3名。(分组取topn)
1.创建初始RDD
2.对初始RDD的文本行按空格分割,映射为key-value键值对
3.对键值对按键分组
4.获取分组后每组前3的成绩:
5.打印输出
以下是使用scala实现:
object GroupTop3 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate() //创建初始RDD val lines = spark.sparkContext.textFile("D:\\score.txt") //对初始RDD的文本行按空格分割,映射为key-value键值对 val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt)) //对pairs键值对按键分组 val groupedPairs = pairs.groupByKey() //获取分组后每组前3的成绩 val top3Score = groupedPairs.map(classScores => { var className = classScores._1 //获取每组的成绩,将其转换成一个数组缓冲,并按从大到小排序,取其前三 var top3 = classScores._2.toBuffer.sortWith(_>_).take(3) Tuple2(className,top3) }) top3Score.foreach(m => { println(m._1) for(s <- m._2) println(s) println("------------------") }) } }
以上三个小案例都用Scala实现了,用到了Scala中的集合的操作、高阶函数、链式调用、隐式转换等知识,自己动手实现,对Scala有个比较好的理解和掌握。
原文:https://www.cnblogs.com/MrChenShao/p/11891076.html