與 Hadoop 對比,如何看待 Spark 技術?
最近公司邀請來王家林老師來做培訓,其浮誇的授課方式略接受不了。其強烈推崇Spark技術,宣稱Spark是大數據的未來,同時宣布了Hadoop的死刑。
那麼與Hadoop相比,Spark技術如何?現工業界大數據技術都在使用何種技術?-----------補充------------
希望大家能將關注點放在Spark上。另Spark圈內應該會有人對此人有了解。此人在51CTO上有100期的課程,並號稱7歲接觸代碼,現年28歲,2010年閱讀完Android源碼後,專註大數據。王家林簡介摘自百度百科
王家林:Spark亞太研究院院長和首席專家,中國目前唯一移動互聯網和雲計算大數據集大成者。Android架構師、高級工程師、諮詢顧問、培訓專家;通曉Android、HTML5、Hadoop,迷戀英語播音和健美;致力於Android、HTML5、Hadoop的軟、硬、雲整合的一站式解決方案。
Hadoop
首先看一下Hadoop解決了什麼問題,Hadoop就是解決了大數據(大到一台計算機無法進行存儲,一台計算機無法在要求的時間內進行處理)的可靠存儲和處理。
- HDFS,在由普通PC組成的集群上提供高可靠的文件存儲,通過將塊保存多個副本的辦法解決伺服器或硬碟壞掉的問題。
- MapReduce,通過簡單的Mapper和Reducer的抽象提供一個編程模型,可以在一個由幾十台上百台的PC組成的不可靠集群上並發地,分散式地處理大量的數據集,而把並發、分散式(如機器間通信)和故障恢復等計算細節隱藏起來。而Mapper和Reducer的抽象,又是各種各樣的複雜數據處理都可以分解為的基本元素。這樣,複雜的數據處理可以分解為由多個Job(包含一個Mapper和一個Reducer)組成的有向無環圖(DAG),然後每個Mapper和Reducer放到Hadoop集群上執行,就可以得出結果。
(圖片來源:http://www.slideshare.net/davidengfer/intro-to-the-hadoop-stack-javamug)
用MapReduce統計一個文本文件中單詞出現的頻率的示例WordCount請參見:WordCount - Hadoop Wiki,如果對MapReduce不恨熟悉,通過該示例對MapReduce進行一些了解對理解下文有幫助。
在MapReduce中,Shuffle是一個非常重要的過程,正是有了看不見的Shuffle過程,才可以使在MapReduce之上寫數據處理的開發者完全感知不到分散式和並發的存在。
(圖片來源: Hadoop Definitive Guide By Tom White)
廣義的Shuffle是指圖中在Map和Reuce之間的一系列過程。Hadoop的局限和不足
但是,MapRecue存在以下局限,使用起來比較困難。
- 抽象層次低,需要手工編寫代碼來完成,使用上難以上手。
- 只提供兩個操作,Map和Reduce,表達力欠缺。
- 一個Job只有Map和Reduce兩個階段(Phase),複雜的計算需要大量的Job完成,Job之間的依賴關係是由開發者自己管理的。
- 處理邏輯隱藏在代碼細節中,沒有整體邏輯
- 中間結果也放在HDFS文件系統中
- ReduceTask需要等待所有MapTask都完成後才可以開始
- 時延高,只適用Batch數據處理,對於互動式數據處理,實時數據處理的支持不夠
- 對於迭代式數據處理性能比較差
比如說,用MapReduce實現兩個表的Join都是一個很有技巧性的過程,如下圖所示:
(圖片來源:Real World Hadoop)
因此,在Hadoop推出之後,出現了很多相關的技術對其中的局限進行改進,如Pig,Cascading,JAQL,OOzie,Tez,Spark等。
Apache PigApache Pig也是Hadoop框架中的一部分,Pig提供類SQL語言(Pig Latin)通過MapReduce來處理大規模半結構化數據。而Pig Latin是更高級的過程語言,通過將MapReduce中的設計模式抽象為操作,如Filter,GroupBy,Join,OrderBy,由這些操作組成有向無環圖(DAG)。例如如下程序:
visits = load 『/data/visits』 as (user, url, time);
gVisits = group visits by url;
visitCounts = foreach gVisits generate url, count(visits);
urlInfo = load 『/data/urlInfo』 as (url, category, pRank);
visitCounts = join visitCounts by url, urlInfo by url;
gCategories = group visitCounts by category;
topUrls = foreach gCategories generate top(visitCounts,10);
store topUrls into 『/data/topUrls』;
描述了數據處理的整個過程。
而Pig Latin又是通過編譯為MapReduce,在Hadoop集群上執行的。上述程序被編譯成MapReduce時,會產生如下圖所示的Map和Reduce:
(圖片來源:http://cs.nyu.edu/courses/Fall12/CSCI-GA.2434-001/sigmod08-pig-latin.ppt)
Apache Pig解決了MapReduce存在的大量手寫代碼,語義隱藏,提供操作種類少的問題。類似的項目還有Cascading,JAQL等。
Apache Tez
Apache Tez,Tez是HortonWorks的Stinger Initiative的的一部分。作為執行引擎,Tez也提供了有向無環圖(DAG),DAG由頂點(Vertex)和邊(Edge)組成,Edge是對數據的移動的抽象,提供了One-To-One,BroadCast,和Scatter-Gather三種類型,只有Scatter-Gather才需要進行Shuffle。
SELECT a.state, COUNT(*),
AVERAGE(c.price)
FROM a
JOIN b ON (a.id = b.id)
JOIN c ON (a.itemId = c.itemId)
GROUP BY a.state
(圖片來源:http://www.slideshare.net/hortonworks/apache-tez-accelerating-hadoop-query-processing)
途中藍色方塊表示Map,綠色方塊表示Reduce,雲狀表示寫屏障(write barrier,一種內核機制,可以理解為持久的寫),Tez的優化主要體現在:
- 去除了連續兩個作業之間的寫屏障
- 去除了每個工作流中多餘的Map階段(Stage)
通過提供DAG語義和操作,提供了整體的邏輯,通過減少不必要的操作,Tez提升了數據處理的執行性能。
Apache Spark
Apache Spark是一個新興的大數據處理的引擎,主要特點是提供了一個集群的分散式內存抽象,以支持需要工作集的應用。
這個抽象就是RDD(Resilient Distributed Dataset),RDD就是一個不可變的帶分區的記錄集合,RDD也是Spark中的編程模型。Spark提供了RDD上的兩類操作,轉換和動作。轉換是用來定義一個新的RDD,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等,動作是返回一個結果,包括collect, reduce, count, save, lookupKey。
Spark的API非常簡單易用,Spark的WordCount的示例如下所示:val spark = new SparkContext(master, appName, [sparkHome], [jars])
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line =&> line.split(" "))
.map(word =&> (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
其中的file是根據HDFS上的文件創建的RDD,後面的flatMap,map,reduceByKe都創建出一個新的RDD,一個簡短的程序就能夠執行很多個轉換和動作。
在Spark中,所有RDD的轉換都是是惰性求值的。RDD的轉換操作會生成新的RDD,新的RDD的數據依賴於原來的RDD的數據,每個RDD又包含多個分區。那麼一段程序實際上就構造了一個由相互依賴的多個RDD組成的有向無環圖(DAG)。並通過在RDD上執行動作將這個有向無環圖作為一個Job提交給Spark執行。
例如,上面的WordCount程序就會生成如下的DAGscala&> counts.toDebugString
res0: String =
MapPartitionsRDD[7] at reduceByKey at &
ShuffledRDD[6] at reduceByKey at &
MapPartitionsRDD[5] at reduceByKey at &
MappedRDD[4] at map at &
FlatMappedRDD[3] at flatMap at &
MappedRDD[1] at textFile at &
HadoopRDD[0] at textFile at &
Spark對於有向無環圖Job進行調度,確定階段(Stage),分區(Partition),流水線(Pipeline),任務(Task)和緩存(Cache),進行優化,並在Spark集群上運行Job。RDD之間的依賴分為寬依賴(依賴多個分區)和窄依賴(只依賴一個分區),在確定階段時,需要根據寬依賴劃分階段。根據分區劃分任務。
(圖片來源:https://databricks-training.s3.amazonaws.com/slides/advanced-spark-training.pdf)
Spark支持故障恢復的方式也不同,提供兩種方式,Linage,通過數據的血緣關係,再執行一遍前面的處理,Checkpoint,將數據集存儲到持久存儲中。
Spark為迭代式數據處理提供更好的支持。每次迭代的數據可以保存在內存中,而不是寫入文件。
Spark的性能相比Hadoop有很大提升,2014年10月,Spark完成了一個Daytona Gray類別的Sort Benchmark測試,排序完全是在磁碟上進行的,與Hadoop之前的測試的對比結果如表格所示:
(表格來源: Spark officially sets a new record in large-scale sorting)
從表格中可以看出排序100TB的數據(1萬億條數據),Spark只用了Hadoop所用1/10的計算資源,耗時只有Hadoop的1/3。
Spark的優勢不僅體現在性能提升上的,Spark框架為批處理(Spark Core),互動式(Spark SQL),流式(Spark Streaming),機器學習(MLlib),圖計算(GraphX)提供一個統一的數據處理平台,這相對於使用Hadoop有很大優勢。
(圖片來源:https://gigaom.com/2014/06/28/4-reasons-why-spark-could-jolt-hadoop-into-hyperdrive/)
按照Databricks的連城的說法是One Stack To Rule Them All
特別是在有些情況下,你需要進行一些ETL工作,然後訓練一個機器學習的模型,最後進行一些查詢,如果是使用Spark,你可以在一段程序中將這三部分的邏輯完成形成一個大的有向無環圖(DAG),而且Spark會對大的有向無環圖進行整體優化。
例如下面的程序:val points = sqlContext.sql( 「SELECT latitude, longitude FROM historic_tweets」)
val model = KMeans.train(points, 10)
sc.twitterStream(...) .map(t =&> (model.closestCenter(t.location), 1)) .reduceByWindow(「5s」, _ + _)
(示例來源:http://www.slideshare.net/Hadoop_Summit/building-a-unified-data-pipeline-in-apache-spark)
這段程序的第一行是用Spark SQL 查尋出了一些點,第二行是用MLlib中的K-means演算法使用這些點訓練了一個模型,第三行是用Spark Streaming處理流中的消息,使用了訓練好的模型。
Lambda Architecture
Lambda Architecture是一個大數據處理平台的參考模型,如下圖所示:
(圖片來源: Lambda Architecture)
其中包含3層,Batch Layer,Speed Layer和Serving Layer,由於Batch Layer和Speed Layer的數據處理邏輯是一致的,如果用Hadoop作為Batch Layer,而用Storm作為Speed Layer,你需要維護兩份使用不同技術的代碼。而Spark可以作為Lambda Architecture一體化的解決方案,大致如下:
- Batch Layer,HDFS+Spark Core,將實時的增量數據追加到HDFS中,使用Spark Core批量處理全量數據,生成全量數據的視圖。,
- Speed Layer,Spark Streaming來處理實時的增量數據,以較低的時延生成實時數據的視圖。
- Serving Layer,HDFS+Spark SQL(也許還有BlinkDB),存儲Batch Layer和Speed Layer輸出的視圖,提供低時延的即席查詢功能,將批量數據的視圖與實時數據的視圖合併。
總結
如果說,MapReduce是公認的分散式數據處理的低層次抽象,類似邏輯門電路中的與門,或門和非門,那麼Spark的RDD就是分散式大數據處理的高層次抽象,類似邏輯電路中的編碼器或解碼器等。
RDD就是一個分散式的數據集合(Collection),對這個集合的任何操作都可以像函數式編程中操作內存中的集合一樣直觀、簡便,但集合操作的實現確是在後台分解成一系列Task發送到幾十台上百台伺服器組成的集群上完成的。最近新推出的大數據處理框架Apache Flink也使用數據集(Data Set)和其上的操作作為編程模型的。
由RDD組成的有向無環圖(DAG)的執行是調度程序將其生成物理計劃並進行優化,然後在Spark集群上執行的。Spark還提供了一個類似於MapReduce的執行引擎,該引擎更多地使用內存,而不是磁碟,得到了更好的執行性能。
那麼Spark解決了Hadoop的哪些問題呢?- 抽象層次低,需要手工編寫代碼來完成,使用上難以上手。
- =&>基於RDD的抽象,實數據處理邏輯的代碼非常簡短。。
- 只提供兩個操作,Map和Reduce,表達力欠缺。
- =&>提供很多轉換和動作,很多基本操作如Join,GroupBy已經在RDD轉換和動作中實現。
- 一個Job只有Map和Reduce兩個階段(Phase),複雜的計算需要大量的Job完成,Job之間的依賴關係是由開發者自己管理的。
- =&>一個Job可以包含RDD的多個轉換操作,在調度時可以生成多個階段(Stage),而且如果多個map操作的RDD的分區不變,是可以放在同一個Task中進行。
- 處理邏輯隱藏在代碼細節中,沒有整體邏輯
- =&>在Scala中,通過匿名函數和高階函數,RDD的轉換支持流式API,可以提供處理邏輯的整體視圖。代碼不包含具體操作的實現細節,邏輯更清晰。
- 中間結果也放在HDFS文件系統中
- =&>中間結果放在內存中,內存放不下了會寫入本地磁碟,而不是HDFS。
- ReduceTask需要等待所有MapTask都完成後才可以開始
- =&> 分區相同的轉換構成流水線放在一個Task中運行,分區不同的轉換需要Shuffle,被劃分到不同的Stage中,需要等待前面的Stage完成後才可以開始。
- 時延高,只適用Batch數據處理,對於互動式數據處理,實時數據處理的支持不夠
- =&>通過將流拆成小的batch提供Discretized Stream處理流數據。
- 對於迭代式數據處理性能比較差
- =&>通過在內存中緩存數據,提高迭代式計算的性能。
因此,Hadoop MapReduce會被新一代的大數據處理平台替代是技術發展的趨勢,而在新一代的大數據處理平台中,Spark目前得到了最廣泛的認可和支持,從參加Spark Summit 2014的廠商的各種基於Spark平台進行的開發就可以看出一二。
我本人是類似Hive平台的系統工程師,我對MapReduce的熟悉程度是一般,它是我的底層框架。我隔壁組在實驗Spark,想將一部分計算遷移到Spark上。
年初的時候,看Spark的評價,幾乎一致表示,Spark是小數據集上處理複雜迭代的交互系統,並不擅長大數據集,也沒有穩定性。但是最近的風評已經變化,尤其是14年10月他們完成了Peta sort的實驗,這標誌著Spark越來越接近替代Hadoop MapReduce了。
Spark the fastest open source engine for sorting a petabyte
Sort和Shuffle是MapReduce上最核心的操作之一,比如上千個Mapper之後,按照Key將數據集分發到對應的Reducer上,要走一個複雜的過程,要平衡各種因素。Spark能處理Peta sort的話,本質上已經沒有什麼能阻止它處理Peta級別的數據了。這差不多遠超大多數公司單次Job所需要處理的數據上限了。
回到本題,來說說Hadoop和Spark。Hadoop包括Yarn和HDFS以及MapReduce,說Spark代替Hadoop應該說是代替MapReduce。
MapReduce的缺陷很多,最大的缺陷之一是Map + Reduce的模型。這個模型並不適合描述複雜的數據處理過程。很多公司(包括我們)把各種奇怪的Machine Learning計算用MR模型描述,不斷挖(lan)掘(yong)MR潛力,對系統工程師和Ops也是極大挑戰了。很多計算,本質上並不是一個Map,Shuffle再Reduce的結構,比如我編譯一個SubQuery的SQL,每個Query都做一次Group By,我可能需要Map,Reduce+Reduce,中間不希望有無用的Map;又或者我需要Join,這對MapReduce來說簡直是噩夢,什麼給左右表加標籤,小表用Distributed Cache分發,各種不同Join的Hack,都是因為MapReduce本身是不直接支持Join的,其實我需要的是,兩組不同的計算節點掃描了數據之後按照Key分發數據到下一個階段再計算,就這麼簡單的規則而已;再或者我要表示一組複雜的數據Pipeline,數據在一個無數節點組成的圖上流動,而因為MapReduce的呆板模型,我必須一次一次在一個Map/Reduce步驟完成之後不必要地把數據寫到磁碟上再讀出,才能繼續下一個節點,因為Map Reduce2個階段完成之後,就算是一個獨立計算步驟完成,必定會寫到磁碟上等待下一個Map Reduce計算。
上面這些問題,算是每個號稱下一代平台都嘗試解決的。
現在號稱次世代平台現在做的相對有前景的是Hortonworks的Tez和Databricks的Spark。他們都嘗試解決了上面說的那些問題。Tez和Spark都可以很自由地描述一個Job里執行流(所謂DAG,有向無環圖)。他們相對現在的MapReduce模型來說,極大的提升了對各種複雜處理的直接支持,不需要再絞盡腦汁「挖掘」MR模型的潛力。
有興趣的童鞋可以看看這個PPT
http://www.slideshare.net/Hadoop_Summit/w-235phall1pandey
這是Hadoop峰會上Tez的材料,第九頁開始有描述Hive on Tez和傳統MR Hive的區別,這些區別應該也適用於MR Hive和Spark SQL,也很清楚的體現了為何MR模型很笨重。
相比Tez,Spark加入了更多內存Cache操作,但據了解它也是可以不Cache直接處理的,只是效率就會下降。
再說Programming Interface,Tez的Interface更像MapReduce,但是允許你定義各種Edge來連接不同邏輯節點。Spark則利用了Functional Programming的理念,API十分簡潔,相比MR和Tez簡單到令人髮指。我不清楚Spark如果要表現複雜的DAG會不會也變得很麻煩,但是至少wordcount的例子看起來是這樣的,大家可以比較感受下:
incubator-tez/WordCount.java at master · apache/incubator-tez · GitHub
Examples | Apache Spark
處理大規模數據而言,他們都需要更多proven cases。至少Hadoop MapReduce是被證明可行的。
作為Data Pipeline引擎來說,MapReduce每個步驟都會存檔,而Spark和Tez可以直接網路發送到下一個步驟,速度上是相差很多的,但是存檔的好處是允許繼續在失敗的數據上繼續跑,所以直觀上說MapReduce作為pipeline引擎更穩健。但理論上來說,如果選擇在每個完成的小步驟上加CheckPoint,那Tez和Spark完全能和現在的MapReduce達到一樣的穩健。
總結來說,即便現在不成熟,但是並沒有什麼阻礙他們代替現有的MapReduce Batch Process。對Tez而言,似乎商業上宣傳不如Spark成功。Databricks頭頂Berkley的光環,商業宣傳又十分老道,陣營增長極快。光就系統設計理念,沒有太大的優劣,但是商業上可能會拉開差距。Cloudera也加入了Spark陣營,以及很多其他大小公司,可以預見的是,Spark會成熟的很快,相比Tez。
但Tez對於Hortonworks來說是贏取白富美的關鍵,相信為了幸福他們也必須努力打磨推廣tez。
所以就算現在各家試用會有種種問題,但是畢竟現在也就出現了2個看起來有戲的「次世代」平台,那慢慢試用,不斷觀望,逐步替換,會是大多數公司的策略。
hadoop和Spark是兩種不同的大數據處理框架,他們的組件都非常多,往往也不容易學,我把他們兩者整理在一幅圖中,給大家一個全貌的感覺。至於各組件的詳細介紹、相關聯繫和區別,以及它們在大數據平台建設中的具體實施關注點,待點贊數達到1000,我再對帖子進行詳細的更新,請大家隨手幫忙點個贊。
以上這些大數據組件是日常大數據工作中經常會碰到的,每個組件大概的功能,我已經在圖中做了標識。下面,針對這幅圖我給大家兩點重要提示:
a.藍色部分,是Hadoop生態系統組件,黃色部分是Spark生態組件,雖然他們是兩種不同的大數據處理框架,但它們不是互斥的,Spark與hadoop 中的MapReduce是一種相互共生的關係。Hadoop提供了Spark許多沒有的功能,比如分散式文件系統,而Spark 提供了實時內存計算,速度非常快。有一點大家要注意,Spark並不是一定要依附於Hadoop才能生存,除了Hadoop的HDFS,還可以基於其他的雲平台,當然啦,大家一致認為Spark與Hadoop配合默契最好擺了。
b.技術趨勢:Spark在崛起,hadoop和Storm中的一些組件在消退。大家在學習使用相關技術的時候,記得與時俱進掌握好新的趨勢、新的替代技術,以保持自己的職業競爭力。
HSQL未來可能會被Spark SQL替代,現在很多企業都是HIVE SQL和Spark SQL兩種工具共存,當Spark SQL逐步成熟的時候,就有可能替換HSQL;
MapReduce也有可能被Spark 替換,趨勢是這樣,但目前Spark還不夠成熟穩定,還有比較長的路要走;
Hadoop中的演算法庫Mahout正被Spark中的演算法庫MLib所替代,為了不落後,大家注意去學習Mlib演算法庫;
Storm會被Spark Streaming替換嗎?在這裡,Storm雖然不是hadoop生態中的一員,但我仍然想把它放在一起做過比較。由於Spark和hadoop天衣無縫的結合,Spark在逐步的走向成熟和穩定,其生態組件也在逐步的完善,是冉冉升起的新星,我相信Storm會逐步被擠壓而走向衰退。
歡迎大家關注我的知乎專欄「大數據實踐與職業生涯」並留言,專欄會陸續的推出過往十多年的大數據工作經驗總結和我的一些研究實踐成果。如果你是大數據新人,或者想轉行進入大數據領域,或者大數據職業生涯上存在一些疑惑,都歡迎關注我的知乎live分享「大數據人的職業生涯規劃」 、 「數據分析師-從零入門到精通」、「大數據人的數據科學家之路」。
Spark 的優點,排名第一(用心閣)的已經說得很好了
根據最近走讀spark core/shuffle, core/scheduler and core/rdd 的部分代碼來說說自己的感受
1. spark 是hadoop mapreduce 的不斷改進,同時又兼容並包了很多資料庫裡面的一些基本思想來實現和發展。沒有什麼怪力亂神,什麼內存計算,什麼下一代之類的花哨說法的。spark 是站在hadoop and database 這兩個巨人肩膀上的。
舉個spark 處理迭代計算的Example
2. Spark 版本的PageRank 比Hadoop 快的不是一點點。根本原因就是在每一步迭代的時候,需要做兩個big table euqi-join。hadoop mr 的演算法是要做data shuffle,同時需要把需要計算的數據從hdfs 多次讀出寫入。回溯到5年前,最先發現Hadoop 在處理迭代計算,存在i/o 讀寫浪費的瓶頸的是 vldb10 的 LoopHadoop 的論文,其中的一作是一個中國哥們,現在citation 已經超過400多了。解決的方法就是把多個迭代中不變的數據,cache 下來,下一步計算就不需要從disk 裡面讀寫了,Spark 也是根據類似的idea 可以把需要反覆計算的數據, cache 下來。
3. 接著上面的問題,數據cache 下來之後,why spark 在迭代的時候不需要shuffle 了阿? 因為spark 定義了rdd 之間的dependence 關係,這個關係決定了是否需要shuffle. 比如一個spark 有多個partition, 如果一個rdd 到另外一個rdd 是 全依賴關係,就是說一個partition 的數據,始終在local 計算,或者只是需要去取指定的一個partition 的數據的話,那麼shuffle 就是不需要了。我們可以看到,所有計算的依賴關係都是在計算之前就定義好了。有了 rdd 之間的依賴關係,就是可以得到計算的 logical plan and physical plan, 然後去執行計算. 這就是典型的資料庫的思想。當然rdd 也就是資料庫view 的思想的實現,因為rdd 和view 都是需要的時候再計算的模式,這樣就可以有了計算的pipeline,也完全是資料庫pipeline 的實現的嗎。大家如果看到spark rdd code 裡面到處的iterator,就更明白了 。
4. 回到page rank 的列子,兩個大表(A,B)之間的join, 因為數據已經按照hash patitioner 把數據分塊劃好了。就是A B 數據的key 在一個範圍的已經分入到具體的partition 裡面了,那join的時候,就只是需要對應的partition 作對應的join 就可以的。這樣就避免了數據的shuffle 了的。
5. 其實spark 是一點點一點點的工程和學術的結合基礎上做出來的, 本質就是Hadoop mapreduce 的增強版本。大家可以看到,在理論上都沒有太多新的東西。 所以人家馬鐵大神當年論文也是被拒了好多遍的。但是系統就是這一點點的改進的基礎上做出的。馬鐵的老闆之一是Franklin,F的老闆是 UCI 的大牛Carey, Carey 的老闆大家自己google 吧。UCB的人的確是牛!
6. 最後我覺得比較hadoop vs spark 不是一個很好的比較方式,因為他們都是一個流派的。比較合適的,是比較 MapReduce Vs MPI,因為在MPI 的計算模式和MapReduce 有大的區別。如果在計算傳輸數據量不大的時候,比如單純的numerical 計算的時侯,MPI 都要甩 MapReduce 幾條街。最簡單就是你想想人家超算做了幾十年了,在大的計算集群上的計算上則幾萬核啊,經典的 one to all, all to all, all to one, data grid 都是很妙的方法好吧。但是如果在處理文本啊之類的數據的時候,一個單詞可能出現了很多次,需要大量的數據傳輸,這時候MapReduce shuffle 機制就顯示出優勢來了。當然MPI 沒有考慮 fault tolerance,也是在cloud 環境下,MapReduce 更有效的原因之一。我根據我有限的知識對Hadoop和Spark做一下對比,在附加一點自己的評論就好了。
原生語言:hadoop-JAVA,Spark-scala
評註:雖然可以實現介面,但原生的語言就是好用,如果某人痛恨java,Spark給你一條生路。
計算模型:hadoop-MapReduce,Spark-DAG(有向無環圖)
評註:經常有人說Spark就是內存版的MapReduce,實際上不是的。Spark使用的DAG計算模型可以有效的減少Map和Reduce人物之間傳遞的數據,尤其適合反覆迭代的機器學習場景。而Hadoop則更擅長批處理。不過Tez也是使用的DAG計算模型,他也是Hadoop,明眼人都知道DAG計算模型比MR更好。
評註:spark既可以僅用內存存儲,也可以在HDFS上存儲,即使Spark在HDFS上存儲,DAG計算模型在迭代計算上還是比MR的更有效率。
我並不覺得這兩個及系統又大多的矛盾,只不過Spark一直宣稱比hadoop快而已。實際上從應用場景上區分,Hadoop更適合做批處理,而Spark更適合做需要反覆迭代的機器學習。
主要是先看MapReduce模型有什麼問題?
第一:需要寫很多底層的代碼不夠高效,第二:所有的事情必須要轉化成兩個操作Map/Reduce,這本身就很奇怪,也不能解決所有的情況。
其實Spark出現就是為了解決上面的問題。介紹一些Spark的起源。發自 2010年Berkeley AMPLab,發表在hotCloud 是一個從學術界到工業界的成功典範,也吸引了頂級VC:Andreessen Horowitz的 注資 AMPLab這個實驗室非常厲害,做大數據,雲計算,跟工業界結合很緊密,之前就是他們做Mesos,Hadoop online, 在2013年,這些大牛(MIT最年輕的助理教授)從Berkeley AMPLab出去成立了Databricks。它是用函數式語言Scala編寫,Spark簡單說就是內存計算(包含迭代式計算,DAG計算,流式計算 )框架,之前MapReduce因效率低下大家經常嘲笑,而Spark的出現讓大家很清新。 Reynod 作為Spark核心開發者, 介紹Spark性能超Hadoop百倍,演算法實現僅有其1/10或1/100。
為啥用Spark,最直接的就是快啊,你用Hadoop跑大規模數據幾個小時跑完,這邊才幾十秒,這種變化不僅是數量級的,並且是對你的開發方式翻天覆地的變化,比如你想驗證一個演算法,你也不知道到底效果如何,但如果能在秒級就給你反饋,你可以立馬去調節。其他的如比MapReduce靈活啊,支持迭代的演算法,ad-hoc query, 不需要你費很多力氣花在軟體的搭建上。在去年的Sorting Benchmark上,Spark用了比Hadoop更少的節點在23min跑完了100TB的排序,刷新了之前Hadoop保持的世界紀錄。
這個是跟Hadoop跟Spark在回歸演算法上比較,在Hadoop的世界裡,做迭代計算是非常耗資源,它每次的IO 序列畫代價很大,所以每次迭代需要差不多的等待。而Spark第一次啟動需要載入到內存,之後迭代直接在內存利用中間結果做不落地的運算,所以後期的迭代速度快到可以忽略不計。
這個是著名的Berkeley Data Analytics Stack, 除了Spark,還有Mesos 和Techyon
Mesos:一個分散式環境的資源管理平台,它使得Hadoop、MPI、Spark作業在統一資源管理環境下執行。它對Hadoop2.0支持很好。Twitter,Coursera都在使用。
Tachyon:是一個高容錯的分散式文件系統,允許文件以內存的速度在集群框架中進行可靠的共享,就像Spark和MapReduce那樣。有幸跟項目發起人李浩源聊過幾次,這個項目目前發展非常快,甚至比Spark當時還要驚人。目前到0.6版本,參與開源的規模和版本迭代速度都很快。已經拿到著名VC A16Z 750萬美金的投資,https://www.crunchbase.com/organization/tachyon-networks
最新做出的Spark SQL提出Dataframe介面,看起來確實很有野心!
Spark是基於內存的分散式計算引擎,以處理的高效和穩定著稱。然而在實際的應用開發過程中,開發者還是會遇到種種問題,其中一大類就是和性能相關。在本文中,筆者將結合自身實踐,談談如何儘可能地提高應用程序性能。
分散式計算引擎在調優方面有四個主要關注方向,分別是CPU、內存、網路開銷和I/O,其具體調優目標如下:
- 提高CPU利用率。
- 避免OOM。
- 降低網路開銷。
- 減少I/O操作。
第1章 數據傾斜
數據傾斜意味著某一個或某幾個Partition中的數據量特別的大,這意味著完成針對這幾個Partition的計算需要耗費相當長的時間。
如果大量數據集中到某一個Partition,那麼這個Partition在計算的時候就會成為瓶頸。圖1是Spark應用程序執行並發的示意圖,在Spark中,同一個應用程序的不同Stage是串列執行的,而同一Stage中的不同Task可以並發執行,Task數目由Partition數來決定,如果某一個Partition的數據量特別大,則相應的task完成時間會特別長,由此導致接下來的Stage無法開始,整個Job完成的時間就會非常長。
要避免數據傾斜的出現,一種方法就是選擇合適的key,或者是自己定義相關的partitioner。在Spark中Block使用了ByteBuffer來存儲數據,而ByteBuffer能夠存儲的最大數據量不超過2GB。如果某一個key有大量的數據,那麼在調用cache或persist函數時就會碰到spark-1476這個異常。
下面列出的這些API會導致Shuffle操作,是數據傾斜可能發生的關鍵點所在
- groupByKey
- reduceByKey
- aggregateByKey
- sortByKey
- join
- cogroup
- cartesian
- coalesce
- repartition
- repartitionAndSortWithinPartitions
圖1: Spark任務並發模型
def rdd: RDD[T]
}
// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class DemoPairRDD[K &<% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
// Here we use a single Long to try to ensure the sort is balanced,
// but for really large dataset, we may want to consider
// using a tuple of many Longs or even a GUID
def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
rdd.map(kv =&> ((kv._1, Random.nextLong()), kv._2)).sortByKey()
.grouped(numPartitions).map(t =&> (t._1._1, t._2))
}
case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
def grouped(size: Int): RDD[T] = {
// TODO Version where withIndex is cached
val withIndex = rdd.mapPartitions(_.zipWithIndex)
val startValues =
withIndex.mapPartitionsWithIndex((i, iter) =&>
Iterator((i, iter.toIterable.last))).toArray().toList
.sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)
withIndex.mapPartitionsWithIndex((i, iter) =&> iter.map {
case (value, index) =&> (startValues(i) + index.toLong, value)
})
.partitionBy(new Partitioner {
def numPartitions: Int = size
def getPartition(key: Any): Int =
(key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
})
.map(_._2)
}
}
定義隱式的轉換
implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] =
new DemoRDD[T](rdd)
implicit def toDemoPairRDD[K &<% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)
implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
在spark-shell中就可以使用了
import RDDConversions._
yourRdd.grouped(5)
第2章 減少網路通信開銷
Spark的Shuffle過程非常消耗資源,Shuffle過程意味著在相應的計算節點,要先將計算結果存儲到磁碟,後續的Stage需要將上一個Stage的結果再次讀入。數據的寫入和讀取意味著Disk I/O操作,與內存操作相比,Disk I/O操作是非常低效的。
使用iostat來查看disk i/o的使用情況,disk i/o操作頻繁一般會伴隨著cpu load很高。
如果數據和計算節點都在同一台機器上,那麼可以避免網路開銷,否則還要加上相應的網路開銷。 使用iftop來查看網路帶寬使用情況,看哪幾個節點之間有大量的網路傳輸。 圖2是Spark節點間數據傳輸的示意圖,Spark Task的計算函數是通過Akka通道由Driver發送到Executor上,而Shuffle的數據則是通過Netty網路介面來實現。由於Akka通道中參數spark.akka.framesize決定了能夠傳輸消息的最大值,所以應該避免在Spark Task中引入超大的局部變數。
圖2: Spark節點間的數據傳輸
第1節 選擇合適的並發數為了提高Spark應用程序的效率,儘可能的提升CPU的利用率。並發數應該是可用CPU物理核數的兩倍。在這裡,並發數過低,CPU得不到充分的利用,並發數過大,由於spark是每一個task都要分發到計算結點,所以任務啟動的開銷會上升。
並發數的修改,通過配置參數來改變spark.default.parallelism,如果是sql的話,可能通過修改spark.sql.shuffle.partitions來修改。
第1項 Repartition vs. Coalescerepartition和coalesce都能實現數據分區的動態調整,但需要注意的是repartition會導致shuffle操作,而coalesce不會。
第2節 reduceByKey vs. groupBygroupBy操作應該儘可能的避免,第一是有可能造成大量的網路開銷,第二是可能導致OOM。以WordCount為例來演示reduceByKey和groupBy的差異
reduceByKey
sc.textFile(「README.md」).map(l=&>l.split(「,」)).map(w=&>(w,1)).reduceByKey(_ + _)
圖3:reduceByKey的Shuffle過程
Shuffle過程如圖2所示
groupByKey
sc.textFile(「README.md」).map(l=&>l.split(「,」)).map(w=&>(w,1)).groupByKey.map(r=&>(r._1,r._2.sum))
圖4:groupByKey的Shuffle過程
建議: 儘可能使用reduceByKey, aggregateByKey, foldByKey和combineByKey
假設有一RDD如下所示,求每個key的均值
val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )
方法一:reduceByKey
data.map(r=&>(r._1, (r.2,1))).reduceByKey((a,b)=&>(a._1 + b._1, a._2 + b._2)).map(r=&>(r._1,(r._2._1/r._2._2)).foreach(println)
方法二:combineByKeydata.combineByKey(value=&>(value,1), (x:(Double, Int), value:Double)=&> (x._1+value, x._2 + 1), (x:(Double,Int), y:(Double, Int))=&>(x._1 + y._1, x._2 + y._2))
第3節 BroadcastHashJoin vs. ShuffleHashJoin在Join過程中,經常會遇到大表和小表的join. 為了提高效率可以使用BroadcastHashJoin, 預先將小表的內容廣播到各個Executor, 這樣將避免針對小表的Shuffle過程,從而極大的提高運行效率。
其實BroadCastHashJoin核心就是利用了BroadCast函數,如果理解清楚broadcast的優點,就能比較好的明白BroadcastHashJoin的優勢所在。
以下是一個簡單使用broadcast的示常式序。
val lst = 1 to 100 toList
val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2)
val broadcastLst = sc.broadcast(lst)
exampleRDD.filter(i=&>broadcastLst.valuecontains(i)).collect.foreach(println)
第4節 map vs. mapPartitions
有時需要將計算結果存儲到外部資料庫,勢必會建立到外部資料庫的連接。應該儘可能的讓更多的元素共享同一個數據連接而不是每一個元素的處理時都去建立資料庫連接。
在這種情況下,mapPartitions和foreachPartitons將比map操作高效的多。
第5節 數據就地讀取移動計算的開銷遠遠低於移動數據的開銷。
Spark中每個Task都需要相應的輸入數據,因此輸入數據的位置對於Task的性能變得很重要。按照數據獲取的速度來區分,由快到慢分別是:
- PROCESS_LOCAL
- NODE_LOCAL
- RACK_LOCAL
Spark在Task執行的時候會盡優先考慮最快的數據獲取方式,如果想儘可能的在更多的機器上啟動Task,那麼可以通過調低spark.locality.wait的值來實現, 默認值是3s。
除了HDFS,Spark能夠支持的數據源越來越多,如Cassandra, HBase,MongoDB等知名的NoSQL資料庫,隨著Elasticsearch的日漸興起,spark和elasticsearch組合起來提供高速的查詢解決方案也成為一種有益的嘗試。
上述提到的外部數據源面臨的一個相同問題就是如何讓spark快速讀取其中的數據, 儘可能的將計算結點和數據結點部署在一起是達到該目標的基本方法,比如在部署Hadoop集群的時候,可以將HDFS的DataNode和Spark Worker共享一台機器。
以cassandra為例,如果Spark的部署和Cassandra的機器有部分重疊,那麼在讀取Cassandra中數據的時候,通過調低spark.locality.wait就可以在沒有部署Cassandra的機器上啟動Spark Task。
對於Cassandra, 可以在部署Cassandra的機器上部署Spark Worker,需要注意的是Cassandra的compaction操作會極大的消耗CPU,因此在為Spark Worker配置CPU核數時,需要將這些因素綜合在一起進行考慮。
這一部分的代碼邏輯可以參考源碼TaskSetManager::addPendingTask
private def addPendingTask(index: Int, readding: Boolean = false) {
// Utility method that adds `index` to a list only if readding=false or it"s not already there
def addTo(list: ArrayBuffer[Int]) {
if (!readding || !list.contains(index)) {
list += index
}
}
for (loc &<- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =&>
addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
case e: HDFSCacheTaskLocation =&> {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =&> {
for (e &<- set) {
addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
}
case None =&> logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
}
case _ =&> Unit
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack &<- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
}
}
if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
}
if (!readding) {
allPendingTasks += index // No point scanning this whole list to find the old task there
}
}
如果準備讓spark支持新的存儲源,進而開發相應的RDD,與位置相關的部分就是自定義getPreferredLocations函數,以elasticsearch-hadoop中的EsRDD為例,其代碼實現如下。
override def getPreferredLocations(split: Partition): Seq[String] = {
val esSplit = split.asInstanceOf[EsPartition]
val ip = esSplit.esPartition.nodeIp
if (ip != null) Seq(ip) else Nil
}
第6節 序列化
使用好的序列化演算法能夠提高運行速度,同時能夠減少內存的使用。
Spark在Shuffle的時候要將數據先存儲到磁碟中,存儲的內容是經過序列化的。序列化的過程牽涉到兩大基本考慮的因素,一是序列化的速度,二是序列化後內容所佔用的大小。
kryoSerializer與默認的javaSerializer相比,在序列化速度和序列化結果的大小方面都具有極大的優勢。所以建議在應用程序配置中使用KryoSerializer.
spark.serializer org.apache.spark.serializer.KryoSerializer
默認的cache沒有對緩存的對象進行序列化,使用的StorageLevel是MEMORY_ONLY,這意味著要佔用比較大的內存。可以通過指定persist中的參數來對緩存內容進行序列化。
exampleRDD.persist(MEMORY_ONLY_SER)
需要特別指出的是persist函數是等到job執行的時候才會將數據緩存起來,屬於延遲執行; 而unpersist函數則是立即執行,緩存會被立即清除。
更多內容可以訪問 community.qingcloud.com我們公司現在Spark和Hadoop都在用,從我的感受來看,雖然Spark目前還不夠成熟,但是今後一定會代替Hadoop。
1. 相同的演算法,Spark比Hadoop快數倍,如果是一些迭代或者要對數據反覆讀取的演算法,Spark比Hadoop快數十倍至上百倍
2. Spark對於數據的操作種類更多,對於一些比較特殊的計算需求,比如求兩個集合的交集並集,Spark都有函數直接計算,而Hadoop實現這樣的計算無比繁瑣
3. Spark的開發效率比Hadoop高很多
spark相對hadoop編程模型簡單,能進行迭代操作,利用內存(甚至是堆外內存)緩存數據,能進行流水線優化,上層封裝了sql、streaming、mlib、graphx等或成熟或不成熟的框架,明顯有取hadoop而代之的傾向。
特別是最近以十分之一的資源條件打破了Hadoop之前保持的排序紀錄。
利益相關:未來3-5年靠spark吃飯。又看到王家林大忽悠。
Spark是完全可以取代Hadoop MapReduce的計算框架。
Hadoop = HDFS + YARN + MapReduce
YARN負責資源調度,依然發揮重要作用,不可獲取的重要組件之一,Spark也可以跑在它上面,相比Mesos(C++),YARN是Hadoop自帶的,用起來比較方便。
MapReduce計算框架,在Spark面前已失去性能及速度優勢,基本面臨淘汰。
既然是搞大數據的,我們的用數據來說話:
結論:Hadoop是大數據基礎生態(存儲、計算、資源調度)的事實標準,Spark正成為大數據生態中計算組件的事實標準,也就說大數據技術繞不開spark了,成為泛Hadoop不可或缺的一員。
- Hadoop包括HDFS、Mapreduce、YARN三大核心組件,Spark主要解決計算問題,也就是主要用來替代Mapreduce的功能,底層存儲和資源調度仍然使用HDFS、YARN來承載;
- 從兩者的google搜索趨勢看,Spark的搜索趨勢已與Hadoop持平甚至趕超,標誌著其成為了計算部分的事實標準
- 從兩者的百度搜索趨勢看,與Google差不多一致
- 百度的需求圖譜(詞語緊密度),也證明了Spark取代了Mapduce進入了Hadoop新一屆的「常委」,Mapreduce快候補委員都不是了。。。
原文地址:[原]海納百川 有容乃大:SparkR與Docker的機器學習實戰
題圖為美國尼米茲核動力航空母艦
介紹
大數據時代,我們常常面對海量數據而頭疼。作為學統計出身的人,我們想折騰大數據但又不想學習Hadoop或者Java,我們更傾向於把精力放在建模和演算法設計上,SparkR和Docker的完美結合,讓R的計算直接從一架戰鬥機的當兵作戰華麗轉變為一個航空母艦戰鬥群!不僅僅簡化了分散式計算的操作,還簡化了安裝部署的環節,我們只幾乎不需要做什麼改動就可以直接運用R中的data frame進行分散式的計算。
什麼是SparkR參考前文 打造大數據產品:Shiny的Spark之旅,我們可以知道,SparkR是一個為R提供了輕量級的Spark前端的R包。 SparkR提供了一個分散式的data frame數據結構,解決了 R中的data frame只能在單機中使用的瓶頸,它和R中的data frame 一樣支持許多操作,比如select,filter,aggregate等等。(類似dplyr包中的功能)這很好的解決了R的大數據級瓶頸問題。 SparkR也支持分散式的機器學習演算法,比如使用MLib機器學習庫。
什麼是Docker參考前文 打造數據產品的快速原型:Shiny的Docker之旅,我們也可以知道,Docker是一種類似於虛擬機的技術,主要解決標準化快速部署的問題,在Docker中安裝的軟體和主機中的軟體可以完全隔離,並通過Daocloud或者hub.docker.com等雲服務快速建立Docker倉庫,快速復用Docker鏡像。Docker已經不僅僅是DevOps人員手中的神器了,每一個開發者都應該學會如何使用Docker。
為什麼要結合SparkR和DockerSparkR的精髓在於分散式計算,而Docker的精髓在於標準容器的拓展性,SparkR和Docker的組合充分結合了二者各自的優點,將分散式應用底層化繁為簡,為高層計算直接暴露介面,給科學計算節省了大量時間。
部署本文將通過Docker講解如何快速部署SparkR-RStudio容器,並通過一些簡單的機器學習例子展示如何使用這個航母級別的組合拳。
步驟一:安裝Docker和Daocloud由於國內的鏡像質量不夠高,國外的鏡像下載速度比較慢,出於試驗的考慮,建議大家可以嘗試使用Daocloud的鏡像加速服務。
首先,我們需要在Daocloud註冊一個賬號,然後選擇鏡像加速,根據指示選擇主機並安裝Docker和Daocloud加速器。
步驟二:安裝Spark-RStudio感謝 vinicius85 在GitHub上的開源貢獻,為我們已經做好了 Spark1.6+R+RStduio的鏡像,我們利用daocloud加速拉取鏡像。
dao pull vinicius85/spark-rstudio
以daemon形式運行容器,暴露Rstudio-server默認的8787埠, 並持久化docker內的/srv目錄下的所有文件作為通訊。
docker run -d -v /home/docker:/srv -p 8787:8787 --name sparkrstudio vinicius85/sparkr-rstudio
步驟三:配置RStudio登陸賬號
參考前文 R語言工程化實踐:RStudio Server環境快速配置教程
docker exec -d sparkrstudio bash命令表示以daemon形式執行容器中的shell腳本
我們設置一下RStudio-Server的賬號密碼
docker exec -d sparkrstudio bash adduser harryzhu # 設置新用戶名
docker exec -d sparkrstudio bash passwd harryzhu # 設置該用戶的密碼
步驟四:登陸RStudio
ifconfig命令可以查看到Docker當前的IP地址,透過這個IP,我們可以訪問到RStudio-Server。
比如:
查看資源佔用情況
docker stats sparkrstudio
CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O
sparkrstudio 4.50% 481.3 MB / 5.039 GB 9.55% 133.6 kB / 117.4 kB 3.252 MB / 135.2 kB
機器學習示例:
出於演示的考慮,這裡引用並稍微改進了 tcosta 完成的一個邏輯回歸的例子:
初始化使用SparkR之前,我們需要確定,我們的容器內存要在2G以上,如果用AWS的乞丐版套裝,馬上就會報內存不足的錯誤。
Error in sparkR.init(master = "local") :
JVM is not ready after 10 seconds
如果內存不足,可以退出docker並且在虛擬機中重新提高docker的內存和cpu的配置。
# 配置環境變數
Sys.setenv(SPARK_HOME="/opt/spark-1.6.0-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
Sys.setenv(JAVA_HOME="/usr/lib/jvm/java-8-oracle/")
# 載入 SparkR包
library(SparkR)
# 初始化RRD
sc &<- sparkR.init(master = "local")
sqlContext &<- sparkRSQL.init(sc)
# 創建DataFrame
mtcarsDF &<- createDataFrame(sqlContext, mtcars)
head(mtcarsDF)
mpg cyl disp hp drat wt qsec vs am gear carb
1 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
2 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4
3 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1
4 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
5 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
6 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
邏輯回歸
model &<- glm(vs ~ mpg + disp + hp + wt , data = mtcarsDF, family = "binomial")# 邏輯回歸
# model &<- glm(vs ~ mpg + disp + hp + wt , data = mtcarsDF, family = "gaussian")# 線性回歸
predictions &<- predict(model, newData = mtcarsDF )
modelPrediction &<- select(predictions, "vs", "prediction")
head(modelPrediction)
vs prediction
1 0 0.58006945
2 0 0.64060709
3 1 0.72468718
4 1 0.47803842
5 0 0.06070972
6 1 0.54994276
模型評估
# error變數: 觀測值和預測值的差值
modelPrediction$error &<- abs(modelPrediction$vs - modelPrediction$prediction)
# modelPrediction 現在對 SQLContext 是可見的
registerTempTable(modelPrediction, "modelPrediction")
num_errors &<- sql(sqlContext, "SELECT count(error) FROM modelPrediction WHERE error = 1")
total_errors &<- sql(sqlContext, "SELECT count(error) FROM modelPrediction")
# 模型錯誤率
training_acc &<- collect(num_errors) / collect(total_errors)
training_acc
_c0
1 0
參考資料
[原]打造數據產品的快速原型:Shiny的Docker之旅
[原]R語言工程化實踐:RStudio Server環境快速配置教程
[譯]打造大數據產品:Shiny的Spark之旅
Tiago Vinícius: SparkR 1.5 MLlib Logistic Regression Example
SparkR: Distributed data frames with Spark and R
SparkR R frontend for Spark
作為分享主義者(sharism),本人所有互聯網發布的圖文均遵從CC版權,轉載請保留作者信息並註明作者 Harry Zhu 的 FinanceR專欄:FinanceR - SegmentFault,如果涉及源代碼請註明GitHub地址:harryprince (HarryZhu) · GitHub。微信號: harryzhustudio
商業使用請聯繫作者。
屬於Hadoop生態的一部分,Hadoop生態計算引擎目前包括:Hadoop MapReduce、Spark/Spark 2.0、TEZ、Flink等,這裡從計算模型,各自的特點分為了1G、2G、3G、3.8G、4G,分別代表其理論先進程度。Spark理論上並不是最先進的,但是目前來講應該是最適合的。和封神一起「深挖」Spark-博客-雲棲社區-阿里雲
我是從事存儲的,計算上不是很擅長,但從我對Hadoop的理解上來看:
Hadoop是10年前的技術了,在這個技術更新換代太快的時代,的確是長江後浪推前浪。
只說Spark和Hadoop。
簡單地接觸過,Spark的編程模型要好用很多,理念也要先進不少,比如說,寫一個經典的word count(統計詞頻):input.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).foreach(println)
沒錯,它看起來和普通的串列程序沒多少區別,只是方法綁定的對象是RDD(分散式彈性數據集),你可以直接對分散式數據進行操作,就像操作本機上的數據一樣。其抽象的程度要高於傳統的MapReduce,不用再糾結於程序的底層實現。
而Hadoop,性質和Spark並不一樣,它不僅僅是一個框架,而致力於發展成一個分散式計算的平台。所以,現在的Spark是可以運行在YARN(Yet another Resource Negotiator)上的,而其他的一些框架也可以運行在這個平台上,達到資源共享的目的。
對我個人來講,用了Spark之後真的不想再去用Hadoop Map/Reduce了。
個人的一些淺見。最近課程需要,認真看了看MapReduce和Spark最初的那兩篇paper。從一個學生的角度談談自己的看法。文中英文只是出於方便,或者不知道如何恰當翻譯。
首先,二者都是並行化計算技術,目的是處理海量數據。因數據量大,也往往需要分散式存儲系統(GFS,HDFS等)的支持。
關於MapReduce的原理,很多答案都講明白了,我就不贅述了。主要說說Spark相比MapReduce的改變。
Spark的提出很大程度上是為了解決MapReduce在處理迭代演算法上的缺陷。由於MapReduce的數據流是acyclic的,且數據存儲在磁碟,這就導致在迭代計算時需要反覆進行磁碟讀寫操作,大大降低了計算效率。而事實上當前機器學習的大多數演算法都是迭代演算法,因此解決這一問題具有很大的應用價值。
Spark解決這一問題的方法是提供了一個更強大的primitive數據抽象模型--RDD(Resilient Distributed Datasets),並定義了一系列轉化(map,filter,sample,...)和分散式操作(reduce,collect,count...)。
RDD的妙處很多。舉例如下:- RDD可以被cache在內存中。這種機制無疑可以簡單有效地解決MapReduce在迭代計算時反覆讀取寫出磁碟的問題,但同時大大增大了內存開銷。這裡有一個tradeoff.
- RDD幫助Fault Recovery. RDD可存儲lineage信息來重建lost partitions. 比如記錄RDD的轉化過程。MapReduce採用的是checkpointing機制,代價要大很多。
- RDD是一個logical單元,甚至不需要實例化,而只需包含從磁碟重建workset的信息。
更具體的細節就不講了。簡單說說結論:
- Spark無疑更快。原因除了上面所說之外,還有就是RDD是immutable的因此不存在synchronisation的問題。
- Spark的caching機制允許講數據和中間結果cache在內存中,大大提高了計算效率。
- Spark的recovery機制比MapReduce更有效。
- Spark的API更強大,抽象層次更高。
- Spark更消耗內存。但是在雲計算時代性價比更高,參考雲收費方法。
- Spark在安全方面還有很大空間,相比之下Hadoop較為成熟。不過隨著Spark的發展,此方面的劣勢相信也會逐漸減小。
你們公司。。。竟然會被這種典型的騙子騙。。。哎
這是典型騙子,不作過多解釋了,你趕緊想辦法跳槽吧Hadoop Strom Spark 都有各自的優勢和特點 你可以通吃
看樣子王大師此人過於浮誇,在網路上的名聲似乎不太好.
同時宣布了Hadoop的死刑 ,我覺得這句話說得非常的過。hadoop根深蒂固,怎麼可能被幹掉?試問現在那個做大數據的能脫離得了hdfs與yarn?spark不也是在yarn上用的模式居多?
當然了,spark確實是一個好東西,我們也基於spark之上做了不少東西,尤其是我們的檢索與排序,基於spark之上後,性能不僅杠杠的,而且支持很多複雜的SQL。
下面是我們之前基於spark之上二次開發的一些東西,可以說挺爽的。
基於spark排序的一種更廉價的實現方案-附基於spark的性能測試
排序可以說是很多日誌系統的硬指標(如按照時間逆序排序),如果一個大數據系統不能進行排序,基本上是這個系統屬於不可用狀態,排序算得上是大數據系統的一個「剛需」,無論大數據採用的是hadoop,還是spark,還是impala,hive,總之排序是必不可少的,排序的性能測試也是必不可少的。
有著計算奧運會之稱的Sort Benchmark全球排序每年都會舉行一次,每年巨頭都會在排序上進行巨大的投入,可見排序速度的高低有多麼重要!但是對於大多數企業來說,動輒上億的硬體投入,實在划不來、甚至遠遠超出了企業的項目預算。相比大數據領域的暴力排序有沒有一種更廉價的實現方式?
在這裡,我們為大家介紹一種新的廉價排序方法,我們稱為blockSort。
500G的數據300億條數據,只使用4台 16核,32G內存,千兆網卡的虛擬機即可實現 2~15秒的 排序 (可以全表排序,也可以與任意篩選條件篩選後排序)。
一、基本的思想是這樣的,如下圖所示:
1.將數據按照大小預先劃分好,如劃分成 大、中、小三個塊(block)。
2.如果想找最大的數據,那麼只需要在最大的那個塊里去找就可以了。
3.這個快還是有層級結構的,如果每個塊內的數據量很多,可以到下面的子快內進行繼續查找,可以分多個層進行排序。
4.採用這種方法,一個億萬億級別的數據(如long類型),最壞最壞的極端情況也就進行2048次文件seek就可以篩選到結果。
怎麼樣,原理是不是非常簡單,這樣數據量即使特別多,那麼排序與查找的次數是固定的。
二、這個是我們之前基於spark做的性能測試,供大家參考
在排序上,YDB具有絕對優勢,無論是全表,還是基於任意條件組合過濾,基本秒殺Spark任何格式。
測試結果(時間單位為秒)
三、當然除了排序上,我們的其他性能也是遠遠高於spark,這塊大家也可以了解一下
1、與Spark txt在檢索上的性能對比測試。
注釋:備忘。下圖的這塊,其實沒什麼特別的,只不過由於YDB本身索引的特性,不想spark那樣暴力,才會導致在掃描上的性能遠高於spark,性能高百倍不足為奇。
下圖為ydb相對於spark txt提升的倍數
2、這些是與 Parquet 格式對比(單位為秒)
3、與ORACLE性能對比
跟傳統資料庫的對比,已經沒啥意義,Oracle不適合大數據,任意一個大數據工具都遠超oracle 性能。
4.稽查布控場景性能測試
四、YDB是怎麼樣讓spark加速的?
基於Hadoop分散式架構下的實時的、多維的、互動式的查詢、統計、分析引擎,具有萬億數據規模下的秒級性能表現,並具備企業級的穩定可靠表現。
YDB是一個細粒度的索引,精確粒度的索引。數據即時導入,索引即時生成,通過索引高效定位到相關數據。YDB與Spark深度集成,Spark對YDB檢索結果集直接分析計算,同樣場景讓Spark性能加快百倍。
五、哪些用戶適合使用YDB?
1.傳統關係型數據,已經無法容納更多的數據,查詢效率嚴重受到影響的用戶。
2.目前在使用SOLR、ES做全文檢索,覺得solr與ES提供的分析功能太少,無法完成複雜的業務邏輯,或者數據量變多後SOLR與ES變得不穩定,在掉片與均衡中不斷惡性循環,不能自動恢復服務,運維人員需經常半夜起來重啟集群的情況。
3.基於對海量數據的分析,但是苦於現有的離線計算平台的速度和響應時間無滿足業務要求的用戶。
4.需要對用戶畫像行為類數據做多維定向分析的用戶。
5.需要對大量的UGC(User Generate Content)數據進行檢索的用戶。
6.當你需要在大數據集上面進行快速的,互動式的查詢時。
7.當你需要進行數據分析,而不只是簡單的鍵值對存儲時。
8.當你想要分析實時產生的數據時。
ps: 說了一大堆,說白了最適合的還是蹤跡分析因為數據量大,數據還要求實時,查詢還要求快。這才是關鍵。
視頻地址 (看不清的同學可以進入騰訊視頻 高清播放)
https://v.qq.com/x/page/q0371wjj8fb.html
https://v.qq.com/x/page/n0371l0ytji.html
感興趣的讀者也可以閱讀YDB編程指南 http://url.cn/42R4CG8 。也可以參考該書自己安裝延雲YDB進行測試。
我們公司也請了這個人授課,貌似價格還不菲…
我也感覺是騙子推薦閱讀:
※有哪些「神奇」的數據獲取方式?
※有哪些數據風向標更好地預測了這次美國總統選舉結果?
※如何評價亞馬遜新推出的 Amazon Machine Learning?
※數據科學家 (Data Scientist) 的核心技能是什麼?
※機器學習的演算法和普通《演算法導論》里的演算法有什麼本質上的異同?