
Spark combineByKey?

I have data like this:

(a,(b,c))

(a,(d,f))

I want combineByKey to produce the same result as groupByKey. How should the combineByKey code be written?
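A minimal sketch of one way to do it (assuming a live SparkContext `sc`; the variable names below are mine, not from the question): the first value of a key becomes a one-element list, later values in the same partition are appended, and lists built on different partitions are concatenated.

rdd = sc.parallelize([("a", ("b", "c")), ("a", ("d", "f"))])

grouped = rdd.combineByKey(
    (lambda v: [v]),                   # createCombiner: first value for a key -> one-element list
    (lambda acc, v: acc + [v]),        # mergeValue: append the next value seen in the same partition
    (lambda acc1, acc2: acc1 + acc2))  # mergeCombiners: concatenate lists from different partitions

grouped.collect()
# [('a', [('b', 'c'), ('d', 'f')])]   (element order may vary with partitioning)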


I originally came to Zhihu to ask for help with this combineByKey() question, but the answers above weren't much help...

In the end I worked it out myself.

I think the key is being clear about which argument is x and which is y at each step.

# Learning Spark: Lightning-Fast Big Data Analysis - O'Reilly

Example 4-12. Per-key average using combineByKey() in Python

sumCount = nums.combineByKey((lambda x: (x, 1)),
                             (lambda x, y: (x[0] + y, x[1] + 1)),
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))

sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1])).collectAsMap()
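To make the "which one is x, which one is y" point concrete, here is a small walk-through on a made-up dataset (the input `nums` below is my own example, not from the book):

nums = sc.parallelize([("a", 1), ("a", 3), ("b", 5)])

# createCombiner: x is the first value seen for a key,
#                 e.g. 1 -> (1, 1), meaning (sum, count)
# mergeValue:     x is the running (sum, count) accumulator, y is the next plain value
#                 from the same partition, e.g. ((1, 1), 3) -> (4, 2)
# mergeCombiners: x and y are both (sum, count) accumulators for the same key,
#                 built on different partitions, e.g. ((1, 1), (3, 1)) -> (4, 2)
sumCount = nums.combineByKey((lambda x: (x, 1)),
                             (lambda x, y: (x[0] + y, x[1] + 1)),
                             (lambda x, y: (x[0] + y[0], x[1] + y[1])))

sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1])).collectAsMap()
# {'a': 2.0, 'b': 5.0}   (Python 3 division)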

Below are the diagram from the book and my own understanding of it.


Two ways to understand combineByKey in PySpark


def groupByKey(self, numPartitions=None):
    """
    Group the values for each key in the RDD into a single sequence.
    Hash-partitions the resulting RDD into numPartitions partitions.

    Note: If you are grouping in order to perform an aggregation (such as a
    sum or average) over each key, using reduceByKey will provide much better
    performance.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> map((lambda (x, y): (x, list(y))), sorted(x.groupByKey().collect()))
    [('a', [1, 1]), ('b', [1])]
    """

    def createCombiner(x):
        return [x]

    def mergeValue(xs, x):
        xs.append(x)
        return xs

    def mergeCombiners(a, b):
        return a + b

    return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                             numPartitions).mapValues(lambda x: ResultIterable(x))

The code above is taken from PySpark in Spark 1.0.0; in the latest versions, though, groupByKey has been further optimized.
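As a quick sanity check (a sketch, again assuming a live SparkContext `sc`), the built-in groupByKey and a hand-rolled combineByKey like the one above give the same grouping once the ResultIterable values are materialized into lists:

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# Built-in groupByKey: values come back wrapped in a ResultIterable, so convert to lists
sorted(pairs.groupByKey().mapValues(list).collect())
# [('a', [1, 1]), ('b', [1])]

# The same grouping written directly with combineByKey
sorted(pairs.combineByKey((lambda v: [v]),
                          (lambda xs, v: xs + [v]),
                          (lambda a, b: a + b)).collect())
# [('a', [1, 1]), ('b', [1])]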


