首页 > 其他 > 详细

15.RDD 创建

时间:2016-04-27 02:15:58      阅读:268      评论:0      收藏:0      [点我收藏+]

15课:RDD创建内幕

1.?????? RDD的创建方式

Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法

Spark中的基本方式:

1)?????? 使用程序中的集合创建

这种方式的实际意义主要用于测试。

2)?????? 使用本地文件系统创建

这种方式的实际意义主要用于测试大量数据的文件

3)?????? 使用HDFS创建RDD

这种方式为生产环境中最常用的创建RDD的方式

4)?????? 基于DB创建

5)?????? 基于NoSQL:例如HBase

6)?????? 基于S3(SC3)创建

7)?????? 基于数据流创建

2.?????? RDD创建实战

1)?????? 通过集合创建

代码:

object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val numbers = 1 to 100? //创建一个Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_)? //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
? }
}

结果:

?

2)?????? 通过本地文件系统创建

代码:

object RDDBasedOnLocalFile {

def main (args: Array[String]) {

val conf = new SparkConf()//create SparkConf

conf.setAppName("RDDBasedOnCollection")//set app name

conf.setMaster("local")//run local

val sc =new SparkContext(conf)

val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt")

val linesLength=rdd.map(line=>line.length())

val sum = linesLength.reduce(_+_)

println("the total characters of the file"+"="+sum)

? }

}

结果:

?

3)?????? 通过HDFS创建RDD

代码:

?val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)

结果:

?

?

?

?

?

关于spark并行度:

1.默认并行度为程序分配到的cpu core的数目

2.可以手动设置并行度,并行度最佳实践

???????? 1. 2-4 partitions for each CPU core

???????? 2.综合考虑cpu内存

?

注:本内容原型来自 IMP 课程笔记

如果技术上有什么疑问,欢迎加我QQ交流: 1106373297?

15.RDD 创建

原文:http://zhou-yuefei.iteye.com/blog/2293266

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!