標籤:

Spark源碼分析(1) RDD是什麼

RDD是Spark的基礎,是對大數據的抽象,所以先破解Spark,首先從RDD開始。* RDD 是什麼?有什麼特點?* RDD 包含什麼?* RDD 能做什麼?

文尾有結論

RDD 的注釋

org.apache.spark.rdd.RDD 類源代碼中有詳細的注釋:

  • A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

    翻譯:彈性的 分散式 數據集是 Spark 基礎的抽象。

    解釋:彈性的(可復原的),說明數據集具有容錯性、可修復性。 分散式,說明數據集可以分布在不同的機器上

  • Represents an immutable, partitioned collection of elements that can be operated on in parallel.

    翻譯:RDD 是不可變的 分區的 可並行處理的 元素集合

    解釋:不可變的,這和 Scala 的設計理念相同,數據集一旦構建完成,就不能再修改,這樣能輕鬆解決多個線程讀數據的一致性問題。 分區的=可並行處理的=分散式

  • This class contains the basic operations available on all RDDs, such as map, filter, and persist.

    翻譯:這個抽象類包含了所有 RDD 都應該有的基本操作,比如 map 、filter 、persist等

    解釋:這三個操作分別是:批量轉換、篩選、持久化

  • In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as groupByKey and join;

    翻譯:另外 PairRDDFunctions 對象中包含了 鍵值對型(KV型) RDD 的操作,例如 groupByKey和 join;

    解釋:KV 型可以支持按 Key 分組、關聯等操作

  • [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles;

    翻譯:DoubleRDDFunctions提供可 double 數據集的操作;

    解釋:數值型數據集有求和、平均、分布圖等統計性操作

  • and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.

    翻譯:SequenceFileRDDFunctions 提供了順序存儲操作

  • All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.

    翻譯:所有的的類通過隱式轉換自動地用於RDD實例中

    解釋:RDD 伴生對象里包含了隱式轉換函數,用implicit 修飾。隱式轉換是 Scala 的語法特性。

  • Internally, each RDD is characterized by five main properties:

    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

      翻譯:在 RDD 中,包含這樣的5個屬性(也就說要實現抽象方法或給空對象賦值):
    • 一個分區的列表(getPartitions)
    • 一個用於計算分區中數據的函數(compute)
    • 一個對其他 RDD 的依賴列表(getDependencies)
    • 可選:KV 型 RDD 應該有一個分區器,例如 hash-分區器(partitioner)
    • 可選:分區數據計算完後優先存儲的位置,例如 HDFS 的某個塊(getPreferredLocations)
  • All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.

    翻譯: Spark 中所有的任務調度、任務執行都依賴於這些方法。RDD 可以覆蓋這些方法,實現有自己的計算方法。例如從一個新的存儲系統中讀取數據。

  • Please refer to the 101.96.8.165/people.csa for more details on RDD internals.

    翻譯:更多細節,可以 Spark 的論文

一段示例代碼

是的,我們從HelloWorld開始,官方推薦的第一個程序是計算π的近似值:

import scala.math.randomimport org.apache.spark.sql.SparkSessionobject SparkPi { def main(args: Array[String]) { val spark = SparkSession .builder .appName("Spark Pi") .getOrCreate() val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.sparkContext.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y <= 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / (n - 1)) spark.stop() }}

??什麼,RDD 在哪裡?

  1. 獲得 Spark 會話 和 Spark 上下文

    val spark = SparkSession.builder.appName("xx").getOrCreate()

    這4行,只能算是1行代碼。調用內部的構造器產生了一個Spark 會話對象,賦值給 spark。後面 spark.sparkContext 是獲得 Spark 的上下文對象。至於會話對象和上下文對象,以後再分析。

  2. 創建一個 RDD

    spark.sparkContext.parallelize(1 until n, slices)

    SparkContext 對象中有一個parallelize 函數,創建了一個RDD對象。 RDD是抽象類。進入源碼我們可以看到 創建的 RDD 是 ParallelCollectionRDD(字面翻譯為並行容器 RDD)。這個RDD是最簡單的RDD了。如果是我,我會將它命名為SimpleRDD。 這句話創建了一個包含slices個分區的 RDD,RDD 的內容是1到 n,這 n+1 個數。數據存在內存中,從內存讀分區的數據。

  3. 看看這個 RDD 中的細節

    還記得前一節翻譯的文字嗎?RDD 應該實現5個方法。這個並行容器 RDD 是怎麼實現的呢?

    • 一個分區的列表 將數據分成slices份,放在slices個容器中。每個容器就是一個分區,所有容器構成了分區列表
    • 一個用於計算分區中數據的函數 什麼都沒做,返回分區的迭代器
    • 一個依賴列表 依賴列表為Nil空列表。即,這個 RDD 不依賴別的 RDD
    • 一個分區器 不是 KV 型的,不需要
    • 一個運算存儲優先位置 SparkContext傳入了一個 Map,Map 有slices個key,對應slices個容器。可見,SparkContext希望結果存在內存中。
  4. map

    map是將分組中每一個元素映射成另一個元素的操作。我們說過,RDD是不可變的,map這個操作產生新MapPartitionsRDD對象。 那MapPartitionsRDD的5個方法呢?

    • 一個依賴列表:只依賴於上游的 RDD,本例中依賴於上游的ParallelCollectionRDD。
    • 一個分區列表:就是上游分區列表,直接讀取上游數據
    • 一個計算:計算過程就是「映射關係」,由外部傳入一個函數對象表達映射關係
    • 一個分區器:上游 RDD 的分區器,直接讀上游的分區
    • 一個優先存儲位置:上游 RDD 的優先位置,本例中直接寫到SparkContext傳入的 Map
  5. reduce

    reduce 也是一個操作,是多對一的聚合操作,聚合前後類型必須一致。本例中是求和操作。 過程可以簡述成,先計算每個分區的聚合結果,再將多個分區的結果再聚合。過程比較複雜,以後再深入。

  6. 如何計算π?

    random 是隨機數,範圍是 [0, 1),那麼x 和 y 是 [-1, 1)範圍內的隨機數。 計算x*x+y*y,這是點(x, y)到(0, 0) 的距離,當距離不大1(點落在r=1的圓內)時,取1,否則取0。

    那麼隨機取 N 個點,點落圓內的幾率等於圓的面積/邊長為2的正方形的面積。所以:

    圓的面積 ≌ 正方形面積 * 落在圓內的點數 / 所有的點數

    圓的面積=π,正方形面積=4

    根據大數定理和中心極限定理,取的點越多,π的估值越近似於正態分布;取得的點越多,正態分布的標準差越小;取得的點越多,正態分布的均值越接近π的真值。

    所以,隨著取點的增加,π估值約精確。

Scala 語法

1 until n

用到了三個 Scala 語法:

  1. 一切皆對象 在 Java 中,1會被認為是一個基本類型int,可以裝包成對象,在 Scala 中,1 就是一個對象。
  2. 隱式轉換 util是調用 RichInt.util 方法。Int 轉換成 RichInt 是隱式的,定義在 scala.Predef對象中的intWrapper方法。scala.Predef類似於宏。 參考: scala source implicit conversion from Int to RichInt - Stack Overflow
  3. 函數調用的寫法 1 until n等價於1.until(n),也就是說,如果對象方法若只有一個參數,可以省略掉點和括弧,這樣代碼更接近自然語言。

OK,那麼1 until n 這句話寫全了應該是什麼樣的呢? 答:scala.this.Predef.intWrapper(1).until(n);

總結:

  1. RDD 是數據集
  2. RDD 的特點是有彈性、分散式、不可變。
  3. RDD應該包含5個部分:一個分區集、一個依賴集、一個運算、[一個分區器、一個優先結果存儲位置]。
  4. RDD 有一系列的操作,包括映射、過濾、聚合、存儲等。

本文鏈源碼接:

RDD [spark/core/RDD/RDD.scala at master · apache/spark · GitHub](github.com/apache/spark)

map [spark/core/RDD/MapPartitionsRDD at master · apache/spark · GitHub](github.com/apache/spark)

計算π [spark/examples/SparkPi.scala at master · apache/spark · GitHub](github.com/apache/spark)


推薦閱讀:

梯度迭代樹(GBDT)演算法簡介及Spark MLlib調用
Apache Spark 中的 RangePartitioner 是如何實現數據採樣的?
Spark基礎性能優化

TAG:Spark |