標籤:

數據科學的新生代工具(附實操代碼)

自從「數據科學」進入人們視野以來, 它一直被用來形容處理那些一台電腦裝不下的大數據。所以,處理大數據的能力被認為是數據科學概念的核心。雖然Mapreduce依然是基礎工具,但很多新湧現的有趣工具已經超越了它的基本功能。比如說,Mantel-Haenszel 計量就無法在基本的Mapreduce中運行。Spark和Google Cloud Dataflow正是下一代數據處理體系的代表。本文將結合筆者的第一手經驗和調研對兩者進行比較。

引言

Mapreduce是設計大規模數據(成百上千TB)處理pipeline的方法。在Google框架內外,Hadoop/Mapreduce都已經被廣泛地使用。(對一些人來說, Hadoop就是數據科學的同義詞)。 但是, 書寫複雜的Mapreduce程序很快就會讓人很痛苦。 正是Mapreduce的局限性,讓Spark和Google Cloud Dataflow湧現,並在不同的方面顯現優勢。從兩者最終產品上微妙但明顯的差異上就能看出來:Dataflow將Google框架與複雜批量pipeline(比如FlumeJava)或實時pipeline(比如MillWheel)匹配; 而Spark則是當初加州伯克利設計用來實現探究分析和大規模機器學習的。

六個月前,Google將Cloud Dataflow的程序模型和軟體開發工具包(SDKs)捐給了Apache軟體基金,孵化了Apache Beam。將來,Apache Beam將成為展示數據處理pipeline的工具,而Cloud Dataflow則保留為運行這些pipeline的管理職能。如果說程序模型和API(應用程序界面)是Beam的話,那Cloud Dataflow就是運營GCP的服務。本文的代碼示例使用當前Dataflow的SDK;Apache Beam使用類似的概念和編程模式。

本文將比較Spark和Beam/ Dataflow的異同。從兩者框架差異 考慮,因為框架的考慮,Spark善於處理探索分析、交互分析和迭代演算法,比如梯度下降法gradient descent 和馬氏鏈蒙特卡羅法MCMC;而Dataflow的優勢則在於處理流數據和高度優化、靈活的數據。文章的其他內容還包括:兩者最重要的不同點和框架的主要差異;舉例如何分別應用Spark和Beam/ Dataflow,以及如何進行流處理;用Spark編寫聚類演算法clustering algorithm的示例。

不滿足於Mapreduce

Mapreduce是一個簡單平行數據處理的優秀工具,但如果想用Mapreduce處理更複雜的pipeline,問題出現了:

· 處理複雜pipeline需要關鍵的樣本文件、獨立的項目包 和層次之間的「界面」

· 在硬碟中寫入pipeline層次之間的交互結果是一個瓶 頸,這要求使用者手動優化這些分支的設置

· 運行探究分析要求同時讀寫硬碟,這會變得很慢

· 不支持流pipeline(原始數據來源,極少延遲)

· 編寫多層次pipeline時很容易被卡住,比如用曼特爾― 亨塞爾法(Mantel-Haenszel)衡量一個隨機實驗 參數和指標有效性(參見下文)

Spark和Beam/ Dataflow 通過將Mapreduce步驟的結果儲存下來,優化和減少步驟,在內存中儲存必要的交互結果,解決了以上的大部分困難。Dataflow中的PCollections和Spark中的RDD(resilient distributed dataset )不需要寫入硬碟,它們只需要重新存回內存里。

核心差異:圖像評估

Spark和Beam最根本的不同在於:Spark只建立必要的計算圖,而Dataflow在優化和發給伺服器或雲端執行之前就建立了完整的計算圖。其他主要的差異也根源於此。所以,Spark很適合用來做交互分析(來自python或scale的解釋器),或用來做機器學習演算法的模型;但用Dataflow就很難實現。澄清一點,不是說Dataflow不能做機器學習——簡單的流演算法是沒問題的。但很多機器學習演算法,比如比如梯度下降法gradient descent 和馬氏鏈蒙特卡羅法MCMC,需要決定在趨同之前需要多少迭代;而Spark就提供了高度的靈活性。

然而,建立完整計算圖的好處是,執行Beampipeline的系統(比如Dataflow)可以優化整個計算圖。更重要的是,從執行中分離計算圖架構使Beam可以重新呈現和語音無關的計算圖。這樣,Dataflow圖可以在其他的分布處理後台中執行,包括Spark。這也是建立Apache Beam項目的初衷。

Beam對整個計算圖進行詳細讀取的另一個影響是,Dataflow將作為一個功能存在。這種「將pipeline作為一種功能運行」的模式將讀取計算圖、輸入數據,並輸出數據。 Cloud Dataflow Service可以看成是一個黑匣子,處理的細節包括使用的虛擬存儲器的數量、每一級甚至總體的層次、任務分配等等。需要的時候,這些信息都可以查看更多的細節。 這是Spark一個很顯著的不同,因為Spark是聚類導向(集群導向, cluster-oriented),而不是任務導向的。在Spark里運行一個固定pipeline時,我們會在建立集群cluster-運行pipeline-重建集群中循環。雖然有很多工具(比如 Cloud Dataproc on GCP 或 Databricks on AWS)簡化這個過程,但用戶依然非常重視集群管理。

超越Hadoop/Mapreduce

為了更好地比較這兩個新工具,我將演示如何用兩種方法解決同一個問題,並且突出它們的不同。假設我們擁有一個產品眾多的在線拍賣網站。客戶瀏覽產品圖片,查看拍品描述,並決定是否競標。因為這是一個競拍網站,所以每一個產品的價格都在實時變化。產品範圍跨度很大,從幾塊錢的Emoji橡皮到幾千元的自動咖啡機都有,所以追蹤產品的一個重要方法是按產品目錄分類。

我們將做一個隨機控制實驗,來評估買賣中高像素圖片的有效率。比如,我們希望知道控制組對平均售價的影響。最簡單的做法是比較控制組和參照組的平均售價。但這可能會得到誤導的結果—因為就算在每一樣商品的價格保持不變、且只有產品組被影響的情況下,這樣的比較肯定會產生有差異的結果。

具體一點,假設高像素圖片為咖啡機帶來的銷量比Emoji套裝的多。曼特爾―亨塞爾法(Mantel-Haenszel,或Cochran-Mantel-Haenszel) , 正是通過平衡測試組和參照組中的變數來解決這個問題。基本的概念,是分別計算測試組和參照組中每一個獨立樣品(比如每一副圖片)的平均價格,然後得到測試組相對於參照組的價格權重組合。 MH的核心是,這種權重組合對於測試組和參照組都有效,並且因為篩選方法一致而最大程度地減少了其他變數的影響。

預測法和延伸應用在醫藥界的計數數據中被廣泛應用,Google的數據科學裡更是無處不在。 將來我們還會專門討論在Google分析中的應用和擴展。現在,讓我們先看這個非技術數據的價格案例。我們以產品目錄(i)為分類,測試組和參照組分別售出nti和nci, MH預測法對兩組間價格交叉調整的公式為:

權重wi是nti和nci的調和平均值。簡化計算時我們可以將公式寫為:

記得排除nti+nci=0的情況。以下是一個csv的架構:exp_id, product_id, price, sale_count 。

如果要用Mapreduce來計算MH價格指數,需要3次程序:

這很可能會得到一組龐大的code,因為一個基礎的計數就需要接近60行的標準java code。感興趣的讀者可以看看延伸閱讀:Hadoop/Mapreduce versus Spark,比如 Dattamsha上的這篇文章 Hadoop/MR vs Spark/RDD WordCount program。

用python寫MH

相比這些乏味的code,更喜歡python的朋友可以看這個github在jupyter notebook soluteon的例子Spark vs Dataflow。為了更好地比較,我從code中提取出兩個商業邏輯,分別是 calc_numerator calc_denominator;這兩個功能可以計算上述公式里的數量、特定商品的價格數據,並計算分子和分母。

用Apache Spark寫MH

打開Spark shell,為PATH添加組件,在pySpark/shell。py運行execfile(更多detail查看notebook)。然後編寫Spark代碼:

from operator import add

# We want to calculate MH(v_{t,i},n_{t,i},v_{c,i},n_{c,i}), where t and c are treatment

# and control. v and n in our cases are value of the sale prices and sale_count.

input_rdd = sc.textFile(sim_data_{0}_{1}.csv.format(NUM_LE, NUM_HE))

header = input_rdd.first() # Remove the first line.

parsed_input_rdd = input_rdd.filter(lambda x: x !=header)

.map(lambda x: convert_line(x.split(,)))

transformed = parsed_input_rdd.map(lambda x: ((x[exp], x[prod]),

(x[sale_count]*x[price], x[sale_count])))

(sp, clks) = (0, 1) # sale price and sale_count

(ep, spc) = (0, 1) # exp_id&product_id, sp&sale_count

(exp2, prod2) = (0, 1) # exp_id, product_id

# For each product cross exp_id, sum the sale prices and sale_count

grouped_result = transformed.reduceByKey(lambda x,y: (x[sp]+y[sp], x[clks]+y[clks]))

grouped_by_product = grouped_result.map(lambda x: ((x[ep][prod2]), (x[ep][exp2], x[spc][sp], x[spc][clks]))).groupByKey()

numerator_sum = grouped_by_product.map(lambda x: calc_numerator(x)).reduce(add)

denominator_sum = grouped_by_product.map(lambda x: calc_denominator(x)).reduce(add)

effect = numerator_sum / denominator_sum

print(numerator_sum, denominator_sum, effect)

Spark的功能有兩種:transformations處理文檔中(比如地圖或文件夾)中記錄的數據,actions讓數據被識別(比如利用groupByKey, reduceByKey)。核心的區別就是是transformations只在覆蓋到的地方執行,其他的部分不會產生直接的影響,這也是我們說Spark進行『懶惰評估』

除了商業邏輯,程序只運行很少的幾行代碼:

  • sc.textFile(...) transforms a file on disk into an RDD (the distributed data structure in Spark)

  • input_rdd.first() acts on the RDD returning first, header, element to the driver (my notebook).

  • input_rdd.filter(...).map(...) transforms input_rdd removing the header then converts each csv line into floats and ints.

  • parsed_input_rdd.map(...) transforms records into key-value tuples ((exp_id, product_id), (cost, clicks))

  • transformed.reduceByKey(...) acts on transformed causing input_rdd.filter(...).map(...) and parsed_input_rdd.map(...) to be executed and produces the total clicks and cost by (exp_id, product_id)

  • grouped_result.map(...).groupByKey() acts to produce the same data, only grouped by product_id instead of product_id and experiment_id.

  • grouped_by_product.map(...).reduce(add) transforms the data per product_id into the numerator and denominator of the MH calculation and then performs the action of summing the results using the add function.

用Apache Beam寫MH

Dataflow跟Spark的的代碼框架有少量的不同,總體上很類似,但卻有很重要的區別。 區別中的一個缺點是,Dataflow至今還沒有一個reduce的總功能,所以我要自己寫代碼(t_sum)。注意這個code用Dataflow SDK (而不是新的Beam SDK)。

Beam在概念上是兩個部分:pipeline建設和pipeline執行。 beam.Pipeline()返回一個pipeline,p,在其上構建(使用beam.Map,beam.GroupByKey等)和p.run()在Dataflow的伺服器或者在本地集群上執行Pipeline。

  • beam.Pipeline(options=pipeline_options) begins constructing a pipeline to run locally.

  • p | beam.io.Read(...) | beam.Filter(...) | beam.Map(...) add reading the file, filtering lines that look like the header (starting with 『#』), converting each line into floats and ints to the graph.

  • parsed_input_rdd | beam.Map(...) adds mapping each record to be keyed by exp_id, product_id to the graph

  • transformed | beam.CombinePerKey(...) | beam.Map(...) | beam.GroupByKey() adds summing clicks and cost by exp_id, product_id and regrouping by product_id to the graph

  • grouped_by_product | beam.Map(...) | beam.CombineGlobally(...) adds calculating the numerator/denominator values and the global sum to the graph

  • numerator_sum | beam.Write(...) adds a sync for the numerator (there is a matching output for the denominator).

  • p.run() optimizes constructed graph and ships the result to be executed (in our case the local machine)

應用的比較

總體來看,兩個應用很相似,都利用了相同的平行基礎操作。不同的是Spark只在行動(比如reduceByKey)中執行圖片; 而Beam(比如CloudDataflow), 在運行的同時執行了整個圖片。 另一個不同是,Dataflow要求有sources和sinks,這意味著結果必須用pipeline輸出到文件中,而不能輸出到發出指令的程序(除非使用本地運行器)。

Spark的優勢:整合演算法

起初Spark是探索性研究和機器學習的解決方案。最簡單的展示就是集群技術k-means集群。 K-means的基本原理是不斷地重複兩個步驟:分配每一個聚點到集群中,然後以此更新集群中心。直到集群中心被最終固定下來。以下就是用Spark寫的核心演算法。

# Load the data, remove the first line, and pick initial locations for the clusters.

# We put the x,y points in pt_rdd, and RDD of PtAgg (a class containing x, y,

# and count).

pt_rdd = parsed_input_rdd.map(lambda x: PtAgg(x[0], x[1], 1))

MAX_STEPS = 100; MIN_DELTA = 0.001; delta = 1.0; step = 0

while delta > MIN_DELTA and step < MAX_STEPS:

step += 1

c_centers_old = copy.deepcopy(c_centers)

b_c_centers = sc.broadcast(c_centers_old)

# For every point, find the cluster its closer to and add to its total x, y, and count

totals = pt_rdd.map(lambda x: pick_closest_center(x, b_c_centers.value)).reduce(

lambda a,b: center_reduce(a,b))

# Now update the location of the centers as the mean of all of the points closest to it

# (unless there are none, in which case pick a new random spot).

c_centers = [t.Normalize() if t.cnt != 0 else random_point_location() for t in totals]

# compute the distance that each cluster center moves, the set the max of those as

# the delta used to the stop condition.

deltas = [math.sqrt(c.DistSqr(c_old)) for c, c_old in zip(c_centers, c_centers_old)]

delta = max(deltas)

s = .join([str(x) for x in c_centers])

print(final centers: {0}.format(s))

在這過程中,我不需要真的寫這些代碼,因為Spark有ML library(mllib),提供了包括k-means在內的很多演算法。實際上,k-means只是很小的一個功能。

在表徵上,最大的不同就是: Beam不能表達迭代演算法,包括k-means。這是因為Beam建立、優化並輸出了整個圖片。在迭代演算法中,不能事先知道整個圖片框架(不知道要建立多少步驟),所以不能在里運行「loops」。用Dataflow表達loops是可以的,但只局限於添加固定數目。

Beam/Dataflow的優勢:流處理

對於數據科學,流處理是一個越來越重要的話題。畢竟,誰願意等日常輸送pipeline告訴我們實時數據到底表現得怎麼樣。關於流處理入門,我強烈推薦Tyler Akidau的兩篇博客 The world beyond batch: Streaming 101和 102。

當Dataflow發布時,它最吸引人的賣點就是流處理。不是說Spark不支持或缺少應對大量的流數據的統一處理能力。Dataflow的重要步驟是:

1. 統一的批量和流API。

2. 支持處理實時數據的事件,也就是在它們發生的 時候就分析結果,而不是等到數據到達分析機器 的時候 。

3. 水印(一個追蹤無界數據收集過程的方法)和窗口提 示完成進度(即收集完畢該階段的所有數據)。

事實上,Dataflow,現在是Beam,提供一個流API,解決了流處理問題的核心:what,where,when,how。 現在有不同的支持等級,這是沒有意義的,比如Dataflow支持Java的流處理,但不支持Python的。

從Dataflow團隊最近在blog上公布的例子就能很容易看出這一點。他們通過計算一個在線遊戲每小時的團隊積分來比較這兩者的不同。

其他框架"哲學"的不同

有幾個設計上的特點可以看出Spark鐘意默認的快速開發,並要求用戶進行選擇性執行,而Dataflow選擇以稍慢的開發時間來換取更好的表現。

超高速緩存caching

Dataflow通過"熔合"相同輸入的同級層次來避免了caching。 熔合帶來的局限性是不應該使用不穩定的類型,因為熔斷的層次之間共享相同的數據,這樣調和會產生無效的結果(可以通過run-time來核實)。 這意味著寫正確的Beam代碼需要對不穩定的類型格外注意。Spark默認雙執行,比較慢但不需要擔心正確的問題。Spark也支持不同類型的超高速緩衝方法。

等式

在GroupByKey和其他步驟中,Dataflow使用快速的、非語言的、程序化的byte級別的等式來比較classes。如果程序使用組合運營來管理classes,這將會是一個問題,Hashmaps或導致語義等式跟byte等式不一樣的結果的任何東西都可能是原因。Spark並行classes,默認用class』s comparison operator做grouping,允許用戶選擇byte級別的等式。

總體表現

很遺憾,現在我們還不能下定論說哪一個框架比較好。唯一一個比較兩者的文章使用的是比較舊的版本Spark1.3,但Spark 2.0有了顯著的更新,所以還不清楚現在哪一個系統的表現比較好。

總結

Spark和Dataflow從不同方面解決了Mapreduce的不足,哪一個系統更適合得取決於項目本身。如果是交互性,數據探索或演算法開發方面的,Spark可能是更好的選擇。如果處理複雜的流計算(特別是實時事件),Beam可能會表現更好。而對於運行大規模生產pipeline,Beam on Cloud Dataflow也許更適合。

原文: David Adams

翻譯:胡丁凡

編輯:Yvette Niu


推薦閱讀:

治療癌症的有效解藥--基因數據
若想修鍊成數據科學家,最重要的技能居然是...?
健身應用暴露了美軍秘密基位置
R語言實戰—04數據基本管理
安客誠成為阿里數據銀行首批認證服務商 助力數據營銷新生態

TAG:數據 |