都說spark那麼牛,有沒有什麼坑啊?
這玩意到底怎麼樣?有沒有傳說中宣傳的那麼好?真的比hadoop 好么?
您好,我是延雲的技術,這個回答不好一概而論,畢竟我們也用了一年的spark。總體來說這東西還是非常不錯的,當然也有一些地方不是很穩定的。
下面是我們這一年裡,遇到的主要問題,比較偏技術,供您參考。
spark 內存泄露
我們用的是 spark1.6.3,說實話,這個版本相比先前的版本已經穩定了很多,由於要給企業用,spark2.0剛出來不久,我們還在觀望中。
但是內存泄露的問題,真的是spark的硬傷。如果不能在企業中持續的使用不穩定,那麼再快又有什麼用呢?
1.高並發情況下的內存泄露的具體表現
很遺憾,Spark的設計架構並不是為了高並發請求而設計的,我們嘗試在網路條件不好的集群下,進行100並發的查詢,在壓測3天後發現了內存泄露。
a)在進行大量小SQL的壓測過程中發現,有大量的activejob在spark ui上一直處於pending狀態,且永遠不結束,如下圖所示
b)並且發現driver內存爆滿
c)用內存分析分析工具分析了下
2.高並發下AsynchronousListenerBus引起的WEB UI的內存泄露
短時間內 SPARK 提交大量的SQL ,而且SQL裡面存在大量的 union與join的情形,會創建大量的event對象,使得這裡的 event數量超過10000個event ,
一旦超過10000個event就開始丟棄 event,而這個event是用來回收 資源的,丟棄了 資源就無法回收了。 針對UI頁面的這個問題,我們將這個隊列長度的限制給取消了
3.AsynchronousListenerBus本身引起的內存泄露
抓包發現
這些event是通過post方法傳遞的,並寫入到隊列里
但是也是由一個單線程進行postToAll的
但是在高並發情況下,單線程的postToAll的速度沒有post的速度快,會導致隊列堆積的event越來越多,如果是持續性的高並發的SQL查詢,這裡就會導致內存泄露
接下來我們在分析下postToAll的方法裡面,那個路徑是最慢的,導致事件處理最慢的邏輯是那個?
可能您都不敢相信,通過jstack抓取分析,程序大部分時間都阻塞在記錄日誌上
可以通過禁用這個地方的log來提升event的速度
log4j.logger.org.apache.spark.scheduler=ERROR
4.高並發下的Cleaner的內存泄露
說道這裡,Cleaner的設計應該算是spark最糟糕的設計。spark的ContextCleaner是用於回收與清理已經完成了的 廣播boradcast,shuffle數據的。但是高並發下,我們發現這個地方積累的數據會越來越多,最終導致driver內存跑滿而掛掉。
l我們先看下,是如何觸發內存回收的
沒錯,就是通過System.gc() 回收的內存,如果我們在jvm里配置了禁止執行System.gc,這個邏輯就等於廢掉(而且有很多jvm的優化參數一般都推薦配置禁止system.gc 參數)
lclean過程
這是一個單線程的邏輯,而且每次清理都要協同很多機器一同清理,清理速度相對來說比較慢,但是SQL並發很大的時候,產生速度超過了清理速度,整個driver就會發生內存泄露。而且brocadcast如果佔用內存太多,也會使用非常多的本地磁碟小文件,我們在測試中發現,高持續性並發的情況下本地磁碟用於存儲blockmanager的目錄佔據了我們60%的存儲空間。
我們再來分析下 clean裡面,那個邏輯最慢
真正的瓶頸在於blockManagerMaster裡面的removeBroadcast,因為這部分邏輯是需要跨越多台機器的。
針對這種問題,
l我們在SQL層加了一個SQLWAITING邏輯,判斷了堆積長度,如果堆積長度超過了我們的設定值,我們這裡將阻塞新的SQL的執行。堆積長度可以通過更改conf目錄下的ya100_env_default.sh中的ydb.sql.waiting.queue.size的值來設置。
l建議集群的帶寬要大一些,萬兆網路肯定會比千兆網路的清理速度快很多。
l給集群休息的機會,不要一直持續性的高並發,讓集群有間斷的機會。
l增大spark的線程池,可以調節conf下的spark-defaults.conf的如下值來改善。
5.線程池與threadlocal引起的內存泄露
發現spark,Hive,lucene都非常鍾愛使用threadlocal來管理臨時的session對象,期待SQL執行完畢後這些對象能夠自動釋放,但是與此同時spark又使用了線程池,線程池裡的線程一直不結束,這些資源一直就不釋放,時間久了內存就堆積起來了。
針對這個問題,延雲修改了spark關鍵線程池的實現,更改為每1個小時,強制更換線程池為新的線程池,舊的線程數能夠自動釋放。
6.文件泄露
您會發現,隨著請求的session變多,spark會在hdfs和本地磁碟創建海量的磁碟目錄,最終會因為本地磁碟與hdfs上的目錄過多,而導致文件系統和整個文件系統癱瘓。在YDB裡面我們針對這種情況也做了處理。
7.deleteONExit內存泄露
為什麼會有這些對象在裡面,我們看下源碼
8.JDO內存泄露
多達10萬多個JDOPersistenceManage
9.listerner內存泄露
通過debug工具監控發現,spark的listerner隨著時間的積累,通知(post)速度運來越慢
發現所有代碼都卡在了onpostevent上
jstack的結果如下
研究下了調用邏輯如下,發現是循環調用listerners,而且listerner都是空執行才會產生上面的jstack截圖
通過內存發現有30多萬個linterner在裡面
發現都是大多數都是同一個listener,我們核對下該處源碼
最終定位問題
確系是這個地方的BUG ,每次創建JDBC連接的時候 ,spark就會增加一個listener, 時間久了,listener就會積累越來越多 針對這個問題 我簡單的修改了一行代碼,開始進入下一輪的壓測
二十二、spark源碼調優
測試發現,即使只有1條記錄,使用 spark進行一次SQL查詢也會耗時1秒,對很多即席查詢來說1秒的等待,對用戶體驗非常不友好。針對這個問題,我們在spark與hive的細節代碼上進行了局部調優,調優後,響應時間由原先的1秒縮減到現在的200~300毫秒。
以下是我們改動過的地方
1.SessionState 的創建目錄 佔用較多的時間
另外使用Hadoop namenode HA的同學會注意到,如果第一個namenode是standby狀態,這個地方會更慢,就不止一秒,所以除了改動源碼外,如果使用namenode ha的同學一定要注意,將active狀態的node一定要放在前面。
2.HiveConf的初始化過程佔用太多時間
頻繁的hiveConf初始化,需要讀取core-default.xml,hdfs-default.xml,yarn-default.xml
,mapreduce-default.xml,hive-default.xml等多個xml文件,而這些xml文件都是內嵌在jar包內的。
第一,解壓這些jar包需要耗費較多的時間,第二每次都對這些xml文件解析也耗費時間。
3.廣播broadcast傳遞的hadoop configuration序列化很耗時
lconfiguration的序列化,採用了壓縮的方式進行序列化,有全局鎖的問題
lconfiguration每次序列化,傳遞了太多了沒用的配置項了,1000多個配置項,佔用60多Kb。我們剔除了不是必須傳輸的配置項後,縮減到44個配置項,2kb的大小。
4.對spark廣播數據broadcast的Cleaner的改進
由於SPARK-3015 的BUG,spark的cleaner 目前為單線程回收模式。
大家留意spark源碼注釋
其中的單線程瓶頸點在於廣播數據的cleaner,由於要跨越很多台機器,需要通過akka進行網路交互。
如果回收並發特別大,SPARK-3015 的bug報告會出現網路擁堵,導致大量的 timeout出現。
為什麼回收量特變大呢? 其實是因為cleaner 本質是通過system.gc(),定期執行的,默認積累30分鐘或者進行了gc後才觸發cleaner,這樣就會導致瞬間,大量的akka並發執行,集中釋放,網路不瞬間癱瘓才不怪呢。
但是單線程回收意味著回收速度恆定,如果查詢並發很大,回收速度跟不上cleaner的速度,會導致cleaner積累很多,會導致進程OOM(YDB做了修改,會限制前台查詢的並發)。
不論是OOM還是限制並發都不是我們希望看到的,所以針對高並發情況下,這種單線程的回收速度是滿足不了高並發的需求的。
對於官方的這樣的做法,我們表示並不是一個完美的cleaner方案。並發回收一定要支持,只要解決akka的timeout問題即可。
所以這個問題要仔細分析一下,akka為什麼會timeout,是因為cleaner佔據了太多的資源,那麼我們是否可以控制下cleaner的並發呢?比如說使用4個並發,而不是默認將全部的並發線程都給佔滿呢?這樣及解決了cleaner的回收速度,也解決了akka的問題不是更好么?
針對這個問題,我們最終還是選擇了修改spark的ContextCleaner對象,將廣播數據的回收 改成多線程的方式,但現在了線程的並發數量,從而解決了該問題。
推薦閱讀:
※MapReduce初窺 · 一
※分散式機器學習的故事:LDA和MapReduce
※Hadoop就是「存儲」加「計算」這麼簡單
※知識布局-mapreduce-wordcount
※MapReduce中間結果為什麼只能存到磁碟?