Spark RDD 論文簡析
來自專欄獃獃的分散式系統雜談4 人贊了文章
遙想我第一次參加實習的時候,我接手的第一個項目便是 Spark 插件的開發。當時為了做好這個工作,自己看了 Spark RDD 和 SparkSQL 的論文,還在閱讀 Spark 源碼的同時寫了好多 Spark 源碼分析的文章。過去了那麼久,現在便趁著學習 MIT 6.824 的機會,再來整理一下 Spark RDD 論文的內容吧。
本文由我按照 MIT 6.824 的課程安排閱讀 Spark RDD 的論文以及相關課程資料並總結而來,內容會更偏向於從科研的角度介紹 Spark RDD 誕生時所需要解決的問題以及對其基本工作方式的簡單介紹。
你也可以前往我的博客閱讀本文,以獲得更好的閱讀體驗。
背景
在 Apache Spark 廣泛使用以前,業界主要使用 Hadoop MapReduce 來對大數據進行分散式處理。誠然 Hadoop MapReduce 為企業及組織利用大量普通消費級機器組建集群進行數據處理成為可能,但隨著需求的不斷擴展,Hadoop MapReduce 也存在著這樣那樣的局限:
- MapReduce 編程模型的表達能力有限,僅靠 MapReduce 難以實現部分演算法
- 對分散式內存資源的使用方式有限,使得其難以滿足最近大量出現的需要復用中間結果的計算流程,包括:
- 如迭代式機器學習演算法及圖演算法
- 互動式數據挖掘
Spark RDD 作為一個分散式內存資源抽象便致力於解決 Hadoop MapReduce 的上述問題:
- 通過對分散式集群的內存資源進行抽象,允許程序高效復用已有的中間結果
- 提供比 MapReduce 更靈活的編程模型,兼容更多的高級演算法
接下來我們便詳細說說 Spark RDD 是如何達成上述目標的。
RDD:分散式內存資源抽象
RDD(Resilient Distributed Dataset,彈性分散式數據集)本質上是一種只讀、分片的記錄集合,只能由所支持的數據源或是由其他 RDD 經過一定的轉換(Transformation)來產生。通過由用戶構建 RDD 間組成的產生關係圖,每個 RDD 都能記錄到自己是如何由還位於持久化存儲中的源數據計算得出的,即其血統(Lineage)。
相比於 RDD 只能通過粗粒度的「轉換」來創建(或是說寫入數據),分散式共享內存(Distributed Shared Memory,DSM)是另一種分散式系統常用的分散式內存抽象模型:應用在使用分散式共享內存時可以在一個全局可見的地址空間中進行隨機的讀寫操作。類似的系統包括了一些常見的分散式內存資料庫(如 Redis、Memcached)。RDD 產生的方式限制了其只適用於那些只會進行批量數據寫入的應用程序,但卻使得 RDD 可以使用更為高效的高可用機制。
除了 Transformation 以外,Spark 還為 RDD 提供了 Action,可對 RDD 進行計算操作並把一個結果值返回給客戶端,或是將 RDD 里的數據寫出到外部存儲。
Transformation 與 Action 的區別還在於,對 RDD 進行 Transformation 並不會觸發計算:Transformation 方法所產生的 RDD 對象只會記錄住該 RDD 所依賴的 RDD 以及計算產生該 RDD 的數據的方式;只有在用戶進行 Action 操作時,Spark 才會調度 RDD 計算任務,依次為各個 RDD 計算數據。
RDD 具體實現與計算調度
前面我們提到,RDD 在物理形式上是分片的,其完整數據被分散在集群內若干機器的內存上。當用戶通過 Transformation 創建出新的 RDD 後,新的 RDD 與原本的 RDD 便形成了依賴關係。根據用戶所選 Transformation 操作的不同,RDD 間的依賴關係可以被分為兩種:
- 窄依賴(Narrow Dependency):父 RDD 的每個分片至多被子 RDD 中的一個分片所依賴
- 寬依賴(Wide Dependency):父 RDD 中的分片可能被子 RDD 中的多個分片所依賴
通過將窄依賴從寬依賴中區分出來,Spark 便可以針對 RDD 窄依賴進行一定的優化。首先,窄依賴使得位於該依賴鏈上的 RDD 計算操作可以被安排到同一個集群節點上流水線進行;其次,在節點失效需要恢復 RDD 時,Spark 只需要恢復父 RDD 中的對應分片即可,恢復父分片時還能將不同父分片的恢復任務調度到不同的節點上並發進行。
總的來說,一個 RDD 由以下幾部分組成:
- 其分片集合
- 其父 RDD 集合
- 計算產生該 RDD 的方式
- 描述該 RDD 所包含數據的模式、分片方式、存儲位置偏好等信息的元數據
在用戶調用 Action 方法觸發 RDD 計算時,Spark 會按照定義好的 RDD 依賴關係繪製出完整的 RDD 血統圖,並根據圖中各節點間依賴關係的不同對計算過程進行切分:
簡單來說,Spark 會把儘可能多的可以流水線執行的窄依賴 Transformation 放到同一個 Job Stage 中,而 Job Stage 之間則要求集群對數據進行 Shuffle。Job Stage 劃分完畢後,Spark 便會為每個 Partition 生成計算任務(Task)並調度到集群節點上運行。
在調度 Task 時,Spark 也會考慮計算該 Partition 所需的數據的位置:例如,如果 RDD 是從 HDFS 中讀出數據,那麼 Partition 的計算就會儘可能被分配到持有對應 HDFS Block 的節點上;或者,如果 Spark 已經將父 RDD 持有在內存中,子 Partition 的計算也會被儘可能分配到持有對應父 Partition 的節點上。對於不同 Job Stage 之間的 Data Shuffle,目前 Spark 採取與 MapReduce 相同的策略,會把中間結果持久化到節點的本地存儲中,以簡化失效恢復的過程。
當 Task 所在的節點失效時,只要該 Task 所屬 Job Stage 的父 Job Stage 數據仍可用,Spark 只要將該 Task 調度到另一個節點上重新運行即可。如果父 Job Stage 的數據也已經不可用了,那麼 Spark 就會重新提交一個計算父 Job Stage 數據的 Task,以完成恢復。有趣的是,從論文來看,Spark 當時還沒有考慮調度模塊本身的高可用,不過調度模塊持有的狀態只有 RDD 的血統圖和 Task 分配情況,通過狀態備份的方式實現高可用也是十分直觀的。
結語
總的來說,Spark RDD 的亮點在於如下兩點:
- 確定且基於血統圖的數據恢復重計算過程
- 面向記錄集合的轉換 API
比起類似於分散式內存資料庫的那種分散式共享內存模型,Spark RDD 巧妙地利用了其不可變和血統記錄的特性實現了對分散式內存資源的抽象,很好地支持了批處理程序的使用場景,同時大大簡化了節點失效後的數據恢復過程。
同時,我們也應該意識到,Spark 是對 MapReduce 的一種補充而不是替代:將那些能夠已有的能夠很好契合 MapReduce 模型的計算作業遷移到 Spark 上不會收穫太多的好處(例如普通的 ETL 作業)。除外,RDD 本身在 Spark 生態中也漸漸變得落伍,Spark 已經逐漸轉向使用從 SparkSQL 開始引入的 DataFrame 模型了。後續有時間的話我也許也會再總結一下 SparkSQL 的論文。
不管怎麼說,Spark RDD 依然是通過很簡單的方式解決了大數據計算領域中的一大痛點,閱讀其論文也是一次相當不錯的 Case Study。
推薦閱讀:
※Spark源碼剖析(五):Task提交流程
※SparkR專欄[5]—機器學習
※第十二章:HDFS客戶端操作
※Spark實戰(4)_Master原理剖析與源碼分析
※第四章:克隆虛擬機