在上篇博文《深入理解Spark 2.1 Core (五):Standalone模式运行的实现与源码分析》中,我们讲到了如何启动Master和Worker,还讲到了如何回收资源。但是,我们没有讲AppClient和Executor是如何启动的,其实它们的启动也涉及到了资源是如何调度的。这篇博文,我们就来讲一下Spark的资源调度,以及Executor启动后是如何执行Task的。
调用栈如下:
在Standalone模式下,SparkContext中的backend是StandaloneSchedulerBackend。在StandaloneSchedulerBackend.start中可以看到:
***
// Address of the application's web UI; falls back to "" when sc.ui yields no value.
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
// Optional per-executor core count, parsed as an Int when "spark.executor.cores" is set.
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
// With dynamic allocation enabled the initial executor limit is Some(0); otherwise None.
val initialExecutorLimit = if (Utils.isDynamicAllocationEnabled(conf)) Some(0) else None
// Describe the application: name, cores, memory, command, UI address, event-log settings
// and the two optional values computed above.
val appDesc = new ApplicationDescription(
  sc.appName,
  maxCores,
  sc.executorMemory,
  command,
  appUIAddress,
  sc.eventLogDir,
  sc.eventLogCodec,
  coresPerExecutor,
  initialExecutorLimit)
// Create the AppClient for this application description ...
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// ... and start it.
client.start()
***
/**
 * Starts the client by creating a ClientEndpoint, registering it with `rpcEnv` under the
 * name "AppClient", and storing the resulting endpoint reference in `endpoint`.
 */
def start(): Unit = {
  // Per the accompanying article, setting up the endpoint is what leads to
  // ClientEndpoint.onStart being invoked.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
调用registerWithMaster
/**
 * Kicks off registration with the masters (attempt number 1). If that throws, the failure is
 * logged, the client is marked disconnected and the endpoint is stopped.
 */
override def onStart(): Unit = {
  try registerWithMaster(1)
  catch {
    case cause: Exception =>
      logWarning("Failed to connect to master", cause)
      markDisconnected()
      stop()
  }
}
/**
 * Submits a registration attempt to every master and schedules a follow-up check that runs
 * after REGISTRATION_TIMEOUT_SECONDS seconds.
 *
 * When the check runs:
 *  - if registration has succeeded, the outstanding attempts are cancelled and the
 *    registration thread pool is shut down;
 *  - otherwise, if `nthRetry` has reached REGISTRATION_RETRIES, the client is marked dead;
 *  - otherwise the outstanding attempts are cancelled and registration is retried with
 *    `nthRetry + 1`.
 *
 * @param nthRetry number of the current attempt; `onStart` starts at 1
 */
private def registerWithMaster(nthRetry: Int): Unit = {
  // Register the current app with all masters and keep the futures so they can be cancelled.
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        // One master accepted the registration: cancel the remaining attempts.
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        // Maximum number of attempts reached (the article states the default is 3;
        // the constant is defined outside this excerpt).
        markDead("All masters are unresponsive! Giving up.")
      } else {
        // Not registered yet and attempts remain: cancel this round and try again.
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
给Master发送RegisterApplication信号:
/**
 * Submits one task per master address to `registerMasterThreadPool`; each task sends a
 * RegisterApplication message to that master unless registration has already succeeded.
 *
 * @return the futures of the submitted tasks, one per master address
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        // Skip the attempt entirely once some master has accepted the registration.
        if (!registered.get) {
          logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
          val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          masterRef.send(RegisterApplication(appDescription, self))
        }
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
Master.receive接收并处理RegisterApplication信号
case RegisterApplication(description, driver) =>
  // A Master whose state is STANDBY ignores the request: nothing is registered and no
  // reply is sent. Any other state proceeds with registration.
  if (state != RecoveryState.STANDBY) {
    logInfo("Registering app " + description.name)
    // Build the ApplicationInfo for this request and register it with the Master.
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the application through the persistence engine.
    persistenceEngine.addApplication(app)
    // Reply to the driver with a RegisteredApplication message carrying the new app id.
    driver.send(RegisteredApplication(app.id, self))
    // Run resource scheduling.
    schedule()
  }
让我们深入来看下Master是如何注册app的。
先创建app:
/**
 * Builds the ApplicationInfo for a newly submitted application.
 *
 * @param desc   description of the application
 * @param driver endpoint reference of the driver
 * @return an ApplicationInfo built from the current time, a fresh application id, the
 *         description, the creation date, the driver reference and `defaultCores`
 */
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
    ApplicationInfo = {
  val createdAtMillis = System.currentTimeMillis()
  val createdAt = new Date(createdAtMillis)
  // The application id is generated from the creation date.
  new ApplicationInfo(
    createdAtMillis, newApplicationId(createdAt), desc, createdAt, driver, defaultCores)
}
再注册app:
/**
 * Records an application in the Master's bookkeeping structures and queues it as waiting.
 * An application whose driver address is already known is only logged and otherwise ignored.
 *
 * @param app the application to register
 */
private def registerApplication(app: ApplicationInfo): Unit = {
  val driverAddress = app.driver.address
  if (addressToApp.contains(driverAddress)) {
    // An app is already registered for this driver address: do nothing further.
    logInfo("Attempted to re-register application at same address: " + driverAddress)
  } else {
    // Register the app's metrics source with applicationMetricsSystem.
    applicationMetricsSystem.registerSource(app.appSource)
    // Add the app to the set of known apps and to the id / endpoint / address lookups
    // (the article notes these as a HashSet and three HashMaps of ApplicationInfo).
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(driverAddress) = app
    // Append to the waiting queue (noted in the article as an ArrayBuffer[ApplicationInfo]).
    waitingApps += app
    if (reverseProxy) {
      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
    }
  }
}
case RegisteredApplication(appId_, masterRef) =>
// Handles the RegisteredApplication reply to a registration request.
// Known gaps that this handler does not guard against (each reply is applied unconditionally):
// 1. A Master may receive several registration requests (e.g. retries over an
//    unstable network) and answer each one with its own RegisteredApplication.
// 2. While the active master is changing, RegisteredApplication replies
//    may arrive from more than one master.
// Store the application id carried by the reply.
appId.set(appId_)
// Mark the application as registered.
registered.set(true)
// Remember the master that sent the reply.
master = Some(masterRef)
// Tell the listener that the application is connected, passing the stored id.
listener.connected(appId.get)
深入理解Spark 2.1 Core (六):资源调度与任务执行的实现与源码分析
原文:http://blog.csdn.net/u011239443/article/details/54098376