穩定和性能如何兼顧?58大數據平台的技術演進與實踐

本文將分享58大數據平台在最近一年半內技術演進的過程,包括:58大數據平台目前的整體架構是怎麼樣的;最近一年半的時間內面臨的問題、挑戰以及技術演進過程;以及未來的規劃。

寫在前面

趙健博,來自58趕集,本文將為大家分享58大數據這塊的經驗。本科和研究生分別是在北京郵電大學和中國科學院計算技術研究所,之前在百度和360工作,現在是58趕集高級架構師、58大數據平台負責人。多年的分散式系統(存儲、計算)的實踐和研發經驗,在工作的這些年中運營了大大小小的集群,最大單集群也達到了四五千台,在這個過程中做了大量的功能研發、系統優化,也淌了大量的坑,本文會給大家介紹一些自認為比較重要的經驗。

首先看一下58大數據平台架構。大的方面來說分為三層:數據基礎平台層、數據應用平台層、數據應用層,還有兩列監控與報警和平台管理。

數據基礎平台層又分為四個子層:

  • 接入層,包括了Canal/Sqoop(主要解決資料庫數據接入問題)、還有大量的數據採用Flume解決方案;

  • 存儲層,典型的系統HDFS(文件存儲)、HBase(KV存儲)、Kafka(消息緩存);

  • 再往上就是調度層,這個層次上我們採用了Yarn的統一調度以及Kubernetes的基於容器的管理和調度的技術;

  • 再往上是計算層,包含了典型的所有計算模型的計算引擎,包含了MR、HIVE、Storm、Spark、Kylin以及深度學習平台比如Caffe、Tensorflow等等。

數據應用平台主要包括以下功能:

  • 元信息管理,還有針對所有計算引擎、計算引擎job的作業管理,之後就是交互分析、多維分析以及數據可視化的功能。

  • 再往上是支撐58集團的數據業務,比如說流量統計、用戶行為分析、用戶畫像、搜索、廣告等等。針對業務、數據、服務、硬體要有完備的檢測與報警體系。

  • 平台管理方面,需要對流程、許可權、配額、升級、版本、機器要有很全面的管理平台。

這個就是目前58大數據平台的整體架構圖:

這個圖展示的是架構圖中所包含的系統數據流動的情況。分為兩個部分:

首先是實時流,就是黃色箭頭標識的這個路徑。數據實時採集過來之後首先會進入到Kafka平台,先做緩存。實時計算引擎比如Spark streaming或storm會實時的從Kafka中取出它們想要計算的數據。經過實時的處理之後結果可能會寫回到Kafka或者是形成最終的數據存到MySQL或者HBase,提供給業務系統,這是一個實時路徑。

對於離線路徑,通過接入層的採集和收集,數據最後會落到HDFS上,然後經過Spark、MR批量計算引擎處理甚至是機器學習引擎的處理。其中大部分的數據要進去數據倉庫中,在數據倉庫這部分是要經過數據抽取、清洗、過濾、映射、合併匯總,最後聚合建模等等幾部分的處理,形成數據倉庫的數據。然後通過HIVE、Kylin、SparkSQL這種介面將數據提供給各個業務系統或者我們內部的數據產品,有一部分還會流向MySQL。

在數據流之外還有一套管理平台。包括元信息管理(雲窗)、作業管理平台(58dp)、許可權審批和流程自動化管理平台(NightFury)。

規模可能不算大,跟BAT比起來有些小,但是也過了一千台,目前有1200台的機器。我們的數據規模目前有27PB,每天增量有50TB。作業規模每天大概有80000個job,核心job(產生公司核心指標的job)有20000個,每天80000個job要處理數據量是2.5PB。

技術平台技術演進與實現

接下來我會重點介紹一下在最近一年半時間內我們大數據平台的技術演進過程,共分四個部分:穩定性、平台治理、性能以及異構計算。第一個部分關於穩定性的改進,穩定性是最基礎的工作,我們做了比較多的工作。第二個部分是在平台治理方面的內容。第三個方面我們針對性能也做了一些優化。第四個方面,我們針對異構環境,比如說機器的異構、作業的異構,在這種環境下怎麼合理地使用資源。

穩定性改進

首先看一下穩定性的改進。穩定性包含了幾個方面,其中第一個方面就是系統的可用性,大家可以採用社區提供的HDFS HA、Yarn HA,Storm HA來解決。另外一方面是關於擴展性,例如Flume、HDFS,Yarn,Storm的擴展性。這裡主要介紹下Flume和HDFS的擴展性相關的一些考慮。

此外,有了可用性和擴展性,系統就穩定了嗎?實際上不是這樣。因為還有很多的突發問題。即使解決了可用性和擴展性,但突發問題還是可能會造成系統不可用,例如由於一些問題造成兩台NameNode全部宕機。

首先看一下Flume的擴展性。我們人為的把它定義了兩層。一個是FlumeLocal(主要解決一台機器的日誌採集問題,簡稱Local),一個是FlumeCenter(主要從Local上收集數據,然後把數據寫到HDFS上,簡稱Center),Local和Center之間是有一個HA的考慮的,就是Local需要在配置文件里指定兩個Center去寫入,一旦一個Center出現問題,數據可以馬上從另一個Center流向HDFS。

此外,我們還開發了一個高可靠的Agent。業務系統中會把數據產生日誌寫到磁碟上,Agent保證數據從磁碟上實時可靠的收集給本地的Local,其中我們採用了檢查點的技術來解決數據可靠性的問題。

這是Flume的典型架構。Local需要在配置文件裡面指定死要連到哪幾個Center上。如果說10台,可能還OK,100台也OK,如果一千台呢?如果發現兩台Flume Center已經達到機器資源的上限,如何做緊急的擴容呢?所以從這個角度看Flume的擴展性是有問題的。

我們的解決方法是在Local和Center中間加了一個ZooKeeper,Local通過ZK動態發現Center,動態的發現下游有什麼,就可以達到Center自動擴容的目標了。我們公司Local有兩千多台,擴容一台Center僅需一分鐘,這種架構實際上可以支持達到萬台規模的,這是Flume擴展性的一些改進。

接下來看一下HDFS擴展性的問題。上面這張圖展示了hdfs federation的架構,左側是一個單namespace架構,即整個目錄樹在一個namespace中,整個集群的文件數規模受限制於單機內存的限制。federation的思想是把目錄樹拆分,形成不同的namespace,不同namespace由不同namenode管理,這樣就打破了單機資源限制,從而達到了可擴展的目標,如右側圖。

但這個方案有一些隱藏的問題,不知道大家有沒有注意到,比如這裡每個Datanode都會與所有的NameNode去心跳,如果DataNode數量上萬台,那麼就可能會出現兩個問題:第一,從主節點之間的心跳、塊彙報成為瓶頸,第二,如果單個部門的數據規模過大那該怎麼辦?

針對從主節點之間交互的問題,我們可以進行拆分,控制一個NameNode管理的DateNode的數量,這樣就可以避免主從節點交互開銷過大的問題。針對單部門數據過大的話可以針對部門內數據進行進一步細拆,就OK了。或者可以考慮百度之前提供的一個方案,即把目錄樹和inode信息進行抽象,然後分層管理和存儲。當然我們目前採用社區federation的方案。如果好好規劃的話,也是可以到萬台了。

不知道大家有沒有在自己運營集群過程中遇到過一些問題,你們是怎麼解決的,有些問題可能相當的棘手。突發問題是非常緊急而且重要的,需要在短時間內搞定。接下來我會分享三個例子。

第一個例子是HDFS的Active NN會不定期異常退出,觸發HA切換,這就好像一個不定時炸彈一樣。這個圖展示了HDFS的HA的架構圖,客戶端進行變更操作(如創建文件)的話會發出請求給namenode,namenode請求處理完之後會進行持久化工作,會在本地磁碟存一份,同時會在共享存儲存一份,共享存儲是為了active和standby之間同步狀態的,standby會周期從共享存儲中拉取更新的數據應用到自己的內存和目錄樹當中,所有的DataNode都是雙彙報的,這樣兩個namenode都會有最新的塊信息。最上面的是兩個Checker,是為了仲裁究竟誰是Active的。

還有一個過程,Standby NameNode會定期做checkpoint工作,然後在checkpoint做完之後會回傳最新的fsimage給active,最終保存在active的磁碟中,默認情況下在回傳過程會造成大量的網路和磁碟的壓力,導致active的本地磁碟的Util達到100%,此時用戶變更請求延遲就會變高。如果磁碟的Util100%持續時間很長就會導致用戶請求超時,甚至Checher的檢測請求也因排隊過長而超時,最終然後觸發Checker仲裁HA切換。

切換的過程中在設計上有很重要一點考慮,不能同時有兩個Active,所以要成為新Active NameNode,要把原來的Active NameNode停止掉。先會很友好地停止,什麼是友好呢?就是發一個RPC,如果成功了就是友好的,如果失敗了,就會ssh過去,把原來active namenode進程kill掉,這就是Active NameNode異常退的原因。

當這個原因了解了之後,其實要解決這個問題也非常簡單。

第一點要把editlog與fsimage保存的本地目錄分離配置,這種分離是磁碟上的分離,物理分離。

第二是checkpoint之後fsimage回傳限速。把editlog與fsimage兩個磁碟分離,fsimage回傳的io壓力不會對客戶端請求造成影響,另外,回傳限速後,也能限制io壓力。這是比較棘手的問題。原因看起來很簡單,但是從現象找到原因,這個過程並沒有那麼容易。

第二個案例也是一樣,Active NN又出現異常退出,產生HA切換。這次和網路連接數有關,這張圖是Active NameNode的所在機器的網路連接數,平時都挺正常,20000到30000之間,忽然有一個點一下打到60000多,然後就打平了,最後降下來,降下來的原因很明顯,是服務進程退了。

為什麼會出現這個情況呢?在後續分析的過程中我們發現了一個線索,在NameNode日誌里報了一個空指針的異常。就順藤摸瓜發現了一個JDK1.7的BUG,參見上面圖片所示,在java select庫函數調度路徑過程中最終會調用這個函數(setUpdateEvents),大家可以看到,如果fd的個數超過了MAX_UPDATE_ARRAY_SIZE(65535)這個數的話,將會走到else路徑,這個路徑在if進行不等表達式判斷時,將會出發空指針異常。

接下來的問題是,為什麼會產生這麼多的鏈接呢?經過分析我們發現,在問題出現的時候,存在一次大目錄的DU操作,而DU會鎖住整個namespace,這樣就導致後續的寫請求被阻塞,最終導致請求的堆積,請求的堆積導致了連接數大量堆積,連接數堆積到一定程度就觸發JDK1.7的這個BUG。這個問題的解決,從兩個方面看,首先我們先把JDK升級到1.8。其次,調整參數dfs.content-summary.limit,限制du操作的持鎖時間。該參數默認參數是0。我們現在是設成10000了,大家可以參考。這是第二個非常棘手的問題。

第三個案例關於YARN主節點的,有一天中午,我們收到報警,發現Active RM異常進程退出,觸發HA的切換,然而切換後一會新的Active RM節點也會異常退出,這就比較悲劇,我們先進行了恢復。

之後我們從當時的日誌中發現了原因:一個用戶寫了一萬個文件到分散式緩存里,分散式緩存里數據會同步到ZK上,RM持久化作業狀態到ZK時超過Znode單節點最大上限,拋出異常,最終導致ResourceManager進程的異常退出。其實問題的解決方法也非常簡單,我們增加了限制邏輯,對於序列化數據量大於Znode節點大小的Job,直接拋異常觸發Job的失敗。另外我們還適當提升Znode節點大小。

以上是在穩定性方面的一些工作,這三個案例跟大家分享一下,如果有類似的問題建議大家可以嘗試一下,這些方案是被我們驗證OK的。

平台治理

接下來介紹一下平台治理這塊。包含幾個問題,其中第一問題是關於數據的,一方面,就是大家開發了數據之後,經常找不到,要靠喊,比如說在群里喊一下什麼數據在哪,誰能告訴我一下,這個效率很低下。另外一方面是之前的管理數據是共享的,不安全,任何人都可以訪問其他人的數據。

第二個問題是關於資源,之前是「大鍋飯」模式,大家共享計算資源,相互競爭,這樣「能吃的「肯定是擠兌」不能吃的「,經常出現核心任務不能按時按點完成,老闆看不到數據,這點很可怕。還有是整個集群資源使用情況沒有感知,這樣根本不知道資源要怎麼分配,是否夠用。

第三個問題是關於作業的,開發人員開發大量的作業之後,這些作業要怎麼管理,實際上他們可能都不知道。還有就是關於作業之間依賴,經常一個指標計算出來要經歷多個作業,作業之間依賴是怎麼考慮的,單純靠時間上的依賴是非常脆弱的,如果前期的job延遲產生了,後續的job必然失敗。最後一個問題是數據開發人員的效率不高,所需要做的步驟過多。

針對這四個問題我們做了一些改進,首先是數據與資源治理。數據方面要引入安全策略、元信息管理與基礎數倉建設。我們自己開發了一套安全控制策略,主要增加了白名單和許可權控制策略。一個HDFS的請求的流程,首先客戶端會向NameNode發請求,NameNode接到請求之後首先要做連接解析,讀取出請求相關內容做請求處理,再把結果反饋回來,之後客戶端向相應的DataNode進行寫入數據或者讀取數據。從上述流程可以看出,所有HDFS操作全部要經過NameNode這一層。

那麼安全策略只要在NameNode的兩個點做下控制既可完成:在連接解析後,我們會驗證請求方的IP,以及用戶是不是在合法配置下面的。如果驗證失敗,則拒絕請求。如果驗證通過,我們會進一步在請求處理過程中驗證用戶訪問的目錄和用戶在否在合法的配置下。

比如說用戶A想訪問用戶B的數據,如果沒在允許的情況下會把連接關掉,通過簡單的策略調整就能達到靈活的數據的安全控制和數據共享的方式。接下來針對數據找不到的問題,我們開發了全公司層面的基礎數據倉庫以及針對全公司層面元數據管理平台。

這張圖展示了基礎數據倉庫覆蓋度,它覆蓋了集團各個公司,又覆蓋了多個平台,比如說手機、App端、PC端、微信端等等。數據層次,是數據倉庫層、數據集市層還是數據應用層,所屬哪個事業群,最後針對數據進行分類標籤,比如說帖子數據、用戶數據等等都可以通過標籤的方式來找到。當想找具體一份數據的時候可以通過這個界面,點一些標籤,篩選出一些數據表,甚至在搜索框裡面搜數據的關鍵字。

當查到數據表的時候可以在右側按鈕,將顯示出表結構,還有表信息,表信息表明了這個表有多少列,這個表的負責人是什麼,還有關於數據質量,表的數據量的變化情況等等,如果你想申請可以點擊最右邊的許可權開通。整體開通流程也是自動化的。這是針對數據找不到的問題做的一些改進。

針對資源問題要避免大鍋飯,必須要引入賬號概念,資源按照賬號預留與隔離。我們劃分了不同的配額,根據預算、業務需求去申請配額,然後我們調整配額。針對隊列這塊我們劃分多個隊列,每個業務線有自己的隊列,不同業務線不能跨隊列提交任務,每個隊列劃分出不同資源,資源主要是針對業務線需求而定的。通過這些改進可以達到資源的隔離以及適度的共享。

有了賬號的概念之後我們就可以統計每個業務線資源使用情況。我們每天都會有報表。顯示了業務線的計算和存儲資源的使用情況,甚至是Job的細節情況。

對於業務線開發效率低下問題的改進,實際上我們在易用性上也做了很多改進。首先我們開發了雲窗平台,它主要解決了元信息查找、數據查詢、可是化展示和多維分析這些需求。然後針對任務開發這塊我們開發了58DP解決了元信息開發、作業管理與統計等。針對實時多維分析開發了飛流,實時作業開發全部配置化、同時支持多種統計運算元、自動圖表生成等等。還有NightFury,流程自動化管理平台。當然這塊通過一些定製的BI也能實現。

這是雲窗的界面,上面是一個SQL查詢界面,下面是可視化產品界面,這是我們數據可視化的一個結果。

然後關於任務開發的話,我們用58DP來做任務開發,可以支持的不同任務,涵蓋目前的所有主流作業以及作業依賴等管理。這是58DP的頁面,可以設置基本信息、調度及依賴等。

飛流是支持周期性的統計、全天累計性的統計,大家可以定義統計方法、定義任務的一些基本信息,設置維度、設置度量,設置完之後就展現了圖形,也提供了跟昨天的對比情況。當在圖裡點任何一個點的時候,可以看到不同維度組合下在這個點上的數據分布,點擊兩個點可以看到不同維度下兩個點的分布對比。針對歷史數據可以進行對比,我們可以把時間拉的更長,可以查看不同周的實時統計結果,而不是一天。

這是NightFury的界面,這就是我們運維的自動化管理平台,大家可以看到有很多個流程和許可權的開通申請,表單的填寫、工單審批,審批之後的一些流程全部是自動化的。

性能

性能方面,主要分為四個方面:

MR作業性能、數據收集性能、SQL查詢性能和多維分析的性能。針對MR作業性能,我們引用多租戶功能,資源預留,核心作業執行有保障。

第二點小文件合併處理,可以提升任務執行效率,減少調度本身的開銷。

第三點我們針對Shuffle階段參數優化,可以實現並發度提升,IO消耗降低。

經過三個方面的改進之後,整體任務的運行時間實際上有一倍左右的提升。數據傳輸優化方面,我們經過消息合併改進數據傳輸性能,提升了20倍。在SQL優化方面我們引用內存執行引擎與列存儲方案的結合,在同等資源情況下針對線上一百多條SQL進行測試,總體性能大概提升80%。在多維計算這塊,我們引入Kylin,針對多維的查詢95%以上查詢能控制在2s以內。

異構計算

異構計算方面我們面臨了兩個主要問題,一個是作業的異構,我們有多種類型的作業,比如說實時作業強調低時延,而離線作業強調高吞吐,這本身就是矛盾的,怎麼解決這個矛盾。第二方面是機器異構,CPU、內存、網路、磁碟配置不同,這種異構環境又要怎麼辦。

從上面圖中可以看出:如果實時作業的task和批處理作業的task被調度到一台機器上了,如果批處理作業把資源佔滿了(例如網路帶寬),則實時作業的task必將收到影響。所以,需要對實時作業和批處理作業做隔離才行。

做資源隔離,我們的思路是採用標籤化,給每個NodeManager賦予不同標籤,表示不同機器被分配了不同標籤;資源隊列也賦予不同標籤,然後在RM調度時,保證相同標籤的隊列里容器資源必從相同標籤的NodeManager上分配的。這樣就可以通過標籤的不同達到物理上的資源隔離目標。

這張圖是實現圖。首先可以看到NodeManager分成了兩個集合,一個是實時的,一個是離線的,不同的隊列也被賦予了實時或離線的標籤,當用戶提交一個job的時候它可以指定一個隊列,提交到離線隊列里就是離線任務,ResourceManager就會把這個作業所需要的資源分配到離線標籤的NodeManager上,這樣就可以做到物理資源隔離。

未來規劃

以上主要是介紹了我們最近一年半做的一些工作。接下來我會介紹一下未來的規劃。首先就是深度學習。這個概念今年非常火爆,甚至是要爆炸了,深度學習在58這塊需求也是蠻強烈的。目前深度學習工具有這麼多,caffe、theano、torch等等非常多,怎麼做整合,怎麼降低使用成本,這是第一個問題。

第二個問題,機器是有限的,怎麼高效利用資源,需要把機器分配模式變成資源分配模式。還有光有單機的機器學習或者深度學習工具還不夠,因為性能太差,所以我們需要將深度學習訓練分散式化。我們做了一個初步的測試,針對caffe與Tensorflow工具的分散式化訓練做了比較,4卡相對於單卡模型訓練性能提升100%~170%,所以分散式化的工作本身意義也是非常大的。

這個圖展示的是工具融合方案。我們這裡利用的是Kubernetes,支持主流的深度學習工具,每個工具做成鏡像形成POD,用戶需要的話可以直接把POD分發給他,用戶在訓練的時候從HDFS上直接拉取樣本,並且把訓練的參數回寫到HDFS上,也就是說通過HDFS做數據的共享,通過這種模式可以很輕鬆地支持多種深度學習工具,也可以達到按所需資源量進行資源的分配目標。

另外我們會做一個深度學習工具分散式的改造,是針對caffe,我們用的是CaffeOnSpark,即把整個分散式的方案做成模板供用戶使用。首先啟動多個POD,通過POD啟動一個Spark集群,然後再提一個Spark job來做訓練,最後在整個訓練結束之後再把集群停掉。Tensorflow也是一樣的,首先啟動tensorflow集群,然後提交任務,任務訓練完以後再把集群停掉。其他工具分散式化我們也會採取類似的思路解決。以上是關於深度學習這塊我們目前的一些工作。

其次,是關於空間資源利用率的。目前我們有一千多台機器,存儲是很大的成本。之前也提到了,我們是屬於花錢的部門,所以壓力非常大。那怎麼節省成本是一個很重要的問題。除了傳統壓縮之外,還能做什麼?HDFS RAID是一個比較好的解決方案。

HDFS RAID採用是RC編碼,類似RAID6,比如一個文件有m個塊,根據m個塊生成k個校驗塊,然後能保證k個塊丟失的情況下數據還能找回來,舉個例子來說,比如文件2.5G大小,256M一個塊,可以分成10個塊,根據RC演算法再生成4個校驗塊,可以保證丟了4個塊情況下,數據都能找回來。在這個例子中,3副本情況下,一共需要30個塊,而採用HDFS RAID,僅需要14個塊。但他們的可靠性一樣,空間佔用情況卻差了57%。

具體實施時,第一步對集群數據進行冷熱分析,RAID畢竟有些性能問題,一旦數據有問題,你要通過計算才能恢復,勢必會造成性能低下,所以針對冷數據做肯定是風險最低的。第二步就是壓縮+archive+RAID,通過三方面技術結合把文件數和空間全部節省出來。歸檔實際上是會變換目錄的,為了做適配,我們通過軟連接功能,做到對用戶透明。最後在數據讀取時,如果是RAID數據,就要具備實時RAID修復功能才能保證在數據缺失的情況下不影響數據的訪問。

後續我們會對計算資源利用率再做進一步提升。另外也會考慮Storm和YARN擴展性。還有Kubernetes調度優化,比如針對GPU資源管理功能。

作者:趙健博原文自微信號InfoQ

往期閱讀推薦:

美團的大數據平台架構實踐

鏈家網大數據平台建設,平台樞紐--工具鏈

大數據平台在互聯網行業的應用


推薦閱讀:

大數據那些事(26):你還愛我嗎之Stinger的努力

TAG:大数据 | 信息技术IT | Hadoop |