欢迎转载,转载请注明出处,徽沪一郎。
本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。
在进行后续操作前,确保下列条件已满足。
local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME
MASTER=local bin/spark-shell
"MASTER=local"就是表明当前运行在单机模式
local cluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别如下
$SPARK_HOME/sbin/start-master.sh
注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。
master主要是运行类 org.apache.spark.deploy.master.Master,在8080端口启动监听,日志如下图所示
export SPARK_MASTER_IP=localhost export SPARK_LOCAL_IP=localhost
bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1 -c 1 -m 512M
worker启动完成,连接到master。打开maser的web ui可以看到连接上来的worker. Master WEb UI的监听地址是http://localhost:8080
MASTER=spark://localhost:7077 bin/spark-shell
如果一切顺利,将看到下面的提示信息。
Created spark context.. Spark context available as sc.
可以用浏览器打开localhost:4040来查看如下内容
上述环境准备妥当之后,我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码
scala>sc.textFile("README.md").filter(_.contains("Spark")).count
上述代码统计在README.md中含有Spark的行数有多少
Spark布置环境中组件构成如下图所示。
Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。
job生成的简单流程如下
调用路径大致如下
代码片段executor.lauchTask
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内。
运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒。
Apache Spark源码走读之2 -- Job的提交与运行,布布扣,bubuko.com
Apache Spark源码走读之2 -- Job的提交与运行
原文:http://www.cnblogs.com/downtjs/p/3815283.html