Spark Source Code Analysis (5): Task Submission Flow


# Task Submission Flow

After the stages have been divided, the tasks of each stage are wrapped into a TaskSet and submitted to the TaskScheduler.
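This hand-off can be pictured with the self-contained sketch below. The Task, TaskSet, and TaskScheduler here are simplified stand-ins for illustration, not Spark's real classes:

```scala
// Illustrative stand-ins (NOT Spark's classes) for the stage -> TaskSet -> TaskScheduler hand-off
final case class Task(stageId: Int, partition: Int)
final case class TaskSet(tasks: Array[Task], stageId: Int)

trait TaskScheduler {
  def submitTasks(taskSet: TaskSet): Unit
}

object HandOffSketch extends App {
  // A toy scheduler that just acknowledges the TaskSet it receives
  val scheduler: TaskScheduler = new TaskScheduler {
    def submitTasks(taskSet: TaskSet): Unit =
      println(s"Received ${taskSet.tasks.length} tasks for stage ${taskSet.stageId}")
  }

  // One Task per partition of the stage, wrapped into a TaskSet and submitted
  val tasks = Array.tabulate(4)(p => Task(stageId = 0, partition = p))
  scheduler.submitTasks(TaskSet(tasks, stageId = 0))
}
```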

Annotated Spark Source Code

For this entire source-code analysis series, I have put the annotated Spark source and the accompanying analysis notes on my GitHub: Spark源碼剖析.

Source-Code Walkthrough of the Submission Flow

Submitting the TaskSet

Looking at TaskSchedulerImpl around line 160, you can see the submitTasks() method; its main code is as follows:

```scala
// TODO: This method submits a TaskSet
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  // TODO: reviveOffers() on CoarseGrainedSchedulerBackend
  backend.reviveOffers()
}
```
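The starvation timer in the code above is worth a closer look: it repeatedly warns until a task has actually launched, then cancels itself. Below is a self-contained sketch of that pattern; the interval is shortened so the demo finishes quickly (Spark 1.x reads it from spark.starvation.timeout, 15 s by default, an assumption worth verifying for your version):

```scala
import java.util.{Timer, TimerTask}

object StarvationTimerSketch extends App {
  @volatile var hasLaunchedTask = false

  // Spark's STARVATION_TIMEOUT defaults to 15 s (spark.starvation.timeout);
  // shortened here so the demo finishes quickly.
  val STARVATION_TIMEOUT = 1000L

  val starvationTimer = new Timer("starvation-timer", true)  // daemon thread, as in Spark
  starvationTimer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = {
      if (!hasLaunchedTask)
        println("Initial job has not accepted any resources; check your cluster UI")
      else
        this.cancel()  // stop checking once a task has launched
    }
  }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)

  Thread.sleep(1500)    // the first tick fires the warning
  hasLaunchedTask = true
  Thread.sleep(1500)    // the next tick cancels the timer task
}
```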

The key call here is reviveOffers() on the CoarseGrainedSchedulerBackend.

reviveOffers() in CoarseGrainedSchedulerBackend

This simply sends a ReviveOffers message to the DriverActor:

```scala
// TODO: Send a message to the DriverActor
override def reviveOffers() {
  driverActor ! ReviveOffers
}
```

receiveWithLogging() of the DriverActor in CoarseGrainedSchedulerBackend

The receiveWithLogging() method of the DriverActor pattern-matches on the incoming message:

```scala
// TODO: Pattern match; call makeOffers() to submit tasks to the Executors
case ReviveOffers =>
  makeOffers()
```
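Spark 1.x built this messaging on Akka actors. The following self-contained sketch shows the same fire-and-forget tell plus pattern-matching receive; it needs akka-actor 2.4+ on the classpath, and the class and object names are stand-ins, not Spark's:

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Stand-in message, mirroring Spark's ReviveOffers case object
case object ReviveOffers

// Stand-in for the DriverActor: receive pattern-matches on the message type
class DriverActorSketch extends Actor {
  def receive = {
    case ReviveOffers => println("makeOffers() would run here")
  }
}

object TellReceiveDemo extends App {
  val system = ActorSystem("sketch")
  val driverActor = system.actorOf(Props[DriverActorSketch], "driver")
  driverActor ! ReviveOffers  // fire-and-forget, like reviveOffers() above
  Thread.sleep(500)           // give the actor time to process before shutdown
  system.terminate()
}
```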

The makeOffers() method then submits the tasks to the Executors.

The Executor Runs the Task

The main code of makeOffers() is as follows:

```scala
// TODO: Call launchTasks to submit tasks to the Executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
```
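A simplified, self-contained model of this offer-building step is shown below; the ExecutorData and WorkerOffer case classes are stand-ins for Spark's:

```scala
// Stand-ins for Spark's ExecutorData and WorkerOffer
final case class ExecutorData(executorHost: String, freeCores: Int)
final case class WorkerOffer(executorId: String, host: String, cores: Int)

object MakeOffersSketch extends App {
  // What the driver knows about its registered executors
  val executorDataMap = Map(
    "exec-1" -> ExecutorData("host-a", 4),
    "exec-2" -> ExecutorData("host-b", 2))

  // Turn the map into resource offers, as makeOffers() does above
  val offers = executorDataMap.map { case (id, data) =>
    WorkerOffer(id, data.executorHost, data.freeCores)
  }.toSeq

  // The scheduler would match pending tasks against these offers
  offers.foreach(println)
}
```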

launchTasks() is then called; the main flow of its code is:

```scala
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  // TODO: Tasks are sent to the Executors one at a time
  for (task <- tasks.flatten) {
    // TODO: First obtain the serializer
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // TODO: Serialize the task for network transfer
    val serializedTask = ser.serialize(task)
    // TODO: Check the serialized size against the Akka frame size
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      // TODO: executorDataMap is a HashMap
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // TODO: Send the serialized task to the Executor
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}
```

In short, this iterates over the TaskSet, takes the tasks out one by one, serializes each of them, and sends the serialized bytes to the corresponding Executor.
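To make the size guard concrete, here is a self-contained sketch of the same check. The two limits mirror spark.akka.frameSize (10 MB by default in Spark 1.x) and AkkaUtils.reservedSizeBytes (200 KB); treat both values as assumptions for illustration:

```scala
object FrameSizeGuardSketch extends App {
  val akkaFrameSize = 10 * 1024 * 1024  // assumed: spark.akka.frameSize default of 10 MB
  val reservedSizeBytes = 200 * 1024    // assumed: AkkaUtils.reservedSizeBytes (200 KB)

  // Mirrors the guard in launchTasks(): oversized tasks abort the TaskSet,
  // everything else is sent to the executor in a LaunchTask message.
  def dispatch(serializedTask: Array[Byte]): Unit =
    if (serializedTask.length >= akkaFrameSize - reservedSizeBytes)
      println(s"abort TaskSet: ${serializedTask.length} bytes exceeds one Akka frame")
    else
      println(s"send LaunchTask (${serializedTask.length} bytes) to the executor")

  dispatch(new Array[Byte](4 * 1024))          // small task: sent
  dispatch(new Array[Byte](11 * 1024 * 1024))  // oversized task: aborted
}
```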

The Executor Executes the Task

In CoarseGrainedExecutorBackend, the pattern match handles the LaunchTask message that the DriverActor sends to the Executor:

```scala
// TODO: Message the DriverActor sends to the Executor
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    // TODO: Obtain the serializer
    val ser = env.closureSerializer.newInstance()
    // TODO: Deserialize the task description
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // TODO: Hand the deserialized task to the thread pool for execution
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
```

The work here is to obtain the serializer, deserialize the task description, and hand the deserialized task to the thread pool for execution.
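Conceptually, the serialize-on-the-driver / deserialize-on-the-executor round trip looks like the sketch below. Spark's closureSerializer is the Java serializer by default, so plain java.io serialization illustrates the same idea; TaskDescriptionSketch is a stand-in for Spark's TaskDescription:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Stand-in for Spark's TaskDescription (case classes are serializable by default)
final case class TaskDescriptionSketch(taskId: Long, name: String)

object RoundTripSketch extends App {
  // Driver side: serialize the task description into a buffer
  def serialize(t: TaskDescriptionSketch): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(t)
    oos.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  // Executor side: deserialize the buffer back into a task description
  def deserialize(buf: ByteBuffer): TaskDescriptionSketch = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(buf.array()))
    ois.readObject().asInstanceOf[TaskDescriptionSketch]
  }

  val taskDesc = deserialize(serialize(TaskDescriptionSketch(42L, "task 42")))
  println("Got assigned task " + taskDesc.taskId)
}
```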

Below is the Executor's launchTask() method. Its main logic is to wrap the task information in a TaskRunner object and then execute it on a thread pool.

```scala
// TODO: The Executor executes the task
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  // TODO: Create a TaskRunner object that wraps the task information
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // TODO: runningTasks is a ConcurrentHashMap, which guarantees thread safety
  runningTasks.put(taskId, tr)
  // TODO: Execute on the thread pool
  threadPool.execute(tr)
}
```
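The pattern itself, registering a Runnable in a concurrent map and handing it to a thread pool, can be run standalone. The names below mirror the Executor code, but this is an illustrative sketch rather than Spark's implementation (Spark's pool is a daemon cached thread pool):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

object TaskRunnerSketch extends App {
  // Track in-flight tasks; ConcurrentHashMap makes this thread-safe
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()
  private val threadPool = Executors.newCachedThreadPool()

  // Stand-in for Spark's TaskRunner: wraps task info in a Runnable
  class TaskRunner(val taskId: Long, val taskName: String) extends Runnable {
    override def run(): Unit = {
      println(s"Running $taskName (id=$taskId) on ${Thread.currentThread().getName}")
      runningTasks.remove(taskId)  // Spark removes the entry when the task finishes
    }
  }

  def launchTask(taskId: Long, taskName: String): Unit = {
    val tr = new TaskRunner(taskId, taskName)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  (1L to 3L).foreach(id => launchTask(id, s"task-$id"))
  threadPool.shutdown()
}
```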

Summary

1. Submitting tasks means iterating over the TaskSet, taking the tasks out one by one, serializing each of them, and sending the serialized tasks to the Executors.

2. To execute a task, the Executor creates a TaskRunner object that wraps the task information and runs it on a thread pool.
