標籤:

Spark 為什麼 不允許 RDD 嵌套(如 RDD[RDD[T]])?


謝邀,絕好的宣傳我們項目的問題,先佔個坑。如有錯誤,歡迎指正。

基於百度內部遇到的需求,我先猜想一下題主的使用場景,應該是期望於在groupBy產生RDD上進行mapValues操作時,拿到每個key的數據後,對每個key上調用一個RDD上面的現有已經實現過的演算法,例如,想在每個Key下調用MLLib里的接收RDD的一個演算法。

在現有的SparkRDD設計下,這種演算法是無法復用的,因為在作為mapValues的參數的函數內部,我們拿到的數據是實際的Iterable的數據,而不是RDD。我在百度內部曾經遇到過多次有用戶嘗試把每個組內的數據通過parallelize轉化為RDD,然後再進行一些計算。但,最終都發現會遇到問題。

我們只好遺憾的告訴用戶,你必須重新自己去實現一套單機的、不使用RDD的演算法,目前Spark的抽象里沒法復用(除非你容忍在每個Spark的slave節點上都再重新啟動一個Spark Driver來再發起一堆SparkJob——這一般是不可接受的)。

下圖是我遇到一個用戶遇到問題時,向我們諮詢解決辦法的聊天記錄截圖。

不僅僅是現有庫里的RDD上的函數,還有許多場景是用戶希望自己的某些函數,寫完之後可用於全局計算,也可用於每個組上計算,用戶經常需要實現兩個版本的代碼,例如,通過一個網站的用戶訪問日誌求一個網站的UV(Unique Visitor)的代碼,大約可以寫成(這裡用Python來描述):

def calc_uv(users):
return users.distinct().count()

而在求多個網站每個網站的uv時,用戶需要寫成:

def calc_websites_uv(logs):
return logs.groupByKey().mapValues(lambda users: len(set(users)))

從這個簡單的例子就可以看出,明顯是同一個需求,結果用戶的代碼卻完全不能復用,並不能把calc_uv函數直接傳到mapValues里去使用。

這樣也導致了Spark現有不少介面看起來總顯得有些冗餘,但卻又無法去除,例如,為什麼既要提供reduce、combine、aggregate這些一堆的聚合函數,又要提供reduceByKey、combineByKey、aggregateByKey之類的函數?

其根本原因就是Spark的介面不支持嵌套,groupByKey產生的RDD的value並不是一個RDD,無法直接復用reduce/combine/aggregate達成xxxByKey的效果。

從一個分散式執行引擎的層面來看,Spark RDD的介面已經可以很完備的表達出分散式計算的語義了,但如果它的介面還是太過偏向於計算引擎,設計時對優化、代碼復用等都沒有做太精細的考慮。

在Spark 2.0的Dataset介面中,Spark完成了一定的優化工作,也使得一些聚合函數可復用於全局,亦可用於每個分組,但作用十分有限,只有框架默認提供的幾個聚合函數可以較好的復用,用戶自定義的聚合方法即使可以復用,也因為框架無法了解聚合操作的實現細節,基本上沒有任何的優化。

在流式計算的場景中,除了上述場景外,我們還可以考慮,我實現了一個演算法,我如何才能將其復用於每個窗口之內?例如,如何在流式引擎上統計每10分鐘內每個網站的uv?我們該如何設計才能使得窗口上可以復用之前已經寫好的多網站求uv的程序?

這一切的需求,都可以通過使RDD可嵌套來完美的解決。(未完待續)


RDD不支持嵌套,是Spark目前的工作機制導致的。

Spark目前的工作機制是這樣的:

可以看到Spark的原理就是Driver把RDD分割成若干塊,並分配Executor去處理每個塊。如果我們需要RDD實現嵌套功能,勢必選擇以下2種方法之一:

(1)每個Executor同時扮演Driver的角色,把子RDD再dispatch給別的Executor。也就是說在Executor在處理數據塊的同時還要跑Spark Driver的Dispatcher和Shuffle Server。這樣一來,一方面分散式架構的複雜性大大增加,網路開銷變大,另一方面也消耗了Executor的內存,使得用於execution和storage的內存減少。

(2)Executor把子RDD的元數據傳回Driver,Driver再另起線程partition子RDD。這種方法其實和在RDD上執行flatmap的效果是一樣的。並且flatmap性能會優於這種方法,因為flatmap可以將多個子RDD合在一起處理,這樣既節省了額外Spark stage的開銷,又不會導致Driver處理子RDD的時候Executor守著父RDD的一個partition等在一邊。

總的來說,在目前Spark的架構上實現嵌套RDD效果不會好。

--------------------------

@張雲聰 指出的問題本質上是一個介面問題,Spark是Scala寫的,介面自然是遵循Scala規範。我的想法是可以給python的數據結構實現一些仿造Scala的介面,這樣一來代碼重用就方便很多了。


謝邀,但不得不指出您這樣的問題是很模糊的,以至於難以回答。

您需要指明您的嵌套的具體語境。

是RDD內部數據的嵌套,還是不同RDD數據集的嵌套。以及您要做的嵌套是一種什麼樣的transfer運算元?

以上。


推薦閱讀:

怎樣理解spark中的partition和block的關係?
在Spark集群中,集群的節點個數、RDD分區個數、?cpu內核個數三者與並行度的關係??
1.5版本的Spark自己來管理內存而不是使用JVM,這不使用JVM而是自己來管理內存是咋回事?
還有必要學習Hadoop 么?
大家覺得目前 初學者學數據做hadoop時的集群配置是不是特別麻煩?有沒有一種便捷的方法?

TAG:Spark |