Spark Source Code Analysis (3): Executor Startup Flow
# Executor Startup Flow
After the resource scheduling performed by SparkContext (covered in the previous post), the main flow that remains is the Master communicating with the selected Workers, each of which then launches an Executor process.
## Annotated Spark Source
For the whole Spark source-code analysis series, the annotated Spark source and the analysis notes are available in my GitHub repository, Spark源碼剖析.
## Flow Chart of the Executor Startup Process
## Source Code Analysis
### Launching the Executor Process
The method we need to trace here is launchExecutor(), invoked from inside the schedule() method of the Master class (around line 607). Its job is to send a message to the chosen Worker so that the Worker starts an Executor process.
```scala
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record the resources this Worker is now using
  worker.addExecutor(exec)
  // The Master sends the Worker a LaunchExecutor message (a case class that
  // carries the launch parameters), asking it to start the Executor
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // The Master tells the ClientActor that the Executor has been launched
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
```
The code above records the resources used on the chosen Worker, sends the LaunchExecutor message to that Worker, and finally has the Master notify the ClientActor that the Executor has been launched.
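For reference, the two messages involved are plain case classes. Their shapes below are reconstructed from the call sites above; see org.apache.spark.deploy.DeployMessages in the Spark source for the authoritative definitions in your version:

```scala
// Shapes reconstructed from the call sites above (not copied verbatim);
// consult org.apache.spark.deploy.DeployMessages for the exact definitions.
case class LaunchExecutor(
    masterUrl: String,
    appId: String,
    execId: Int,
    appDesc: ApplicationDescription, // includes the Command used to start the backend
    cores: Int,
    memory: Int)

case class ExecutorAdded(
    id: Int,
    workerId: String,
    hostPort: String,
    cores: Int,
    memory: Int)
```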
Next, let's look at the Worker's pattern match on the LaunchExecutor case class, which is what actually starts the Executor process. The code is at roughly line 335 of Worker:
```scala
// Message from the Master asking the Worker to start an Executor.
// LaunchExecutor is a case class encapsulating the launch information.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // Check that the message came from the active Master
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          Utils.createDirectory(dir).getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs

      // Create an ExecutorRunner wrapping the application description and other
      // launch information; the Executor is started through this object
      val manager = new ExecutorRunner(
        appId,           // application ID
        execId,          // executor ID
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), // contains the driver URL
        cores_,          // cores to use
        memory_,         // memory to use
        self,
        workerId,
        host,            // Worker host
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        akkaUrl,         // the Worker's URL
        conf,
        appLocalDirs,
        ExecutorState.LOADING)
      // Record (appId/execId -> ExecutorRunner) in the HashMap
      executors(appId + "/" + execId) = manager
      // Call ExecutorRunner.start() to launch the Executor child process
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // Tell the Master that the Executor's state has changed
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
    } catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None)
      }
    }
  }
```
What this case does is create the working directories for the Executor and build an ExecutorRunner that wraps the application description and other launch information; the Executor is started through this object. Calling manager.start() ultimately launches CoarseGrainedExecutorBackend as a child process via a shell command (sketched below). With that, let's turn to the CoarseGrainedExecutorBackend class itself.
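Before we move on, here is an abridged sketch of what happens when manager.start() eventually runs ExecutorRunner.fetchAndRunExecutor() (based on the 1.3-era source; error handling and several details are omitted, so treat this as an outline rather than the exact code):

```scala
// Abridged sketch of ExecutorRunner.fetchAndRunExecutor() (1.3-era source,
// details omitted). start() runs this in a dedicated thread.
def fetchAndRunExecutor() {
  // Substitute runtime values (executor id, hostname, cores, ...) into the
  // application's launch command and build a ProcessBuilder from it
  val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
    sparkHome.getAbsolutePath, substituteVariables)
  builder.directory(executorDir)
  builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))

  // The assembled command is roughly:
  //   java -cp <classpath> org.apache.spark.executor.CoarseGrainedExecutorBackend
  //        --driver-url <driverUrl> --executor-id <execId> --hostname <host>
  //        --cores <cores> --app-id <appId> --worker-url <workerUrl>
  process = builder.start()

  // Block until the child process exits, then report back to the Worker
  val exitCode = process.waitFor()
  state = ExecutorState.EXITED
  worker ! ExecutorStateChanged(appId, execId, state,
    Some("Command exited with code " + exitCode), Some(exitCode))
}
```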
Here is the main() method of CoarseGrainedExecutorBackend:
```scala
// Entry point of the Executor child process
def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
    printUsageAndExit()
  }

  // With the required arguments in hand, start the process
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
```
main() is the entry point of the Executor child process: it parses the command-line arguments and then calls run(). The part of run() we care about here is where the actors are created:
```scala
// Create the actor through which CoarseGrainedExecutorBackend actually communicates
env.actorSystem.actorOf(
  Props(classOf[CoarseGrainedExecutorBackend],
    driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
  name = "Executor")
workerUrl.foreach { url =>
  env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
}
```
### Lifecycle Methods of CoarseGrainedExecutorBackend
Creating the actor triggers CoarseGrainedExecutorBackend's lifecycle methods. The main code of preStart() is as follows:
```scala
// Lifecycle method: first establish a connection to the Driver
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  driver = context.actorSelection(driverUrl)
  // The Executor sends a message to the Driver to register itself
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
```
This method establishes the connection to the Driver and sends it a RegisterExecutor message to register the Executor. So the next thing to look at is the DriverActor's handling code:
```scala
// Message sent by the Executor to the DriverActor
case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
  Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
  if (executorDataMap.contains(executorId)) {
    // The DriverActor tells the Executor that registration failed
    sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
  } else {
    logInfo("Registered executor: " + sender + " with ID " + executorId)
    // The DriverActor tells the Executor that registration succeeded
    sender ! RegisteredExecutor
    addressToExecutorId(sender.path.address) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val (host, _) = Utils.parseHostPort(hostPort)
    val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Check whether there are tasks to submit (DriverActor -> Executor)
    makeOffers()
  }
```
After checking that the executor ID is not a duplicate, the DriverActor replies with RegisteredExecutor to tell the Executor that registration succeeded, records its ExecutorData, and finally calls makeOffers() to check whether any tasks are waiting to be submitted. Let's first look at how the Executor handles the RegisteredExecutor reply, and then come back to makeOffers(). The code for the RegisteredExecutor case is as follows:
```scala
// Message from the Driver telling the Executor that registration succeeded
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  // Parse the hostname out of this backend's hostPort
  val (hostname, _) = Utils.parseHostPort(hostPort)
  // Create the Executor object that will execute the actual business logic
  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
```
The main work here is creating the Executor object, which executes the actual business logic. So what does Executor's constructor do internally?
```scala
// Create the thread pool that runs tasks
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
// Communicate with the Driver: send heartbeats as a liveness report
startDriverHeartbeater()
```
In the Executor constructor, the main work is creating an elastic cached thread pool (implemented on top of Java's Executors) and calling startDriverHeartbeater(), which sends periodic heartbeats to the Driver as a liveness report. A sketch of the thread-pool helper follows.
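As a minimal sketch of what that helper boils down to (the real Utils.newDaemonCachedThreadPool builds its ThreadFactory with Guava's ThreadFactoryBuilder, but the effect is the same): a cached thread pool whose threads are named daemon threads.

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

// Minimal sketch, assuming behavioural equivalence with Spark's helper
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      // Name the threads "<prefix>-<n>" and mark them as daemons so they
      // never prevent the executor JVM from exiting
      val t = new Thread(r, prefix + "-" + counter.incrementAndGet())
      t.setDaemon(true)
      t
    }
  }
  // A cached pool grows and shrinks with demand; idle threads are
  // reclaimed after 60 seconds
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
```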
Finally we get to makeOffers(), which mainly runs CoarseGrainedSchedulerBackend's launchTasks() method:
```scala
// Call launchTasks to submit tasks to the Executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map {
    case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
```
This method checks whether there are tasks to submit (DriverActor -> Executor): it offers the free cores of every registered executor to the TaskScheduler via resourceOffers(), and passes the resulting task assignments to launchTasks().
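For completeness, here is a simplified sketch of the launchTasks() that makeOffers() feeds (the real 1.3-era method also checks each serialized task against the Akka frame size before sending; `ser` is the driver-side closure serializer held by CoarseGrainedSchedulerBackend):

```scala
// Simplified sketch: each scheduled TaskDescription is serialized and sent
// to the actor of the executor it was assigned to.
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    val executorData = executorDataMap(task.executorId)
    // Account for the cores this task will occupy on its executor
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // DriverActor -> Executor: deliver the serialized task for execution
    executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
  }
}
```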
Mind map of the Executor startup flow (Baidu 腦圖; original image not reproduced here).
## Summary
To summarize the creation of the Executor process:
1. The Worker creates the Executor process; the class that actually implements that process is CoarseGrainedExecutorBackend.
2. Once CoarseGrainedExecutorBackend has successfully registered with the DriverActor, it creates an Executor object, which internally holds an elastic cached thread pool.
3. makeOffers() is executed to check whether there are tasks that need to be submitted.