x,..." />
標籤:

請教Spark 中 combinebyKey 和 reduceByKey的傳入函數參數的區別?

代碼如下
val testData = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
val testDataCombine = testData.combineByKey(x=&>x,(x:Int,y:Int)=&>x+y,(x:Int,y:Int)=&>x+y)
val testDataReduce = testData.reduceByKey((x,y)=&>x+y)

這兩個函數 在combineByKey 傳入函數參數的時候
為什麼一定要指定x ,y的Int類型,如果不指定會報錯沒發識別"+"這個方法符,
而reduceByKey的時候可以不用指定而進行系統推斷。


題主示例代碼中 testData 這個 RDD 的類型是已經確定為 RDD[(String, Int)],然後通過 RDD.rddToRDDPairFunctions 這個隱式類型轉換轉為 PairRDDFunctions[String, Int],從而獲得 reduceByKey 和 combineByKey 這兩個 methods。

然後來對比下二者的函數簽名:

class PairRDDFunctions[K, V](...) {
def reduceByKey(func: (V, V) =&> V): RDD[(K, V)]

def combineByKey[C](
createCombiner: V =&> C,
mergeValue: (C, V) =&> C,
mergeCombiners: (C, C) =&> C): RDD[(K, C)]
}

可以看到 reduceByKey 的 func 參數的類型只依賴於 PairRDDFunction 的類型參數 V,在這個例子里也就是 Int。於是 func 的類型已經確定為 (Int, Int) =&> Int,所以就不需要額外標識類型了。

而 combineByKey 比 reduceByKey 更加通用,它允許各個 partition 在 shuffle 前先做 local reduce 得到一個類型為 C 的中間值,待 shuffle 後再做合併得到各個 key 對應的 C。

以求均值為例,我們可以讓每個 partiton 先求出單個 partition 內各個 key 對應的所有整數的和 sum 以及個數 count,然後返回一個 pair (sum, count)。在 shuffle 後累加各個 key 對應的所有 sum 和 count,再相除得到均值:

val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
(_: Int) =&> (0, 0L),

(pair: (Int, Long), value: Int) =&>
(pair._1 + value, pair._2 + 1L),

(pair1: (Int, Long), pair2: (Int, Long)) =&>
(pair1._1 + part2._1, pair2._2 + pair2._2)
)

val averages: RDD[String, Double] = sumCountPairs.mapValues {
case (sum, 0L) =&> 0D
case (sum, count) =&> sum.toDouble / count
}

由於 C 這個 類型參數是任意的,並不能從 testData 的類型直接推導出來,所以必須明確指定。只不過題主的例子是最簡單的用 reduceByKey 就可以搞定的情況,也就是 V 和 C 完全相同,於是就看不出區別了。


推薦閱讀:

TAG:Scala | Spark |