Spark源碼剖析(三):Executor啟動流程

Spark源碼剖析(三):Executor啟動流程

來自專欄大數據框架知識

?# Executor啟動流程

在上面SparkContext進行資源調度後,只有主要的流程是Master和篩選出來的Worker進行通信,然後在Worker中啟動Executor進程。

Spark帶注釋源碼

對於整個Spark源碼分析系列,我將帶有注釋的Spark源碼和分析的文件放在我的GitHub上Spark源碼剖析

Executor啟動流程流程圖

源碼分析

Executor進程的啟動

這裡我們需要追蹤的方法是Master類中的schedule()方法內部的607行,這個方法的主要作用是向Worker發送消息,然後啟動Executor進程。

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) //TODO 記錄該Worker使用的資源 worker.addExecutor(exec) //TODO Master發送消息給Worker,把參數通過case class傳遞給Worker,讓他啟動Executor, worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) //TODO Master向ClientActor發送消息,告訴它Executor已經啟動了 exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) }

以上代碼的所做的工作是記錄和Master通信的Worker使用的資源,並且發送消息給Worker,最後Master向ClientActor發送的消息,告知Executor已經啟動了。

接著,我們將會看Worker的LaunchExecutor樣例類模式匹配,這個方法的作用是啟動Executor進程,代碼大約在Worker的335行

//TODO Master發送給Worker的消息,讓Worker啟動Executor,LaunchExecutor是一個CaseClass裡面封裝了要啟動的Executor啟動的信息 case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => //TODO 判斷master是否是活著的Master if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) // Create the executors working directory val executorDir = new File(workDir, appId + "/" + execId) if (!executorDir.mkdirs()) { throw new IOException("Failed to create directory " + executorDir) } // Create local dirs for the executor. These are passed to the executor via the // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. //TODO 為Executor創建目錄 val appLocalDirs = appDirectories.get(appId).getOrElse { Utils.getOrCreateLocalRootDirs(conf).map { dir => Utils.createDirectory(dir).getAbsolutePath() }.toSeq } appDirectories(appId) = appLocalDirs //TODO 創建一個ExecutorRunner對象,裡面包含應用的描述信息等通過這個對象啟動Executor val manager = new ExecutorRunner( appId,//TODO ApplicationID execId,//TODO ExecutorID appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),//TODO 裡面含有Driver的URL cores_,//TODO 所用核數 memory_,//TODO 所用內存 self, workerId, host,//TODO Worker Host webUi.boundPort, publicAddress, sparkHome, executorDir, akkaUrl,//TODO Worker的url conf, appLocalDirs, ExecutorState.LOADING) //TODO 將ExecutorID->Executor放入到HashMap中 executors(appId + "/" + execId) = manager //TODO 調用ExecutorRunner的start()方法來啟動Executor子進程 manager.start() coresUsed += cores_ memoryUsed += memory_ //TODO 給Master發送消息表示Executor的狀態已經改變 master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } catch { case e: Exception => { logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(e.toString), None) } } }v

在這個模式匹配中,主要的做的是為Executor創建目錄,創建一個ExecutorRunner對象,裡面包含應用的描述信息等通過這個對象啟動Executor,

然後使用腳本命令啟動CoarseGrainedExecutorBackend。這裡我們將會查看這個CoarseGrainedExecutorBackend類的源碼。

下面是CoarseGrainedExecutorBackend的main()方法.

//TODO 啟動Executor子進程的入口 def main(args: Array[String]) { var driverUrl: String = null var executorId: String = null var hostname: String = null var cores: Int = 0 var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBufferURLvar argv = args.toListwhile (!argv.isEmpty) { argv match { case ("--driver-url") :: value :: tail => driverUrl = value argv = tail case ("--executor-id") :: value :: tail => executorId = value argv = tail case ("--hostname") :: value :: tail => hostname = value argv = tail case ("--cores") :: value :: tail => cores = value.toInt argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail case ("--worker-url") :: value :: tail => // Worker url is used in spark standalone mode to enforce fate-sharing with worker workerUrl = Some(value) argv = tail case ("--user-class-path") :: value :: tail => userClassPath += new URL(value) argv = tail case Nil => case tail => System.err.println(s"Unrecognized options: ${tail.mkString(" ")}") printUsageAndExit() }}if (driverUrl == null || executorId == null || hostname == null || cores <= 0 || appId == null) { printUsageAndExit()}//TODO 獲取必要的參數後,進程啟動run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

main()方法是啟動Executor子進程的入口,然後調用run()方法,這裡我們將會追蹤這個方法:

//TODO CoarseGrainedExecutorBackend真正進行通信的Actor env.actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores, userClassPath, env), name = "Executor") workerUrl.foreach { url => env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") }

CoarseGrainedExecutorBackend的生命周期方法

調用CoarseGrainedExecutorBackend的生命周期方法,在preStart()方法主要代碼如下:

//TODO 生命周期方法,首先和Driver建立連接 override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) //TODO Executor向Driver發送消息,註冊Executor driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) }

這個方法主要是和Driver進行通信,向Driver發送信息,註冊Executor我們這裡需要看的是DriverActor的代碼

//TODO Executor向DriverActor發送的消息 case RegisterExecutor(executorId, hostPort, cores, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { //TODO DriverActor向Executor發送消息,告訴Executor註冊失敗 sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) //TODO DriverActor向Executor發送消息,告訴Executor註冊成功 sender ! RegisteredExecutor addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val (host, _) = Utils.parseHostPort(hostPort) val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) if (numPendingExecutors > 0) { numPendingExecutors -= 1 logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") } } listenerBus.post( SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) //TODO 查看是否有任務需要提交(DriverActor->Executor) makeOffers() }

這裡進行一個判斷後,向Executor發送註冊成功後,然後調用makeOffers()查看是否有任務需要提交。這裡我們首先看DriverActor向Executor

向Executor發送消息,表示註冊成功,然後再次查看makeOffers()方法。

針對於RegisteredExecutor的,代碼如下:

//TODO Driver發送給Executor的消息告訴已經註冊成功 case RegisteredExecutor => logInfo("Successfully registered with driver") //TODO 這裡的hostname主要是DriverActor val (hostname, _) = Utils.parseHostPort(hostPort) //TODO 創建Executor對象用來執行業務邏輯 executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

這裡主要是創建Executor對象用來執行業務邏輯,接下來我們看一下Executor的構造函數內部做了什麼?

//TODO 創建線程池 val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") //TODO 和Dirver進行通信,發送心跳信息,這是一個報活的操作 startDriverHeartbeater()

在Executor構造函數中,主要的工作是創建一個可變的線程池(實現是Java的Executors創建),然後調用startDriverHeartbeater()和Dirver進行通信,發送心跳信息,這是一個報活的操作

接下來是makeOffers()方法,主要運行CoarseGrainedSchedulerBackend的launchTasks()方法。

//TODO 調用launchTasks向Executor提交Task def makeOffers() { launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq)) }

這個方法主要是查看是否有任務需要提交(DriverActor->Executor)

百度腦圖關於Executor啟動

總結

創建Executor進程的總結:

1.Worker創建Executor進程,該進程的實現類其實是CoarseGrainedExecutorBackend

2.CoarseGrainedExecutorBackend向DriverActor註冊成功後創建Executor對象,內部有一個可變的線程池

3.執行makeOffers()方法,查看是否有任務需要提交


推薦閱讀:

Retrofit原理解析最簡潔的思路
ROS導航包源碼學習6 --- recovery
又見Rx——Rx via UniRx
源碼閱讀經驗談-slim,darknet,labelimg,caffe
TiDB 源碼閱讀系列文章(四)Insert 語句概覽

TAG:Spark | ApacheSpark | 源碼閱讀 |