Spark的内核部分主要从以下几个方面介绍:
任务调度系统、I/0模块、通信控制模块、容错模块、shuffle模块
接下来注意几个概念:
Application:用户自定义的Spark程序,用户提交后,Spark为App分配资源,将程序转换并执行。
Driver Program:运行Application的main()函数并创建SparkContext
RDD Graph:RDD是Spark的核心结构,可以通过一系列算子进行操作(主要有Transformation和Action操作)。当RDD遇到Action算子时,将之前的所有算子形成一个有向无环图(DAG),再在Spark中转化为Job,提交到集群执行。一个App中可以包含多个Job。
Job:一个RDD Graph触发的作业,往往由Spark Action算子触发,在SparkContext中通过runJob方法向Spark提交Job。
Stage:每个Job会根据RDD的宽依赖关系被切分成很多Stage,每个Stage中包含一组相同的Task,叫TaskSet
Task:一个分区对应一个Task,Task执行RDD中对应Stage中包含的算子,Task封装好之后放入Executor的线程池中执行。
作业执行流程:
1)用户启动客户端,之后客户端运行用户程序,启动Driver进程。在Driver中启动或实例化DAGScheduler等组件。客户端的Driver向Master注册。
2)Worker向Master注册(要确保master有活节点可用,或通过心跳机制向master汇报worker节点还活着),Master命令Worker启动Exeuctor。Worker通过创建ExecutorRunner线程,在ExecutorRunner线程内部启动ExecutorBackend进程。
3)ExecutorBackend启动后,向客户端Driver进程内的SchedulerBackend注册。而SchedulerBackend收到后,会向Worker启动LaunchTask进程,Worker开始执行任务。
4)Driver中的SchedulerBackend进程中包含DAGScheduler进程、TaskScheduler进程。SchedulerBackend收到注册后,DAGScheduler会根据RDD DAG切分成相应的Stage,每个Stage中包含的TaskSet通过TaskScheduler分配给Executor。Executor启动线程池并行化执行Task。
1)在最左边通过一系列的Transformation算子和Actions算子,形成DAG图,传递给DAGScheduler,
2)然后DAGScheduler把其切分成一系列的Stage,即形成TaskSet任务集合,每个TaskSet中包含多个任务。然后把TaskSet传递给TaskScheduler
3)TaskScheduler收到后,然后把具体任务分配给相应的Worker节点进行计算。
sc.textFile(hdfs://....).flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_).saveAsFile(hdfs://...) 化简为: sc.textFile(hdfs://....).flatMap(_.split(" ")).map(_,1).reduceByKey(_+_).saveAsFile(hdfs://...) 因为默认得出的结果是没有序的,所以想要得到排序的结果: sc.textFile(hdfs://....).flatMap(_.split(" ")).map(_,1).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsFile(hdfs://...)
上述代码经过了HadoopRDD-->MappedRDD-->flatMappedRDD-->MappedRDD-->PairRDDFunctions-->ShuffledRDD-->MappartitionsRDD-->MappedRDD-->SortedRDD-->MappedRDD-->HadoopRDD
HadoopRDD-->MappedRDD: sc.textFile(hdfs://....) MappedRDD-->flatMappedRDD: flatMap(_.split(" ")) flatMappedRDD-->MappedRDD: flatMap(_.split(" ")).map(_,1) MappedRDD-->PairRDDFunctions-->ShuffledRD: map(_,1).reduceByKey(_+_) ShuffledRDD-->MappartitionsRDD-->MappedRDD:reduceByKey(_+_).map(x=>(x._2,x._1)) MappedRDD-->SortedRDD: map(x=>(x._2,x._1)).sortByKey(false) SortedRDD-->MappedRDD: sortByKey(false).map(x=>(x._2,x._1)) MappedRDD-->HadoopRDD: map(x=>(x._2,x._1)).saveAsFile(hdfs://...)
原文:http://www.cnblogs.com/liuzhongfeng/p/5289169.html