深入淺出Spark(二) 什麼是RDD

本系列講座是沁原對Sameer Farooqui的《Advanced Apache Spark》的解說。

完整視頻:bittiger.io/videos/NBAT

原始視頻參考youtube.com/watch?

(二)什麼是RDD?

1. RDD的官方定義

RDD是Spark中的數據抽象,意思是彈性分散式數據集。在邏輯上是一個數據集,在物理上則可以分塊分布在不同的機器上並發運行。RDD的數據具有不可變性(immutable)

圖.1. 一個邏輯RDD在物理上分塊存儲在不同的伺服器。

如圖所示,一個RDD數據集被分成了五塊,運行在了三個worker伺服器上。第一台上運行了兩個RDD數據塊,第二台上運行兩個RDD資料庫塊,第三台上運行剩下的一個數據塊。

2. RDD的生命周期

在Spark程序中,首先要讀取或創建RDD, 然後對數據進行一系列的變換操作(Transform),保存中間結果(Cache),最後對變換結果進行處理(Action)

2.1 RDD的產生可以通過對內存中的數據並行化,或直接讀取分散式資料庫(S3, HDFS, Cassandra 等等)而來。

圖.2. 通過parallelize介面,將內存數據變成RDD。(圖中sc指的是spark context實例。)

圖.3. 直接讀取文件生成RDD.

2.2. RDD支持數據變換介面,如常用的filter, map等等,在變換的過程中,RDD的數據並不立即發生實際變化(Lazily transform),而是保存了數據的依賴關係,直到要求RDD進行動作(Action)時。RDD會從全局的角度來優化Transform的運行過程。從而節省時間。

2.3 RDD的cache操作將數據的中間結果保存在內存中,方便下次使用。

2.4 RDD的Action操作將數據的運算結果進行統計和返回。常見的如count 和 collect.

圖.4. RDD操作實例

舉個例子。如圖4所示。從日誌(Log)資料庫中讀取的文件生成logLinesRDD, 形成了四個物理分塊。通過filter變換提取出日誌中的錯誤信息, 形成errorsRDD。 通過合併coalesce形成兩個塊。進一步過濾提取只包含錯誤1的日誌errorMsg1RDD。最後進行collect 動作, 將結果合併返回到Driver。中間結果,我們使用了count動作來返回一共有多少條錯誤日誌 。用saveToCassandra將錯誤日誌保存到Cassandra資料庫中。圖中綠色的箭頭表示Action。紅色箭頭表示Transformation。

3. 根據數據源,RDD可以分成許多類,比如從Jdbc得來的RDD是JdbcRDD.

圖.5. RDD分類

每一類的RDD都定義如下幾個重要的的特徵。

  1. 如何分塊。(Partition)

  2. 與父RDD的依賴關係(Dependency)

  3. 從父RDD求子RDD的函數(function)

  4. 希望當前RDD存儲的位置(preferred location)

  5. 負責存儲RDD的分塊類(Partitioner)

特徵2,3是保存了數據的產生方式, 當數據丟失時可以進行數據恢復。4,5是本地化存儲策略。通過儘可能的本地存儲來提高運算速度。

例一: HadoopRDD

通過讀HDFS生成的RDD。它的分塊策略是每個HDFS塊生成一個分塊。該RDD沒有父節點。我們希望這個RDD的數據塊存在HDFS數據塊相同的位置。不用進一步分塊。

例二. FilteredRDD

FilteredRDD產生在Filter操作後。分塊與父RDD相同。與父RDD一一對應, 存儲位置與父塊相同。

例三:JoinedRDD

該RDD產生在shuffle操作之後。每個reduce操作有一個分區。依賴於被shuffle的父RDD。進一步分區是通過HashPartitioner實現的。

4. 總結

本節講解了

  1. 什麼是RDD

  2. RDD的生命周期。 創建(Create),懶變換(Lazily Transform),緩存(Cache),和動作(Action)。

  3. RDD的分類和特徵。

本文作者Lion, 更多精彩內容,歡迎訪問官網 BitTiger.io 或關注 「論碼農的自我修養」 微信公眾號:bit_tiger


推薦閱讀:

如何看UCBerkeley RISELab即將問世的Ray,replacement of Spark?
Scaling Memcache in Facebook 筆記(三)
幾個有意思的開源庫/工具
SparkSQL的3種Join實現
下一波計算浪潮

TAG:Spark | 分布式计算 |