深度剖析Spark分散式執行原理

原創文章,未經授權,不得轉載

讓代碼分散式運行是所有分散式計算框架需要解決的最基本的問題。

Spark是大數據領域中相當火熱的計算框架,在大數據分析領域有一統江湖的趨勢,網上對於Spark源碼分析的文章有很多,但是介紹Spark如何處理代碼分散式執行問題的資料少之又少,這也是我撰寫文本的目的。

Spark運行在JVM之上,任務的執行依賴序列化及類載入機制,因此本文會重點圍繞這兩個主題介紹Spark對代碼分散式執行的處理。本文假設讀者對Spark、Java、Scala有一定的了解,代碼示例基於Scala,Spark源碼基於2.1.0版本。閱讀本文你可以了解到:

  • Java對象序列化機制
  • 類載入器的作用
  • Spark對closure序列化的處理
  • Spark Application的class是如何載入的
  • Spark REPL(spark-shell)中的代碼是如何分散式執行的

根據以上內容,讀者可以基於JVM相關的語言構建一個自己的分散式計算服務框架。

Java對象序列化

序列化(Serialization)是將對象的狀態信息轉換為可以存儲或傳輸的形式的過程。所謂的狀態信息指的是對象在內存中的數據,Java中一般指對象的欄位數據。我們開發Java應用的時候或多或少都處理過對象序列化,對象常見的序列化形式有JSON、XML等。

JDK中內置一個ObjectOutputStream類可以將對象序列化為二進位數據,使用ObjectOutputStream序列化對象時,要求對象所屬的類必須實現java.io.Serializable介面,否則會報java.io.NotSerializableException的異常。

基本的概念先介紹到這。接下來我們一起探討一個問題:Java的方法能否被序列化?

假設我們有如下的SimpleTask類(Java類):

import java.io.Serializable;nnpublic abstract class Task implements Serializable {n public void run() {n System.out.println("run task!");n }n}nnpublic class SimpleTask extends Task {n @Overriden public void run() {n System.out.println("run simple task!");n }n}n

還有一個用於將對象序列化到文件的工具類FileSerializer:

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}nnobject FileSerializer {nn def writeObjectToFile(obj: Object, file: String) = {n val fileStream = new FileOutputStream(file)n val oos = new ObjectOutputStream(fileStream)n oos.writeObject(obj)n oos.close()n }nn def readObjectFromFile(file: String): Object = {n val fileStream = new FileInputStream(file)n val ois = new ObjectInputStream(fileStream)n val obj = ois.readObject()n ois.close()n objn }n}n

簡單起見,我們採用將對象序列化到文件,然後通過反序列化執行的方式來模擬代碼的分散式執行。SimpleTask就是我們需要模擬分散式執行的代碼。我們先將SimpleTask序列化到文件中:

val task = new SimpleTask()nFileSerializer.writeObjectToFile(task, "task.ser")n

然後將SimpleTask類從我們的代碼中刪除,此時只有task.ser文件中含有task對象的序列化數據。接下來我們執行下面的代碼:

val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task]ntask.run()n

請各位讀者思考,上面的代碼執行後會出現什麼樣的結果?

  • 輸出:run simple task! ?
  • 輸出:run task! ?
  • 還是會報錯?

實際執行會出現形如下面的異常:

Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.SimpleTaskn at java.net.URLClassLoader.findClass(URLClassLoader.java:381)n at java.lang.ClassLoader.loadClass(ClassLoader.java:424)n at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)n at java.lang.ClassLoader.loadClass(ClassLoader.java:357)n at java.lang.Class.forName0(Native Method)n at java.lang.Class.forName(Class.java:348)n at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)n at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)n at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)n at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)n at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)n at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)n at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)n

從異常信息來看,反序列過程中找不到SimpleTask類。由此可以推斷序列化後的數據是不包含類的定義信息的。那麼,ObjectOutputStream到底序列化了哪些信息呢?

對ObjectOutputStream實現機制感興趣的同學可以去看下JDK中這個類的實現,ObjectOutputStream序列化對象時,從父類的數據開始序列化到子類,如果override了writeObject方法,會反射調用writeObject來序列化數據。序列化的數據會按照以下的順序以二進位的形式輸出到OutputStream中:

  1. 類的descriptor(僅僅是類的描述信息,不包含類的定義)
  2. 對象的primitive類型數據(int,boolean等,String和Array是特殊處理的)
  3. 對象的其他obj數據

回到我們的問題上:Java的方法能否被序列化?通過我們代碼示例及分析,想必大家對這個問題應該清楚了。通過ObjectOutputStream序列化對象,僅包含類的描述(而非定義),對象的狀態數據,由於缺少類的定義,也就是缺少SimpleTask的位元組碼,反序列化過程中就會出現ClassNotFound的異常。

如何讓我們反序列化的對象能正常使用呢?我們還需要了解類載入器。

類載入器:ClassLoader

ClassLoader在Java中是一個抽象類,ClassLoader的作用是載入類,給定一個類名,ClassLoader會嘗試查找或生成類的定義,一種典型的載入策略是將類名對應到文件名上,然後從文件系統中載入class file。

在我們的示例中,反序列化SimpleTask失敗,是因為JVM找不到類的定義,因此要確保正常反序列化,我們必須將SimpleTask的class文件保存下來,反序列化的時候能夠讓ClassLoader載入到SimpleTask的class。

接下來,我們對代碼做一些改造,添加一個ClassManipulator類,用於將對象的class文件導出到當前目錄的文件中,默認的文件名就是對象的類名(不含包名):

object ClassManipulator {n def saveClassFile(obj: AnyRef): Unit = {n val classLoader = obj.getClass.getClassLoadern val className = obj.getClass.getNamen val classFile = className.replace(., /) + ".class"n val stream = classLoader.getResourceAsStream(classFile)nn // just use the class simple name as the file namen val outputFile = className.split(.).last + ".class"n val fileStream = new FileOutputStream(outputFile)n var data = stream.read()n while (data != -1) {n fileStream.write(data)n data = stream.read()n }n fileStream.flush()n fileStream.close()n }n}n

按照JVM的規範,假設對package.Simple這樣的一個類編譯,編譯後的class文件為package/Simple.class,因此我們可以根據路徑規則,從當前JVM進程的Resource中得到指定類的class數據。

在刪除SimpleTask前,我們除了將task序列化到文件外,還需要將task的class文件保存起來,執行完下面的代碼,SimpleTask類就可以從代碼中剔除了:

val task = new SimpleTask()nFileSerializer.writeObjectToFile(task, "task.ser")nClassManipulator.saveClassFile(task)n

由於我們保存class文件的方式比較特殊,既不在jar包中,也不是按package/ClassName.class這種標準的保存方式,因此還需要實現一個自定義的FileClassLoader按照我們保存class文件的方式來載入所需的類:

class FileClassLoader() extends ClassLoader {n override def findClass(fullClassName: String): Class[_] = {n val file = fullClassName.split(.).last + ".class"n val in = new FileInputStream(file)n val bos = new ByteArrayOutputStreamn val bytes = new Array[Byte](4096)n var done = falsen while (!done) {n val num = in.read(bytes)n if (num >= 0) {n bos.write(bytes, 0, num)n } else {n done = truen }n }n val data = bos.toByteArrayn defineClass(fullClassName, data, 0, data.length)n }n}n

ObjectInputStream類用於對象的反序列化,在反序列化過程中,它根據序列化數據中類的descriptor信息,調用resolveClass方法載入對應的類,但是通過Class.forName載入class使用的並不是我們自定義的FileClassLoader,所以如果直接使用ObjectInputStream進行反序列,依然會因為找不到類而報錯,下面是resolveClass的源碼:

protected Class<?> resolveClass(ObjectStreamClass desc)n throws IOException, ClassNotFoundExceptionn{n String name = desc.getName();n try {n return Class.forName(name, false, latestUserDefinedLoader());n } catch (ClassNotFoundException ex) {n Class<?> cl = primClasses.get(name);n if (cl != null) {n return cl;n } else {n throw ex;n }n }n}n

為了能讓ObjectInputStream在序列化的過程中使用我們自定義的ClassLoader,我們還需要對FileSerializer中的readObjectFromFile方法做些改造,修改的代碼如下:

def readObjectFromFile(file: String, classLoader: ClassLoader): Object = {n val fileStream = new FileInputStream(file)n val ois = new ObjectInputStream(fileStream) {n override def resolveClass(desc: ObjectStreamClass): Class[_] =n Class.forName(desc.getName, false, classLoader)n }n val obj = ois.readObject()n ois.close()n objn}n

最後,我們將反序列化的代碼調整為:

val fileClassLoader = new FileClassLoader()nval task = FileSerializer.readObjectFromFile("task.ser", fileClassLoader).asInstanceOf[Task]ntask.run()n

反序列化的過程中能夠通過fileClassLoader載入到所需的類,這樣我們在執行就不會出錯了,最終的執行結果為:run simple task!。到此為止,我們已經完整地模擬了代碼分散式執行的過程。完整的示例代碼,請參閱:github.com/stanzhai/jvm

Spark對closure序列化的處理

我們依然通過一個示例,快速了解下Scala對閉包的處理,下面是從Scala的REPL中執行的代碼:

scala> val n = 2nn: Int = 2nnscala> val f = (x: Int) => x * nnf: Int => Int = <function1>nnscala> Seq.range(0, 5).map(f)nres0: Seq[Int] = List(0, 2, 4, 6, 8)n

f是採用Scala的=>語法糖定義的一個閉包,為了弄清楚Scala是如何處理閉包的,我們繼續執行下面的代碼:

scala> f.getClassnres0: Class[_ <: Int => Int] = class $anonfun$1nnscala> f.isInstanceOf[Function1[Int, Int]]nres1: Boolean = truennscala> f.isInstanceOf[Serializable]nres2: Boolean = truen

可以看出f對應的類為$anonfun$1是Function1[Int, Int]的子類,而且實現了Serializable介面,這說明f是可以被序列化的。

Spark對於數據的處理基本都是基於閉包,下面是一個簡單的Spark分散式處理數據的代碼片段:

val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()nval sc = spark.sparkContextnval data = Array(1, 2, 3, 4, 5)nval distData = sc.parallelize(data)nval sum = distData.map(x => x * 2).sum()nprintln(sum) // 30.0n

對於distData.map(x => x * 2),map中傳的一個匿名函數,也是一個非常簡單的閉包,對distData中的每個元素*2,我們知道對於這種形式的閉包,Scala編譯後是可以序列化的,所以我們的代碼能正常執行也合情合理。將入我們將處理函數的閉包定義到一個類中,然後將代碼改造為如下形式:

class Operation {n val n = 2n def multiply = (x: Int) => x * nn}n...nval sum = distData.map(new Operation().multiply).sum()n...n

我們在去執行,會出現什麼樣的結果呢?實際執行會出現這樣的異常:

Exception in thread "main" org.apache.spark.SparkException: Task not serializablen at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)n ...nCaused by: java.io.NotSerializableException: Operationn

Scala在構造閉包的時候會確定他所依賴的外部變數,並將它們的引用存到閉包對象中,這樣能保證在不同的作用域中調用閉包不出現問題。

出現Task not serializable的異常,是由於我們的multiply函數依賴Operation類的變數n,雖然multiply是支持序列化的,但是Operation不支持序列化,這導致multiply函數在序列化的過程中出現了NotSerializable的異常,最終導致我們的Task序列化失敗。為了確保multiply能被正常序列化,我們需要想辦法去除對Operation的依賴,我們將代碼做如下修改,在去執行就可以了:

class Operation {n def multiply = (x: Int) => x * 2n}n...nval sum = distData.map(new Operation().multiply).sum()n...n

Spark對閉包序列化前,會通過工具類org.apache.spark.util.ClosureCleaner嘗試clean掉閉包中無關的外部對象引用,ClosureCleaner對閉包的處理是在運行期間,相比Scala編譯器,能更精準的去除閉包中無關的引用。這樣做,一方面可以儘可能保證閉包可被序列化,另一方面可以減少閉包序列化後的大小,便於網路傳輸。

我們在開發Spark應用的時候,如果遇到Task not serializable的異常,就需要考慮下,閉包中是否或引用了無法序列化的對象,有的話,嘗試去除依賴就可以了。

Spark中實現的序列化工具有多個:

從SparkEnv類的實現來看,用於閉包序列化的是JavaSerializer:

JavaSerializer內部使用的是ObjectOutputStream將閉包序列化:

private[spark] class JavaSerializationStream(n out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)n extends SerializationStream {n private val objOut = new ObjectOutputStream(out)n ...n}n

將閉包反序列化的核心代碼為:

private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)n extends DeserializationStream {nn private val objIn = new ObjectInputStream(in) {n override def resolveClass(desc: ObjectStreamClass): Class[_] =n try {n Class.forName(desc.getName, false, loader)n } catch {n case e: ClassNotFoundException =>n JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e)n }n }n ...n}n

關於ObjectInputStream我們前面已有介紹,JavaDeserializationStream有個關鍵的成員變數loader,它是個ClassLoader,可以讓Spark使用非默認的ClassLoader按照自定義的載入策略去載入class,這樣才能保證反序列化過程在其他節點正常進行。

通過前面的介紹,想要代碼在另一端執行,只有序列化還不行,還需要保證執行端能夠載入到閉包對應的類。接下來我們探討Spark載入class的機制。

Spark Application的class是如何載入的

通常情況下我們會將開發的Spark Application打包為jar包,然後通過spark-submit命令提交到集群運行,下面是一個官網的示例:

./bin/spark-submit n --class org.apache.spark.examples.SparkPi n ... n --jars /path/to/dep-libs.jar n /path/to/examples.jar n

此時,我們編寫的代碼中所包含的閉包,對應的類已經被編譯到jar包中了,所以Executor端只要能載入到這個jar包,從jar包中定位閉包的class文件,就可以將閉包反序列化了。事實上Spark也是這麼做的。

Spark Application的Driver端在運行的時候會基於netty建立一個文件服務,我們運行的jar包,及--jars中指定的依賴jar包,會被添加到文件伺服器中。這個過程在SparkContext的addJar方法中完成:

/**n * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.n * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supportedn * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.n */ndef addJar(path: String) {n if (path == null) {n logWarning("null specified as parameter to addJar")n } else {n var key = ""n if (path.contains("")) {n // For local paths with backslashes on Windows, URI throws an exceptionn key = env.rpcEnv.fileServer.addJar(new File(path))n } else {n val uri = new URI(path)n // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependenciesn Utils.validateURL(uri)n key = uri.getScheme match {n // A JAR file which exists only on the driver noden case null | "file" =>n try {n env.rpcEnv.fileServer.addJar(new File(uri.getPath))n } catch {n case exc: FileNotFoundException =>n logError(s"Jar not found at $path")n nulln }n // A JAR file which exists locally on every worker noden case "local" =>n "file:" + uri.getPathn case _ =>n pathn }n }n if (key != null) {n val timestamp = System.currentTimeMillisn if (addedJars.putIfAbsent(key, timestamp).isEmpty) {n logInfo(s"Added JAR $path at $key with timestamp $timestamp")n postEnvironmentUpdate()n }n }n }n}n

Executor端在執行任務的時候,會從任務信息中得到依賴的jar包,然後updateDependencies從Driver端的文件伺服器下載缺失的jar包,並將jar包添加到URLClassLoader中,最後再將task反序列化,反序列化前所需的jar都已準備好,因此能夠將task中的閉包正常反序列化,核心代碼如下:

override def run(): Unit = {n ...nn try {n val (taskFiles, taskJars, taskProps, taskBytes) =n Task.deserializeWithDependencies(serializedTask)nn // Must be set before updateDependencies() is called, in case fetching dependenciesn // requires access to properties contained within (e.g. for access control).n Executor.taskDeserializationProps.set(taskProps)nn updateDependencies(taskFiles, taskJars)n task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)n ...n } finally {n runningTasks.remove(taskId)n }n}n

這麼來看,整個Spark Application分散式載入class的機制就比較清晰了。Executor端能夠正常載入class,反序列化閉包,分散式執行代碼自然就不存在什麼問題了。

Spark REPL(spark-shell)中的代碼是如何分散式執行的

spark-shell是Spark為我們提供的一個REPL的工具,可以讓我們非常方便的寫一些簡單的數據處理腳本。下面是一個運行在spark-shell的代碼:

scala> val f = (x: Int) => x + 1nf: Int => Int = <function1>nnscala> val data = Array(1, 2, 3, 4, 5)ndata: Array[Int] = Array(1, 2, 3, 4, 5)nnscala> val distData = sc.parallelize(data)ndistData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26nnscala> distData.map(f).sum()nres0: Double = 20.0n

我們已知,閉包f會被Scala編譯為匿名類,如果要將f序列化到Executor端執行,必須要載入f對應的匿名類的class數據,才能正常反序列化。

Spark是如何得到f的class數據的?Executor又是如何載入到的?

源碼面前,了無秘密。我們看一下Spark的repl項目的代碼入口,核心代碼如下:

object Main extends Logging {n ...n val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))n val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")nn def main(args: Array[String]) {n doMain(args, new SparkILoop)n }nn // Visible for testingn private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {n interp = _interpn val jars = Utils.getUserJars(conf, isShell = true).mkString(File.pathSeparator)n val interpArguments = List(n "-Yrepl-class-based",n "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",n "-classpath", jarsn ) ++ args.toListnn val settings = new GenericRunnerSettings(scalaOptionError)n settings.processArguments(interpArguments, true)nn if (!hasErrors) {n interp.process(settings) // Repl starts and goes in loop of R.E.P.Ln Option(sparkContext).map(_.stop)n }n }n ...n}n

Spark2.1.0的REPL基於Scala-2.11的scala.tools.nsc編譯工具實現,代碼已經相當簡潔,Spark給interp設置了2個關鍵的配置-Yrepl-class-based和-Yrepl-outdir,通過這兩個配置,我們在shell中輸入的代碼會被編譯為class文件輸出到執行的文件夾中。如果指定了spark.repl.classdir配置,會用這個配置的路徑作為class文件的輸出路徑,否則使用SPARK_LOCAL_DIRS對應的路徑。下面是我測試過程中輸出到文件夾中的class文件:

我們已經清楚Spark如何將shell中的代碼編譯為class了,那麼Executor端,如何載入到這些class文件呢?在org/apache/spark/executor/Executor.scala中有段和REPL相關的代碼:

private val urlClassLoader = createClassLoader()nprivate val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)nn/**n * If the REPL is in use, add another ClassLoader that will readn * new classes defined by the REPL as the user types coden */nprivate def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {n val classUri = conf.get("spark.repl.class.uri", null)n if (classUri != null) {n logInfo("Using REPL class URI: " + classUri)n try {n val _userClassPathFirst: java.lang.Boolean = userClassPathFirstn val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader")n .asInstanceOf[Class[_ <: ClassLoader]]n val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv],n classOf[String], classOf[ClassLoader], classOf[Boolean])n constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst)n } catch {n case _: ClassNotFoundException =>n logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")n System.exit(1)n nulln }n } else {n parentn }n}nnoverride def run(): Unit = {n ...n Thread.currentThread.setContextClassLoader(replClassLoader)n val ser = env.closureSerializer.newInstance()n ...n}n

Executor啟動時會判斷是否為REPL模式,如果是的話會使用ExecutorClassLoader做為反序列閉包時所使用的ClassLoader,ExecutorClassLoader會通過網路從Driver端(也就是執行spark-shell的節點)載入所需的class文件。這樣我們在spark-shell中寫的代碼就可以分散式執行了。

總結

Spark實現代碼的分散式執行有2個關鍵點:

  1. 對象必須可序列化
  2. Executor端能夠載入到所需類的class文件,保證反序列化過程不出錯,這點通過自定義的ClassLoader來保障

滿足以上2個條件,我們的代碼就可以分散式運行了。

當然,構建一個完整的分散式計算框架,還需要有網路通信框架、RPC、文件傳輸服務等作為支撐,在了解Spark代碼分散式執行原理的基礎上,相信讀者已有思路基於JVM相關的語言構建分散式計算服務。

類比其他非JVM相關的語言,實現一個分散式計算框架,依然是需要解決序列化,動態載入執行代碼的問題。

推薦閱讀:

一般而言常見的Spark的性能瓶頸有哪些?
Scala快速入門系列:聲明變數、控制結構與函數、常用數組操作
Spark 2017歐洲技術峰會摘要(Spark 生態體系分類)
如何利用spark快速計算笛卡爾積?

TAG:Spark | 大数据 | 分布式计算 |