Spark in Action (4): Master Principles and Source Code Analysis
Principles of the Active/Standby Failover Mechanism
Two Masters can be configured: Spark's native standalone mode supports active/standby Master failover.
Master failover can be based on one of two mechanisms: the file system or ZooKeeper. With file-system-based recovery, you must switch to the Standby Master manually after the Active Master dies; with ZooKeeper-based recovery, the switch to a new Master happens automatically.
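For reference, the recovery mode is selected through `spark.deploy.*` properties on the Master daemon, typically via `SPARK_DAEMON_JAVA_OPTS` in `conf/spark-env.sh`. The snippet below is only an example sketch: the ZooKeeper addresses and the recovery directory are placeholder values, not settings from this article.

```bash
# ZooKeeper-based HA: automatic failover between Masters.
# zk1:2181,zk2:2181,zk3:2181 and /spark are placeholder values.
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

# File-system-based recovery: state is persisted, but failover is manual.
# /var/spark/recovery is a placeholder directory.
# export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM \
#   -Dspark.deploy.recoveryDirectory=/var/spark/recovery"
```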
The failover mechanism boils down to what the Standby Master does after the Active Master goes down and the switchover happens:
- The Standby Master uses the persistence engine (FileSystemPersistenceEngine or ZooKeeperPersistenceEngine) to read the persisted storedApps, storedDrivers, and storedWorkers.
- It checks whether any of storedApps, storedDrivers, or storedWorkers is non-empty; only then does recovery need to continue.
- It re-registers the persisted Application, Driver, and Worker information into the Master's internal in-memory cache structures.
- It sets the state of every Application and Worker to UNKNOWN, then sends the Standby Master's address to the Driver of each Application and to each Worker.
- In principle, any Driver or Worker that is still running normally will respond to the new Master after receiving its address.
- As the responses from the Drivers and Workers arrive, the Master uses its completeRecovery() method to handle the Drivers and Workers that did not respond, filtering out their information.
- Finally, the Master calls its schedule() method to schedule the Drivers and Applications waiting for resources, e.g. launching a Driver on some Worker, or launching the Executors an Application needs on the Workers.
Source Code Analysis of the Failover Mechanism
org/apache/spark/deploy/master/Master.scala, the completeRecovery method.
```scala
/**
 * Complete the Master failover, i.e. finish the Master's recovery.
 */
def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  synchronized {
    if (state != RecoveryState.RECOVERING) { return }
    state = RecoveryState.COMPLETING_RECOVERY
  }

  // Filter out the Applications and Workers whose state is still UNKNOWN, then iterate
  // over them and call removeWorker / finishApplication to clean up the Workers and
  // Applications that may have failed or even died.
  // The cleanup does three things:
  // 1. remove them from the in-memory cache structures;
  // 2. remove them from the in-memory caches of the related components;
  // 3. remove them from the persistent store.
  // Kill off any workers and apps that didn't respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
```
Principles of the Registration Mechanism
Worker registration (a simplified sketch follows this list):

- After a Worker starts, it actively registers itself with the Master.
- The Master filters the request: Workers in DEAD state are rejected; for a Worker in UNKNOWN state, the stale Worker information is cleaned up and replaced with the new Worker information.
- The Worker is added to the in-memory cache (a HashMap).
- The persistence engine persists the Worker information (to the file system or ZooKeeper).
- schedule() is called.
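The listing below is a minimal, self-contained sketch of the steps just described, not Spark's actual RegisterWorker handler; all names in it (MasterSketch, WorkerRef, SketchWorkerState) are simplified stand-ins introduced for illustration.

```scala
import scala.collection.mutable

// Simplified stand-ins for the Master's internal structures (not Spark classes).
object SketchWorkerState extends Enumeration { val ALIVE, DEAD, UNKNOWN = Value }
case class WorkerRef(id: String, host: String, port: Int, var state: SketchWorkerState.Value)

class MasterSketch {
  // In-memory caches, analogous to the Master's HashMaps.
  private val idToWorker = mutable.HashMap[String, WorkerRef]()
  private val addressToWorker = mutable.HashMap[(String, Int), WorkerRef]()

  /** Returns true if the worker ends up registered. */
  def registerWorker(worker: WorkerRef): Boolean = {
    // 1. A worker already recorded as DEAD under this id is rejected outright.
    if (idToWorker.get(worker.id).exists(_.state == SketchWorkerState.DEAD)) {
      false
    } else {
      // 2. An UNKNOWN worker at the same address (typically left over from recovery)
      //    is replaced: drop the stale entry before registering the new one.
      addressToWorker.get((worker.host, worker.port)).foreach { old =>
        if (old.state == SketchWorkerState.UNKNOWN) {
          idToWorker -= old.id
          addressToWorker -= ((old.host, old.port))
        }
      }
      // 3. Put the new worker into the in-memory caches (HashMaps).
      worker.state = SketchWorkerState.ALIVE
      idToWorker(worker.id) = worker
      addressToWorker((worker.host, worker.port)) = worker
      // 4. In the real Master this is where the worker would be persisted
      //    (persistenceEngine.addWorker) and schedule() would be called.
      true
    }
  }
}
```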
Driver registration (the same cache / waiting-queue / persist / schedule pattern; see the sketch after this list):

- When a Spark application is submitted with spark-submit, the Driver is registered first.
- The Driver information is put into the in-memory cache (a HashMap).
- The Driver is added to the waiting-for-scheduling queue (an ArrayBuffer).
- The persistence engine persists the Driver information.
- schedule() is called to schedule it.
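A tiny, purely illustrative sketch of that pattern follows; DriverRef and DriverRegistrySketch are made-up names, not Spark's API.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Master's driver bookkeeping.
case class DriverRef(id: String, mem: Int, cores: Int)

class DriverRegistrySketch {
  private val drivers = mutable.HashSet[DriverRef]()            // in-memory cache
  private val waitingDrivers = mutable.ArrayBuffer[DriverRef]() // drivers awaiting scheduling

  def registerDriver(driver: DriverRef): Unit = {
    drivers += driver        // 1. put the driver information into the in-memory cache
    waitingDrivers += driver // 2. enqueue the driver for scheduling
    // 3. the real Master would now persist it (persistenceEngine.addDriver)
    // 4. and call schedule() to try to place it on a worker
  }
}
```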
Application registration:

- Once the Driver is up and runs our application code, SparkContext initialization takes place; under the hood, SparkDeploySchedulerBackend sends a RegisterApplication message to the Master through the ClientActor thread inside its AppClient, registering the Application.
- The Application information is put into the in-memory cache (a HashMap).
- The Application is added to the queue of Applications waiting to be scheduled (an ArrayBuffer).
- The persistence engine persists the Application information.
- schedule() is called to perform resource scheduling.
Source Code Analysis of Application Registration
org/apache/spark/deploy/master/Master.scala, the handler for the RegisterApplication case class.
```scala
/**
 * Handle an Application registration request.
 */
case RegisterApplication(description) => {
  // If this Master's state is STANDBY, i.e. it is a Standby Master rather than the
  // Active Master, it does nothing with the registration request.
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Create an ApplicationInfo from the ApplicationDescription.
    val app = createApplication(description, sender)
    // Register the Application: put the ApplicationInfo into the cache and add the
    // Application to the waitingApps queue.
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the Application with the persistence engine.
    persistenceEngine.addApplication(app)
    // The AppClient created by SparkDeploySchedulerBackend sent the registration request
    // to the Master actor through its ClientActor thread and passed a reference to itself
    // along as `sender`, so the Master knows who sent this RegisterApplication message.
    // The Master replies in the other direction, sending a RegisteredApplication message
    // to the ClientActor of SparkDeploySchedulerBackend's AppClient.
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}

def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.path.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  applicationMetricsSystem.registerSource(app.appSource)
  // Add the app's information to the in-memory caches.
  apps += app
  idToApp(app.id) = app
  actorToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Add the app to waitingApps, the queue of apps waiting to be scheduled.
  waitingApps += app
}
```
Source Code Analysis of Master State-Change Handling
Driver state changes: package org.apache.spark.deploy.master, DriverStateChanged.
```scala
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the Driver's state is ERROR, FINISHED, KILLED or FAILED, remove the Driver.
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}

def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
  // Use Scala's find() higher-order function to locate the driver with this driverId.
  drivers.find(d => d.id == driverId) match {
    // If it is found (Some, the Option case class):
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory cache.
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to completedDrivers.
      completedDrivers += driver
      // Remove the driver's persisted information with the persistence engine.
      persistenceEngine.removeDriver(driver)
      // Set the driver's final state and exception.
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the worker it was running on.
      driver.worker.foreach(w => w.removeDriver(driver))
      // As usual, call schedule() again.
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
Executor state changes: package org.apache.spark.deploy.master, ExecutorStateChanged.
```scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Find the app this executor belongs to, then look the executor up in the app's
  // internal executors cache.
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // If the executor was found:
    case Some(exec) => {
      // Update the executor's current state.
      val appInfo = idToApp(appId)
      exec.state = state
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      // Send an ExecutorUpdated message to the driver.
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
      // If the executor has finished:
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // Remove the executor from the app's cache.
        appInfo.removeExecutor(exec)
        // Remove the executor from the cache of the worker it was running on.
        exec.worker.removeExecutor(exec)
        // Check whether the executor exited normally (exit status 0).
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // If the application's retry count has not yet reached the maximum
          // (ApplicationState.MAX_NUM_RETRY, 10), reschedule.
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            // Otherwise remove the application: if its executors repeatedly fail,
            // the application itself is considered failed.
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
```
Principles and Source Code Analysis of the Master Resource Scheduling Algorithm
org/apache/spark/deploy/master/Master.scala, the schedule method.
```scala
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule() {
  // If the Master's state is not ALIVE, return right away: a standby Master never
  // schedules applications or any other resources.
  if (state != RecoveryState.ALIVE) { return }

  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Random.shuffle simply shuffles the elements of the given collection: take all the
  // workers that registered earlier, keep only those whose state is ALIVE, and shuffle them.
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of alive workers.
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0

  // Schedule drivers first. When is a driver registered with the Master and scheduled at all?
  // Only when the application is submitted in cluster deploy mode against a standalone
  // cluster (spark-submit --deploy-mode cluster). In client mode the driver starts locally
  // and never registers with the Master, so there is nothing for the Master to schedule
  // (applications running on YARN do not go through the standalone Master at all).
  // Driver scheduling: iterate over the waitingDrivers ArrayBuffer.
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // Keep looping while there are still alive workers we have not visited
    // and the current driver has not been launched yet.
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Take one alive worker.
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If this worker's free memory is at least what the driver needs,
      // and its free CPU cores are at least what the driver needs:
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // launch the driver on this worker,
        launchDriver(worker, driver)
        // remove it from waitingDrivers so it is not scheduled again, and mark it launched.
        waitingDrivers -= driver
        launched = true
      }
      // Move the pointer to the next worker. The driver keeps cycling over the alive workers;
      // once launched is true, it has been started on some worker and the loop ends.
      curPos = (curPos + 1) % numWorkersAlive
    }
  }

  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.

  // Application scheduling (the core of the core).
  // There are two scheduling algorithms: spreadOutApps and non-spreadOutApps.
  // val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores
    // Iterate over the ApplicationInfos in waitingApps, keeping only apps that still need cores.
    for (app <- waitingApps if app.coresLeft > 0) {
      // From the workers, keep those whose state is ALIVE, then those usable by this app
      // (canUse: worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)),
      // and sort them by free cores in descending order.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      // An array recording how many cores to assign to each usable worker.
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // Cores to assign: the minimum of the app's remaining cores and the total free cores
      // across the usable workers.
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      // This algorithm spreads each application's executors evenly across the workers.
      // For example, with 20 cores to assign over 10 workers, the loop passes over the
      // workers twice, giving each worker 1 core per pass, so each worker ends up with 2.
      var pos = 0
      while (toAssign > 0) {
        // If this worker still has free cores beyond what has already been assigned to it:
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          // decide to place one more core on this worker.
          toAssign -= 1
          assigned(pos) += 1
        }
        // Move to the next worker.
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      for (pos <- 0 until numUsable) {
        // Only workers that were assigned at least one core get an executor.
        if (assigned(pos) > 0) {
          // Add an executor to the application's internal cache, creating an ExecutorDesc
          // that records how many cores this executor gets.
          // This is the executor-launch mechanism as of Spark 1.3.0: spark-submit lets you
          // request a number of executors and cores/memory per executor, but because this
          // algorithm assigns cores on a total-core basis, the actual number of executors
          // and cores per executor may differ from the configuration. For example, if you
          // ask for 3 executors with 3 cores each (9 cores total) and there are 9 workers
          // with 1 free core each, every worker gets 1 core and launches 1 executor, so you
          // end up with 9 executors of 1 core each.
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // Launch the executor on that worker,
          launchExecutor(usableWorkers(pos), exec)
          // and mark the application as RUNNING.
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores
    // The non-spreadOutApps algorithm: exactly the opposite of spreadOutApps.
    // Each application is packed onto as few workers as possible. For example, with 10
    // workers of 10 cores each and an app that needs 20 cores in total, only two workers
    // are used, each fully occupied with 10 cores; the remaining apps can only use the
    // next workers. So if spark-submit asked for 10 executors with 2 cores each (20 cores
    // total), this algorithm actually launches only 2 executors with 10 cores each.
    // Iterate over the workers that are ALIVE and still have free cores.
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      // Iterate over the applications that still need cores.
      for (app <- waitingApps if app.coresLeft > 0) {
        // If this worker can be used by the application:
        if (canUse(app, worker)) {
          // take the minimum of the worker's free cores and the app's remaining cores.
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            // Add an executor to the app,
            val exec = app.addExecutor(worker, coresToUse)
            // launch it on the worker,
            launchExecutor(worker, exec)
            // and mark the application as RUNNING.
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
```
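To make the difference between the two strategies concrete, here is a small, self-contained simulation of just the core-assignment loops above. It is not Spark code; the function names and the worker/core numbers are made up for illustration.

```scala
object CoreAssignmentDemo {
  // Spread-out strategy: hand out cores one at a time, round-robin over the usable workers.
  def spreadOut(coresLeft: Int, workerFreeCores: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](workerFreeCores.length)
    var toAssign = math.min(coresLeft, workerFreeCores.sum)
    var pos = 0
    while (toAssign > 0) {
      if (workerFreeCores(pos) - assigned(pos) > 0) {
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % workerFreeCores.length
    }
    assigned
  }

  // Pack strategy: fill each worker with as many of the app's cores as it can take.
  def pack(coresLeft: Int, workerFreeCores: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](workerFreeCores.length)
    var remaining = coresLeft
    for (pos <- workerFreeCores.indices if remaining > 0) {
      val coresToUse = math.min(workerFreeCores(pos), remaining)
      assigned(pos) = coresToUse
      remaining -= coresToUse
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val freeCores = Array(10, 10, 10, 10)           // four workers with 10 free cores each
    println(spreadOut(20, freeCores).mkString(",")) // 5,5,5,5   -> one executor per worker
    println(pack(20, freeCores).mkString(","))      // 10,10,0,0 -> only two workers used
  }
}
```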
The launchDriver method:
```scala
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's internal cache, and add the driver's required memory
  // and CPU cores to the memory and cores the worker has in use.
  worker.addDriver(driver)
  // Also record the worker in the driver's own cache.
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the worker's actor so the Worker starts the Driver.
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  // Mark the driver as RUNNING.
  driver.state = DriverState.RUNNING
}
```
The launchExecutor method:
```scala
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Add the executor to the worker's internal cache.
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker's actor.
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // Send an ExecutorAdded message to the driver of the executor's application.
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
```
This article was first published on Steem. Thanks for reading; please credit the source when reposting.
https://steemit.com/@padluo
WeChat public account "padluo", sharing the self-cultivation of data scientists: since we have met, let's grow together.
Telegram group for readers:
https://t.me/sspadluo
知識星球 (Knowledge Planet) discussion group.