声明:本文基于spark的programming guide,并融合自己的相关理解整理而成
val conf = new SparkConf().setAppName("BasicRDDApp").setMaster("local[4]")
//spark://host:port
val sc = new SparkContext(conf)
/**
* parallelized collections
* 将scala的集合数据,并行化成为能够并行计算的分布式数据集
*/
val data = 1 to 1000 toArray
val distData = sc.parallelize(data,10)
//后面的数字是表示将集合切分成多少个块 ,通常是一个CPU 2-4块,通常spark可以自动帮你切分
val sum = distData.reduce((a, b) => a+b )
//在reduce的时候才开始真正的执行,driver将任务分布到各个机器上,然后每个机器单独执行,将计算的结果返回到driver程序
println("sum " + sum)
/**
* 读取外部的数据源
* 1.Hadoop支持的数据源 ,例如HDFS,Cassandra,HBase ,Amazon S3
* ##如果文件地址是本地地址的话,那么他应该在集群的每个节点上都能够被访问(即:每个节点上都应该有同样的文件)
* ##textFile的第二个参数控制文件被切割的大小默认为64MB ,可以设置更大的但是不能设置更小的
*/
val distFile = sc.textFile("file:///usr/local/spark/README.md")
//接下来就可以进行相关的操作了
distFile.persist()//持久化
val len = distFile.map(s => 1).reduce((a, b) => a+b)
println(len)
val words = distFile.flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey((a,b) => a+b)
//w => (v1+v2+v3+...)
//map => 1->1 , flatMap => 1 -> 0..n
print(words.count())
words foreach println
val twords = distFile.flatMap(l => l.split(" ")).map(w => (w,1)).groupByKey()
//分组 w => (v1, v2, v3 ...)
twords foreach println
//.map(w => (w,1)).foreach(w => w._1);
val broadcastVar = sc.broadcast("string test")//broadcast variable is readonly
val v = broadcastVar.value
println(v)
val accum = sc.accumulator(0, "My Accumulator")//value and name
sc.parallelize(1 to 1000000).foreach(x => accum+= 1)
println(accum.name + ":" + accum.value)原文:http://blog.csdn.net/youmengjiuzhuiba/article/details/41245603