標籤:

spark源碼分析--任務執行

前面我們提到了spark任務提交時操作流程,本次想給大家講講spark接受任務請求後,怎麼運行?

在driver運行時,會做這麼幾件事兒:

  1. 分析提交上來得RDD的前後左右之間的依賴關係,就像一個人參加工作,用人單位需要對你進行背景調查,查一查你是不是渣男、在一線城市又沒喲房產,曾今禍害多少良家婦女等等,Spark系統也是一樣,先要了解RDD的七大姑八大姨的情況;
  2. 根據RDD的依賴關係,會生成RDD DAG,這就好比把你的家庭關係和背景都了解清楚;
  3. 依據生成的DAG關係分割,分割的目的是將要處理的RDD,按照stage的先後順尋進行處理;
  4. 將確認好的stage,生成相對應的task,然後這些task任務會被分發給所有的work節點進行數據處理;

寬依賴和窄依賴

在spark計算RDD的過程中,就會存在當前RDD的計算需要依賴前一個或多個RDD的計算支持,這樣的話,我們就會把這個階段稱為RDD的寬依賴和窄依賴。

寬依賴:

簡單說就是:子RDD需要依賴全部的父RDD

窄依賴:

簡單說:子RDD只需要部分父RDD

依賴性分析(handleJobSubmitted、submitStage)

handleJobSubmitted和submitStage主要負責依賴性分析的。

只不過handleJobSubmited 會生成一個finalStage,通過finalStage生成ActiveJob。

private[spark] class ActiveJob(nval jobId: Int,n val finalStage: Stage,n val callSite: CallSite,n val listener: JobListener,n val properties: Properties) {nn/**n * Number of partitions we need to compute for this job. Note that result stages may not needn * to compute all partitions in their target RDD, for actions like first() and lookup().n */n val numPartitions = finalStage match {ncase r: ResultStage => r.partitions.lengthncase m: ShuffleMapStage => m.rdd.partitions.lengthn }nn/** Which partitions of the stage have finished */n val finished = Array.fill[Boolean](numPartitions)(false)nnvar numFinished = 0n}n

submitStage則是查看系統運行過程中是不是所有stage是否都已經完成,如果還沒有完成的stage,則需要繼續完成所以來的stage,如果完成了則提交自身的stage。

Akka

spark是通過Akka進行消息傳遞,而且是共享內存。具體內容大家可以看看Akka的通信原理

任務的創建和分發

當在driver階段我們把所有的stage對應的task分配好,接下來就是把task分發給不同的worker節點去執行。

我們知道hadoop在執行任務時會經過兩個階段:

map階段和reduce階段,同樣的道理在spark運算的過程中也會經歷這兩個階段。

當Task進入到executor中後,task的執行分為shuffeMapTask階段和ResultTask階段。

在shuffleMapTask階段,在這個階段主要是負責創建Task,任務調度管理器就會通過

makeoffers去申請resourceOffers資源,資源的分配都是有resourceOffers來完成的。

private[spark] trait SchedulerBackend {nprivate val appId = "spark-application-" + System.currentTimeMillisnndef start(): Unitndef stop(): Unitndef reviveOffers(): Unitndef defaultParallelism(): Intnndef killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =nthrow new UnsupportedOperationExceptionndef isReady(): Boolean = truenn /**n * Get an application ID associated with the job.n *n * @return An application IDn */n def applicationId(): String = appIdnn/**n * Get the attempt ID for this run, if the cluster manager supports multiplen * attempts. Applications run in client mode will not have attempt IDs.n *n * @return The application attempt id, if available.n */n def applicationAttemptId(): Option[String] = Nonenn/**n * Get the URLs for the driver logs. These URLs are used to display the links in the UIn * Executors tab for the driver.n * @return Map containing the log names and their respective URLsn */n def getDriverLogUrls: Option[Map[String, String]] = Nonen

而真正的任務執行則是通過SchedulerBackend來完成的,他會對map端輸出的消息進行序列

化,目的是便於在IO中進行傳輸,其加密的對象是TaskDescription。

Excutor接受序列化的數據,調用LaunchTask對消息進行反序列化。

// Launch tasks returned by a set of resource offersnprivate def launchTasks(tasks: Seq[Seq[TaskDescription]]) {nfor (task <- tasks.flatten) {nval serializedTask = ser.serialize(task)nif (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {n scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>ntry {nvar msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +n"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +n"spark.akka.frameSize or using broadcast variables for large values."n msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,n AkkaUtils.reservedSizeBytes)n taskSetMgr.abort(msg)n } catch {ncase e: Exception => logError("Exception in error callback", e)n }n }n }nelse {nval executorData = executorDataMap(task.executorId)n executorData.freeCores -= scheduler.CPUS_PER_TASKn executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))n }n }n}n

接收到所執行的消息信息後,需要從數據存儲介質上獲取文件或數據,這樣就需要載入hdfs、本地文件系統的數據。

使用updateDependencies去

private void updateDependencies() {nif (dependencies != null) {ndependencies.clear();n final int n = properties.length;n for (int i = 0; i < n; i++) {nif (properties[i] == null) {nbreak;n }ndependencies.add(properties[i]);n }n }n}n

同時創建了一個臨時文件存放計算過程中的中間數據:

/**n * Creates a new directory in the specified directory, using the givenn * prefix to generate its name. The resulting {@code Path} is associatedn * with the same {@code FileSystem} as the given directory.n *n * <p> The details as to how the name of the directory is constructed isn * implementation dependent and therefore not specified. Where possiblen * the {@code prefix} is used to construct candidate names.n *n * <p> As with the {@code createTempFile} methods, this method is onlyn * part of a temporary-file facility. A {@link Runtime#addShutdownHookn * shutdown-hook}, or the {@link java.io.File#deleteOnExit} mechanism may ben * used to delete the directory automatically.n *n * <p> The {@code attrs} parameter is optional {@link FileAttributen * file-attributes} to set atomically when creating the directory. Eachn * attribute is identified by its {@link FileAttribute#name name}. If moren * than one attribute of the same name is included in the array then all butn * the last occurrence is ignored.n *n * @param dirn * the path to directory in which to create the directoryn * @param prefixn * the prefix string to be used in generating the directorys name;n * may be {@code null}n * @param attrsn * an optional list of file attributes to set atomically whenn * creating the directoryn *n * @return the path to the newly created directory that did not exist beforen * this method was invokedn *n * @throws IllegalArgumentExceptionn * if the prefix cannot be used to generate a candidate directory namen * @throws UnsupportedOperationExceptionn * if the array contains an attribute that cannot be set atomicallyn * when creating the directoryn * @throws IOExceptionn * if an I/O error occurs or {@code dir} does not existn * @throws SecurityExceptionn * In the case of the default provider, and a security manager isn * installed, the {@link SecurityManager#checkWrite(String) checkWrite}n * method is invoked to check write access when creating then * directory.n */npublic static Path createTempDirectory(Path dir,n String prefix,n FileAttribute<?>... attrs)nthrows IOExceptionn{nreturn TempFileHelper.createTempDirectory(Objects.requireNonNull(dir),n prefix, attrs);n}n

這樣的話,數據之前的準備工作已經完成。

下面就是map端和reduce端的計算任務了:

shuffleMapTask.runTask()方法:

override def runTask(context: TaskContext): MapStatus = {n// Deserialize the RDD using the broadcast variable.n val deserializeStartTime = System.currentTimeMillis()nval ser = SparkEnv.get.closureSerializer.newInstance()nval (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](n ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)n _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTimenn metrics = Some(context.taskMetrics)nvar writer: ShuffleWriter[Any, Any] = nulln try {nval manager = SparkEnv.get.shuffleManagern writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)n writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])n writer.stop(success = true).getn } catch {ncase e: Exception =>ntry {nif (writer != null) {n writer.stop(success = false)n }n } catch {ncase e: Exception =>n log.debug("Could not stop writer", e)n }nthrow en }n}n

主要工作是將數據從文件系統裡面載入並轉換成另外的RDD形勢存儲在內存中,以供Reduce

階段的程序調用。

reduceTask階段:

(1)執行計算任務(任務狀態、中間結果和Metric data)

最後階段

就是將reduce計算完成的數據會寫hdfs或本地文件系統。


推薦閱讀:

Scala 在大數據處理方面有何優勢?
一般而言常見的Spark的性能瓶頸有哪些?
[譯]打造大數據產品:Shiny的Spark之旅

TAG:Spark |