深度剖析 Spark 分散式執行原理
讓代碼分散式運行是所有分散式計算框架需要解決的最基本的問題。
Spark 是大數據領域中相當火熱的計算框架,在大數據分析領域有一統江湖的趨勢,網上對於 Spark 源碼分析的文章有很多,但是介紹 Spark 如何處理代碼分散式執行問題的資料少之又少,這也是我撰寫文本的目的。
轉載請註明 AIQ - 最專業的機器學習大數據社區 http://www.itzyshare.com
Spark 運行在 JVM 之上,任務的執行依賴序列化及類載入機制,因此本文會重點圍繞這兩個主題介紹 Spark 對代碼分散式執行的處理。本文假設讀者對 Spark、Java、Scala 有一定的了解,代碼示例基於 Scala,Spark 源碼基於 2.1.0 版本。閱讀本文你可以了解到:
- Java 對象序列化機制
- 類載入器的作用
- Spark 對 closure 序列化的處理
- Spark Application 的 class 是如何載入的
- Spark REPL(spark-shell)中的代碼是如何分散式執行的
根據以上內容,讀者可以基於 JVM 相關的語言構建一個自己的分散式計算服務框架。
Java 對象序列化
序列化 (Serialization) 是將對象的狀態信息轉換為可以存儲或傳輸的形式的過程。所謂的狀態信息指的是對象在內存中的數據,Java 中一般指對象的欄位數據。我們開發 Java 應用的時候或多或少都處理過對象序列化,對象常見的序列化形式有 JSON、XML 等。
JDK 中內置一個 ObjectOutputStream 類可以將對象序列化為二進位數據,使用 ObjectOutputStream 序列化對象時,要求對象所屬的類必須實現 java.io.Serializable 介面,否則會報 java.io.NotSerializableException 的異常。
基本的概念先介紹到這。接下來我們一起探討一個問題:Java 的方法能否被序列化?
假設我們有如下的 SimpleTask 類(Java 類):
import java.io.Serializable;public abstract class Task implements Serializable { public void run() { System.out.println("run task!"); }}public class SimpleTask extends Task { @Override public void run() { System.out.println("run simple task!"); }}
還有一個用於將對象序列化到文件的工具類 FileSerializer:
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}object FileSerializer { def writeObjectToFile(obj: Object, file: String) = { val fileStream = new FileOutputStream(file) val oos = new ObjectOutputStream(fileStream) oos.writeObject(obj) oos.close() } def readObjectFromFile(file: String): Object = { val fileStream = new FileInputStream(file) val ois = new ObjectInputStream(fileStream) val obj = ois.readObject() ois.close() obj }}
簡單起見,我們採用將對象序列化到文件,然後通過反序列化執行的方式來模擬代碼的分散式執行。SimpleTask 就是我們需要模擬分散式執行的代碼。
我們先將 SimpleTask 序列化到文件中:
val task = new SimpleTask()FileSerializer.writeObjectToFile(task, "task.ser")
然後將 SimpleTask 類從我們的代碼中刪除,此時只有 task.ser 文件中含有 task 對象的序列化數據。
接下來我們執行下面的代碼:
val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task]task.run()
請各位讀者思考,上面的代碼執行後會出現什麼樣的結果?
- 輸出:run simple task! ?
- 輸出:run task! ?
- 還是會報錯?
實際執行會出現形如下面的異常:
Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.SimpleTask at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)
從異常信息來看,反序列過程中找不到 SimpleTask 類。由此可以推斷序列化後的數據是不包含類的定義信息的。那麼,ObjectOutputStream 到底序列化了哪些信息呢?
對 ObjectOutputStream 實現機制感興趣的同學可以去看下 JDK 中這個類的實現,ObjectOutputStream 序列化對象時,從父類的數據開始序列化到子類,如果 override 了 writeObject 方法,會反射調用 writeObject 來序列化數據。序列化的數據會按照以下的順序以二進位的形式輸出到 OutputStream 中:
類的 descriptor(僅僅是類的描述信息,不包含類的定義)對象的 primitive 類型數據 (int,boolean 等,String 和 Array 是特殊處理的)對象的其他 obj 數據回到我們的問題上:Java 的方法能否被序列化?通過我們代碼示例及分析,想必大家對這個問題應該清楚了。通過 ObjectOutputStream 序列化對象,僅包含類的描述(而非定義),對象的狀態數據,由於缺少類的定義,也就是缺少 SimpleTask 的位元組碼,反序列化過程中就會出現 ClassNotFound 的異常。
如何讓我們反序列化的對象能正常使用呢?我們還需要了解類載入器。
類載入器:ClassLoader
ClassLoader 在 Java 中是一個抽象類,ClassLoader 的作用是載入類,給定一個類名,ClassLoader 會嘗試查找或生成類的定義,一種典型的載入策略是將類名對應到文件名上,然後從文件系統中載入 class file。在我們的示例中,反序列化 SimpleTask 失敗,是因為 JVM 找不到類的定義,因此要確保正常反序列化,我們必須將 SimpleTask 的 class 文件保存下來,反序列化的時候能夠讓 ClassLoader 載入到 SimpleTask 的 class。
接下來,我們對代碼做一些改造,添加一個 ClassManipulator 類,用於將對象的 class 文件導出到當前目錄的文件中,默認的文件名就是對象的類名(不含包名):
object ClassManipulator { def saveClassFile(obj: AnyRef): Unit = { val classLoader = obj.getClass.getClassLoader val className = obj.getClass.getName val classFile = className.replace(., /) + ".class" val stream = classLoader.getResourceAsStream(classFile) // just use the class simple name as the file name val outputFile = className.split(.).last + ".class" val fileStream = new FileOutputStream(outputFile) var data = stream.read() while (data != -1) { fileStream.write(data) data = stream.read() } fileStream.flush() fileStream.close() }}
按照 JVM 的規範,假設對 package.Simple 這樣的一個類編譯,編譯後的 class 文件為 package/Simple.class,因此我們可以根據路徑規則,從當前 JVM 進程的 Resource 中得到指定類的 class 數據。
在刪除 SimpleTask 前,我們除了將 task 序列化到文件外,還需要將 task 的 class 文件保存起來,執行完下面的代碼,SimpleTask 類就可以從代碼中剔除了:
val task = new SimpleTask()FileSerializer.writeObjectToFile(task, "task.ser")ClassManipulator.saveClassFile(task)
由於我們保存 class 文件的方式比較特殊,既不在 jar 包中,也不是按 package/ClassName.class 這種標準的保存方式,因此還需要實現一個自定義的 FileClassLoader 按照我們保存 class 文件的方式來載入所需的類:
class FileClassLoader() extends ClassLoader { override def findClass(fullClassName: String): Class[_] = { val file = fullClassName.split(.).last + ".class" val in = new FileInputStream(file) val bos = new ByteArrayOutputStream val bytes = new Array[Byte](4096) var done = false while (!done) { val num = in.read(bytes) if (num >= 0) { bos.write(bytes, 0, num) } else { done = true } } val data = bos.toByteArray defineClass(fullClassName, data, 0, data.length) }}
ObjectInputStream 類用於對象的反序列化,在反序列化過程中,它根據序列化數據中類的 descriptor 信息,調用 resolveClass 方法載入對應的類,但是通過 Class.forName 載入 class 使用的並不是我們自定義的 FileClassLoader,所以如果直接使用 ObjectInputStream 進行反序列,依然會因為找不到類而報錯,下面是 resolveClass 的源碼:
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException{ String name = desc.getName(); try { return Class.forName(name, false, latestUserDefinedLoader()); } catch (ClassNotFoundException ex) { Class<?> cl = primClasses.get(name); if (cl != null) { return cl; } else { throw ex; } }}
為了能讓 ObjectInputStream 在序列化的過程中使用我們自定義的 ClassLoader,我們還需要對 FileSerializer 中的 readObjectFromFile 方法做些改造,修改的代碼如下:
def readObjectFromFile(file: String, classLoader: ClassLoader): Object = {
val fileStream = new FileInputStream(file) val ois = new ObjectInputStream(fileStream) { override def resolveClass(desc: ObjectStreamClass): Class[_] =Class.forName(desc.getName, false, classLoader)
} val obj = ois.readObject() ois.close() obj}最後,我們將反序列化的代碼調整為:
val fileClassLoader = new FileClassLoader()
val task = FileSerializer.readObjectFromFile("task.ser", fileClassLoader).asInstanceOf[Task]task.run()反序列化的過程中能夠通過 fileClassLoader 載入到所需的類,這樣我們在執行就不會出錯了,最終的執行結果為:run simple task!。到此為止,我們已經完整地模擬了代碼分散式執行的過程。完整的示例代碼,請參閱:點擊這裡
Spark 對 closure 序列化的處理
我們依然通過一個示例,快速了解下 Scala 對閉包的處理,下面是從 Scala 的 REPL 中執行的代碼:
scala> val n = 2
n: Int = 2scala> val f = (x: Int) => x * nf: Int => Int = <function1>scala> Seq.range(0, 5).map(f)res0: Seq[Int] = List(0, 2, 4, 6, 8)f 是採用 Scala 的 => 語法糖定義的一個閉包,為了弄清楚 Scala 是如何處理閉包的,我們繼續執行下面的代碼:
scala> f.getClass
res0: Class[_ <: Int => Int] = class $anonfun$1scala> f.isInstanceOf[Function1[Int, Int]]res1: Boolean = true
scala> f.isInstanceOf[Serializable]res2: Boolean = true可以看出 f 對應的類為 $anonfun$1 是 Function1[Int, Int] 的子類,而且實現了 Serializable 介面,這說明 f 是可以被序列化的。
Spark 對於數據的處理基本都是基於閉包,下面是一個簡單的 Spark 分散式處理數據的代碼片段:
val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()
val sc = spark.sparkContextval data = Array(1, 2, 3, 4, 5)val distData = sc.parallelize(data)val sum = distData.map(x => x * 2).sum()println(sum) // 30.0對於 distData.map(x => x * 2),map 中傳的一個匿名函數,也是一個非常簡單的閉包,對 distData 中的每個元素 *2,我們知道對於這種形式的閉包,Scala 編譯後是可以序列化的,所以我們的代碼能正常執行也合情合理。將入我們將處理函數的閉包定義到一個類中,然後將代碼改造為如下形式:
class Operation {
val n = 2 def multiply = (x: Int) => x * n}...val sum = distData.map(new Operation().multiply).sum()...我們在去執行,會出現什麼樣的結果呢?實際執行會出現這樣的異常:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) ...Caused by: java.io.NotSerializableException: Operation
Scala 在構造閉包的時候會確定他所依賴的外部變數,並將它們的引用存到閉包對象中,這樣能保證在不同的作用域中調用閉包不出現問題。
出現 Task not serializable 的異常,是由於我們的 multiply 函數依賴 Operation 類的變數 n,雖然 multiply 是支持序列化的,但是 Operation 不支持序列化,這導致 multiply 函數在序列化的過程中出現了 NotSerializable 的異常,最終導致我們的 Task 序列化失敗。
為了確保 multiply 能被正常序列化,我們需要想辦法去除對 Operation 的依賴,我們將代碼做如下修改,在去執行就可以了:
class Operation {
def multiply = (x: Int) => x * 2}...val sum = distData.map(new Operation().multiply).sum()...Spark 對閉包序列化前,會通過工具類 org.apache.spark.util.ClosureCleaner 嘗試 clean 掉閉包中無關的外部對象引用,ClosureCleaner 對閉包的處理是在運行期間,相比 Scala 編譯器,能更精準的去除閉包中無關的引用。這樣做,一方面可以儘可能保證閉包可被序列化,另一方面可以減少閉包序列化後的大小,便於網路傳輸。
我們在開發 Spark 應用的時候,如果遇到 Task not serializable 的異常,就需要考慮下,閉包中是否或引用了無法序列化的對象,有的話,嘗試去除依賴就可以了。
Spark 中實現的序列化工具有多個:
從 SparkEnv 類的實現來看,用於閉包序列化的是 JavaSerializer:
JavaSerializer 內部使用的是 ObjectOutputStream 將閉包序列化:
private[spark] class JavaSerializationStream(
out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) extends SerializationStream { private val objOut = new ObjectOutputStream(out) ...}將閉包反序列化的核心代碼為:
private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
extends DeserializationStream { private val objIn = new ObjectInputStream(in) { override def resolveClass(desc: ObjectStreamClass): Class[_] = try { Class.forName(desc.getName, false, loader) } catch { case e: ClassNotFoundException => JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e) } } ...}關於 ObjectInputStream 我們前面已有介紹,JavaDeserializationStream 有個關鍵的成員變數 loader,它是個 ClassLoader,可以讓 Spark 使用非默認的 ClassLoader 按照自定義的載入策略去載入 class,這樣才能保證反序列化過程在其他節點正常進行。
通過前面的介紹,想要代碼在另一端執行,只有序列化還不行,還需要保證執行端能夠載入到閉包對應的類。接下來我們探討 Spark 載入 class 的機制。
Spark Application 的 class 是如何載入的
通常情況下我們會將開發的 Spark Application 打包為 jar 包,然後通過 spark-submit 命令提交到集群運行,下面是一個官網的示例:
./bin/spark-submit
--class org.apache.spark.examples.SparkPi ... --jars /path/to/dep-libs.jar /path/to/examples.jar此時,我們編寫的代碼中所包含的閉包,對應的類已經被編譯到 jar 包中了,所以 Executor 端只要能載入到這個 jar 包,從 jar 包中定位閉包的 class 文件,就可以將閉包反序列化了。事實上 Spark 也是這麼做的。
Spark Application 的 Driver 端在運行的時候會基於 netty 建立一個文件服務,我們運行的 jar 包,及–jars 中指定的依賴 jar 包,會被添加到文件伺服器中。這個過程在 SparkContext 的 addJar 方法中完成:
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */def addJar(path: String) { if (path == null) { logWarning("null specified as parameter to addJar") } else { var key = "" if (path.contains("")) { // For local paths with backslashes on Windows, URI throws an exception key = env.rpcEnv.fileServer.addJar(new File(path)) } else { val uri = new URI(path) // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies Utils.validateURL(uri) key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => try { env.rpcEnv.fileServer.addJar(new File(uri.getPath)) } catch { case exc: FileNotFoundException => logError(s"Jar not found at $path") null } // A JAR file which exists locally on every worker node case "local" => "file:" + uri.getPath case _ => path } } if (key != null) { val timestamp = System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() } } }}Executor 端在執行任務的時候,會從任務信息中得到依賴的 jar 包,然後 updateDependencies 從 Driver 端的文件伺服器下載缺失的 jar 包,並將 jar 包添加到 URLClassLoader 中,最後再將 task 反序列化,反序列化前所需的 jar 都已準備好,因此能夠將 task 中的閉包正常反序列化,核心代碼如下:
override def run(): Unit = {
... try { val (taskFiles, taskJars, taskProps, taskBytes) = Task.deserializeWithDependencies(serializedTask) // Must be set before updateDependencies() is called, in case fetching dependencies // requires access to properties contained within (e.g. for access control). Executor.taskDeserializationProps.set(taskProps) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) ... } finally { runningTasks.remove(taskId) }}這麼來看,整個 Spark Application 分散式載入 class 的機制就比較清晰了。Executor 端能夠正常載入 class,反序列化閉包,分散式執行代碼自然就不存在什麼問題了。
Spark REPL(spark-shell)中的代碼是如何分散式執行的
spark-shell 是 Spark 為我們提供的一個 REPL 的工具,可以讓我們非常方便的寫一些簡單的數據處理腳本。下面是一個運行在 spark-shell 的代碼:scala> val f = (x: Int) => x + 1
f: Int => Int = <function1>scala> val data = Array(1, 2, 3, 4, 5)data: Array[Int] = Array(1, 2, 3, 4, 5)scala> val distData = sc.parallelize(data)distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26scala> distData.map(f).sum()res0: Double = 20.0我們已知,閉包 f 會被 Scala 編譯為匿名類,如果要將 f 序列化到 Executor 端執行,必須要載入 f 對應的匿名類的 class 數據,才能正常反序列化。
Spark 是如何得到 f 的 class 數據的?Executor 又是如何載入到的?
源碼面前,了無秘密。我們看一下 Spark 的 repl 項目的代碼入口,核心代碼如下:
object Main extends Logging {
... val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") def main(args: Array[String]) { doMain(args, new SparkILoop) } // Visible for testing private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { interp = _interp val jars = Utils.getUserJars(conf, isShell = true).mkString(File.pathSeparator) val interpArguments = List( "-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", jars ) ++ args.toList val settings = new GenericRunnerSettings(scalaOptionError) settings.processArguments(interpArguments, true) if (!hasErrors) { interp.process(settings) // Repl starts and goes in loop of R.E.P.L Option(sparkContext).map(_.stop) } } ...}Spark2.1.0 的 REPL 基於 Scala-2.11 的 scala.tools.nsc 編譯工具實現,代碼已經相當簡潔,Spark 給 interp 設置了 2 個關鍵的配置 -Yrepl-class-based 和 -Yrepl-outdir,通過這兩個配置,我們在 shell 中輸入的代碼會被編譯為 class 文件輸出到執行的文件夾中。
如果指定了 spark.repl.classdir 配置,會用這個配置的路徑作為 class 文件的輸出路徑,否則使用 SPARK_LOCAL_DIRS 對應的路徑。下面是我測試過程中輸出到文件夾中的 class 文件:
我們已經清楚 Spark 如何將 shell 中的代碼編譯為 class 了,那麼 Executor 端,如何載入到這些 class 文件呢?在 org/apache/spark/executor/Executor.scala 中有段和 REPL 相關的代碼:
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)/** * If the REPL is in use, add another ClassLoader that will read * new classes defined by the REPL as the user types code */private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { val classUri = conf.get("spark.repl.class.uri", null) if (classUri != null) { logInfo("Using REPL class URI: " + classUri) try { val _userClassPathFirst: java.lang.Boolean = userClassPathFirst val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader") .asInstanceOf[Class[_ <: ClassLoader]] val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv], classOf[String], classOf[ClassLoader], classOf[Boolean]) constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst) } catch { case _: ClassNotFoundException => logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") System.exit(1) null } } else { parent }}override def run(): Unit = { ... Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() ...}Executor 啟動時會判斷是否為 REPL 模式,如果是的話會使用 ExecutorClassLoader 做為反序列閉包時所使用的 ClassLoader,ExecutorClassLoader 會通過網路從 Driver 端(也就是執行 spark-shell 的節點)載入所需的 class 文件。這樣我們在 spark-shell 中寫的代碼就可以分散式執行了。
總結 Spark 實現代碼的分散式執行有 2 個關鍵點: 1.對象必須可序列化
2.Executor端能夠載入到所需類的class文件,保證反序列化過程不出錯,這點通過自定義的ClassLoader來保障
滿足以上 2 個條件,我們的代碼就可以分散式運行了。
當然,構建一個完整的分散式計算框架,還需要有網路通信框架、RPC、文件傳輸服務等作為支撐,在了解 Spark 代碼分散式執行原理的基礎上,相信讀者已有思路基於 JVM 相關的語言構建分散式計算服務。
類比其他非 JVM 相關的語言,實現一個分散式計算框架,依然是需要解決序列化,動態載入執行代碼的問題。
(完)
轉載請註明 AIQ - 最專業的機器學習大數據社區 http://www.itzyshare.com
推薦閱讀:
※國內哪些互聯網公司在用mesos,哪些互聯網公司在用yarn,兩者前景如何?
※spark中的RDD叫做彈性分布數據集,如何理解彈性兩個字?
※Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優、王家林、2018清華大學出版
※第十二章:HDFS客戶端操作
※Spark比Hadoop的優勢有這麼大嗎?