Nesto - Hulu用戶分析平台的OLAP引擎
本文主要介紹Hulu用戶分析平台使用的OLAP引擎——Nesto(Nested Store),是一個提供近實時數據導入,嵌套結構、TB級數據量、秒級查詢延遲的分散式OLAP解決方案,包括一個互動式查詢引擎和數據處理基礎設施。
(萬字長文,從這裡出發,閱讀大概需要15分鐘)
1. 項目背景
Nesto起源於用戶分析團隊,業務上需要一個面向用戶、分析型的產品,提供任意維度的Ad-Hoc互動式查詢、導出數據,供運營、產品、第三方數據公司使用。
一個典型場景是:導出2018年1月看過《冰與火之歌》第7季第7集(S7E7)超過5次的新註冊用戶,包括用戶名、email兩個域,用於發送營銷郵件。
2. 數據平台pipeline
在正式介紹Nesto之前,有必要先介紹其產生的背景,所以先介紹數據平台pipeline。
如下圖所示,用戶分析平台的最核心的資產是一套pipeline,通過整合公司內多個團隊的數據,使用HBase集中存儲起來,提供一個UI Portal讓用戶簡單的描述需求,把需求存儲於metadata db中,然後定期運行一個Spark Job去scan HBase,進行批量的計算,最終把有價值的數據服務出去,例如作為用戶標籤服務的上遊方等。
先說存儲,用戶維度數據使用HBase存儲的原因在於:
- 大寬表KV模型。可以看做一個多維、稀疏、嵌套的sorted map。
- 橫向擴展能力。按照userId反轉作為row key,可以自動sharding分區,橫向擴展到PB級別。
- 隨機讀寫能力強。可進行隨機的查詢,以及數據修復。
- 高吞吐的導入能力。非同步批量寫入速度快,LSM結構保證寫入高吞吐,還可以使用bulk load繞過Region Server導入。
- 可以和大數據技術棧無縫融合,包括Hadoop/Spark等。
HBase中一行就是一個用戶的全部信息,有些值是上游直接使用,有些需要二次加工,邏輯上可以看做下面的結構。
{ "uid": 100, "raw_attributes": { "email": {"value": "jack@hulu.com"}, "gender": {"value": "m"}, "signup_date": {"value": 1507359323}, "age": {"value": 49} }, "behaviors": { "watch": [ { "cid": 9800, "duc": "Living Room", "dlc": "CONSOLE", "seventid": 800, "genre": ["Documentaries"], "video_type": "feature_film", "timestamp": 1273774176 }, { "cid": 9801, "duc": "Living Room", "dlc": "CONSOLE", "sid": 801, "genre": ["Animation and Cartoons", "Family","Kids"], "video_type": "feature_film", "timestamp": 1373774176 }, { "cid": 9802, "duc": "Computer", "dlc": "EMBED", "sid": 802, "genre": ["News and Information"], "video_type": "clip", "timestamp": 1473774176 } ] }}
uid是rowkey,raw_attributes、behaviors當做column family,birth_date、age當做column qualifer,其中age是經過birth_date二次計算而來的,timestamp可以看做業務有意義的時間信息,值就是存儲在Hbase中的Cell Value,每一個Cell都是一個帶有schema的Proto,便於高效序列化與反序列化。watch是觀看行為,是嵌套的,利用HBase的多版本實現,這樣一個用戶的多個觀看行為就封裝到了一行中。
UI Portal可以方便的通過拖拽生成一個DSL。下圖展示的用戶輸入某個過濾條件的截圖,對應的查詢場景是:導出2018年1月看過《冰與火之歌》第7季第7集(S7E7)超過5次的新註冊用戶。
目前我們支持使用Json或者SQL++來表示DSL。Json的DSL是自定義的,為了表達清楚只介紹SQL DSL。SQL++是一個開源的嵌套數據查詢SQL,其他類似的還是有Drill等,也支持嵌套數據查詢,他們功能類似,語法會有所不同。例如典型場景生成的嵌套查詢SQL如下:
SELECT reg.raw_attributes.username, reg.raw_attributes.emailFROM RegUsers AS reg WHERE ARRAY_COUNT( (SELECT w.cid FROM reg.behaviors AS b UNNEST b.watch AS w WHERE w.cid = 9800 AND w.ts >= 2018-01-01 00:00:00 AND w.ts < 2018-02-01 00:00:00)) > 5ANDreg.raw_attributes.reg_at >= 2018-01-01 00:00:00 AND reg.raw_attributes.reg_at < 2018-02-01 00:00:00;
這個SQL中暫時沒有包含GROUP-BY聚合處理,但是使用了ARRAY_COUNT這個針對嵌套類型的UDF,子查詢用UNNEST語句flatten下層的watch行為,關於語法的詳細描述,可以參考SQL++。
原始的pipeline依賴一個自研的predicate lib,處理流程如下,將HBase中某一行轉化為一個嵌套的數據結構,在這個數據結構上做predicate。依託分散式計算引擎,例如Spark,就可以掃描HBase中存儲的所有用戶,apply一個predicate,符合條件的即輸出結果列。目前predicate lib實現了很多的UDF,包括時間範圍、If-else邏輯處理等。
Array[User] -> Predicate(SQL++ or JSON DSL) -> Array[Field1, Field2... Field N]
但是pipeline本身不具備OLAP的能力,不能實現聚合、排序、TOP-K等運算元。
3. 數據規模
目前用戶數據規模如下,
1、HBase 1500 Regions, 佔HDFS 20T (一副本、LZO)。
2、300+列,其中有50多是嵌套的結構,例如觀看行為。
3、1億+用戶數據。
4、歷史全量近1000億觀看行為,最近一年近300億次。
4. Nesto的誕生
為了滿足OLAP的需求,包括
1、支持filter, projection和aggregation和自定義UDF。
2、Ad-hoc查詢, 普通列響應時間秒級,嵌套列小於百s。
3、數據從進入OLAP到能夠被查詢到,延遲要在小時級別。
我們對比了如下開源方案,
1、ROLAP
類似SparkSQL、Presto、Impala等,它們都必須把數據抽象為關係型的表,可以使用表達豐富的SQL,不存在數據冗餘,在實際運行期間往往會經過SQL詞法解析、語法解析、邏輯執行計劃生成和優化,再到物理執行計劃的生成和優化,會存在數據shuffle、join。這與現有的用戶分析平台已存數據模型——大寬表,不一致,需要經過ETL做數據轉換。另外我們的另一個目標支持近實時的數據導入,而這些方案的OLAP目前都不支持。
2、MOLAP
類似Druid、Kylin、百度Palo等,他們都支持多維查詢,通過預聚合的方式來提升查詢性能,但是需要抽象出維度列、指標列,甚至某個維度的分區等。同樣數據模型和大寬表不一致。另外,用戶分析平台往往涉及一個用戶所有行為數據,查詢請求往往就是要查詢若干月,甚至若干年之前的,涉及大量fact數據的全表scan,這也不能很好的match這種物化視圖或者上卷表的模式。
因此,hulu決定走自研路線,取名為nesto。自研的最大源動力在於:
1、復用大寬表這種嵌套的數據模型。好處在於可以符合HBase的模型,最大化復用自研的predicate lib。
2、避免join和shuffle,所有數據在內存里進行計算。最多兩層stage,不存在多個stage的劃分,最大化提高性能。
3、數據掃描往往涉及大量歷史數據。由於hulu視頻網站劇集的要求,往往需要查詢N年前至今的數據,因此時間跨度很大的全表scan場景很多。
4、近實時數據更新。希望融合Google MESA的思想。
5. Nesto的基礎
Nesto的實現依賴於一些已有的技術和理論。
1、存儲模型。採用嵌套模型,非關係型。
2、存儲格式。列式存儲,對於OLAP,可以跳過不符合條件的數據,降低IO數據量,加大磁碟吞吐。通過壓縮、編碼可以降低磁碟存儲空間。只讀取需要的列,甚至可以支持向量運算,能夠獲取更好的掃描性能。Nesto採用Parquet作為存儲格式,是Google Dremel的開源實現。Parquet對嵌套數據結構實現了打平和重構演算法,實現了按行分割(形成row group),按列存儲(由多個page組成),對列數據引入更具針對性的編碼和壓縮方案,來降低存儲代價,提升IO和計算性能。
3、MPP架構。大規模並行處理架構,可以支持查詢的橫向擴展,為海量數據查詢提供高性能解決方案,實際上Nesto借鑒了Presto。一個Parquet文件是splittable的,因此利用DAC分治的思想,把大問題劃分為小問題,分散式並行解決。
4、RPC選型。分散式系統的RPC通信是基礎,Presto大量採用RESTFul API解決,而Nesto選擇使用Thrift進行封裝解決,提供基於NIO全雙工、非阻塞I/O的通信模型,通過Reactor模式實現線程池和串列無鎖化來實現服務端API的暴露。
5、分散式配置。Nesto中的表結構存儲在分散式配置系統中,可做到熱部署更新。
6、高可用保證。使用YARN管理實例,保證高可用和資源的合理分配。使用Zookeeper做集群節點變更的通知與分發。
7、海量數據近實時查詢支持。借鑒Google MESA的思想,關於MESA的模型,請參考這篇文章了解《淺談從Google Mesa到百度PALO》。
8、其他技術點。使用MySQL存儲已完成任務情況,使用web技術構建管理Portal。使用Hadoop基礎設施,包括HDFS存儲數據。
9、實現語言。Java。
6. Nesto的存儲模型
邏輯上,可以看做一張嵌套的大平表(flat table),數據按照行存儲,每一行的結構都是嵌套的。和第二章提到的HBase的模型邏輯上是一致的。
物理上,採用開源列式存儲方案,Nesto選擇Parquet,它獨立於計算框架,按照Google Dremel提到的方案做按行切割,按列編碼壓縮。一張表對應1到N個Parquet文件。
下圖是Parquet官網的物理存儲圖。每個Parquet文件都是包含若干row group,這是做MPP的基本分割單元,一個MPP的sub-task可以對應K個row group,一個row group包含了若干用戶的全部信息,按照schema定義的列,進行列式存儲,每列包含若干個page,每個page是最小的編碼壓縮單元。每個列都可以採用自由的編碼方式,例如run length encoding、dict encoding、bit packed encoding,delta encoding等等,或者他們的組合。
用於MPP架構的存在,通常會多增加副本數,來支持讀負載均衡和本地化locality查詢。
表結構元數據不用DML描述,而是使用Parquet提供的proto schema方式,目前通過分散式配置中心管理,通過管理控制台新增表和修改表。例如,在配置中心表RegUsersAttributes的schema描述如下。
7. Nesto整體架構
Nesto分為查詢引擎和數據處理基礎設施兩大部分。
查詢引擎的架構如下。
Nesto-portal。客戶端,用於提供基於web的查詢請求輸入和下載結果。
Nesto-cli。客戶端,提供命令行互動式的查詢。
State store。使用zookeeper來做集群管理,進而實現高可用的分散式系統,任何節點都可以知道整個Nesto的拓撲結構。
Nesto server。非中心化的設計思想,類似Presto,任意節點分為兩種角色,包括coordinator和worker,一般來說都是少數的coordinator加上大量的worker的拓撲。每一個部署節點都是一個Nesto server,只不過角色有區分。目前nesto-server均部署在YARN上,做常駐進程,YARN做高可用保障和資源分配管理。
Coordinator。是某一個查詢的管理節點,負責接收客戶端的請求,解析請求,由於使用大寬表的數據結構,加上復用predicate lib,因此做完詞法分析、語法分析,生成AST後,查詢計劃的生成很簡單,把filter的邏輯全部下推到底層的worker去執行即可,只需要做table的split就可以生成sub-tasks,這些sub-tasks就是物理執行計劃的體現,所以不存在stage或者fragment等類似Presto、Impala的概念。Coordinator通過State store感知集群的拓撲,同時和每個worker都保持了一個心跳,worker通過心跳信息上報自己的狀態,coordinator可以進一步了解worker的負載和健康狀態。Coordinator通過一定的調度演算法,把sub-tasks分發給worker去執行,等待worker的結果匯總過來,如果需要再做一些aggregation和merge的工作。最終,流式的傳輸結果給客戶端展現或者下載。
Worker。接收coordinator分發的若干sub-tasks,放到線程池中執行(線程池就是槽位slot),通過Parquet提供的API,逐一讀取一行的數據,利用predicate lib進行filter,通過一個非同步的生產者消費模型,批量處理數據,然後分批序列化後,按照data chunk的方式,源源不斷的發送回coordinator。另外,worker會做一些優化工作,除了天然的filter pushdown,還包括pre-aggregation,limit pushdown等等。
數據處理基礎設施請參考第9章節。
8. Nesto的查詢引擎
下面針對一個典型的查詢執行過程,進一步展開描述各個組件的工作流程。
8.1 State store
主要做高可用,節點發現、上下線通知使用。依託zookeeper,每個nesto-server在啟動的時候都會註冊自己到zookeeper的某個臨時節點,任何想知道集群拓撲的地方,例如其他nesto-server、nesto-cli、nesto-portal都通過訂閱zookeeper得到集群的拓撲情況。當集群發生變化的時候,可以通過zookeeper訂閱變化。
8.2 Nesto-server之Coordinator
1) API
Nesto server需要提供兩套API,一個是客戶端與之交互,另一個是coordinator和worker通信的API,都通過thrift進行開發和編寫。使用TThreadedSelectorServer作為通信基礎設施,Linux上使用I/O多路復用的epoll技術,同時使用兩種線程池,一個做accept連接通信握手技術,一個做編解碼和業務邏輯的處理。
2) 解析請求
通過thrift API方法提交上來的請求,針對JSON或者SQL++ DSL解析,同時得到抽象語法樹AST,包含查詢表,filter條件,表的schema信息,要查詢的列,以及每一列的聚合函數。
3) 執行計劃生成
由於Ad-hoc查詢和MPP的架構,因此系統的並發查詢能力要做一定的限制,Nesto可通過參數配置並發執行請求的數量,在解析請求完畢後,會把request放入帶有超時機制的線程池中,如果查詢沒有在規定時間內完成,那麼就會取消查詢,並且revoke掉所有已經分發的sub-tasks。線程池中會進行執行計劃的生成和後續的處理流程。
執行計劃的生成分為兩步,第一步把大的表,也就是Parquet文件進行split,每個split都是一個sub-task,這裡復用了parquet-mr項目中的ParquetInputFormat,把一個大的Parquet文件split為若干個InputSplit,對於每一個split的大小可以通過參數控制,也就調整了每個sub-task掃描的數據大小,可以避免data skew問題,例如1G一個split,這1G可以包含多個Row Group,而每個Row Group可能是HDFS Block Size,例如256MB。第二步,根據filter條件列以及結果列構造新的schema,目的是用Parquet讀取文件的時候需要傳入這個schema,這樣就可以只查詢需要的列,發揮列式存儲在IO上面的優勢。
4) 調度分發任務
執行計劃生成了本次查詢的所有分片信息,如何調度分片給合適的worker去執行,也就是生成worker到多個Task的映射,是調度任務負責的。這裡可以包含很多策略,例如可以輪訓的assign task到每個node,也可以按照HDFS Locality來進行數據本地化的優化,還需要綜合考慮每個worker的負載狀況,把task分配給負載較為輕的worker,通常負載要考慮的維度,包括worker節點的CPU、IO、slot佔用數、權重等。如下圖所示。
同樣分發任務,也可以通過線程池來完成,通過thrift RPC調用worker的API來提交sub-tasks給worker。通過使用響應式編程(Reactive),或者帶有非同步回調機制(例如Java中的Future) 的方式來實現成功和失敗的邏輯,針對失敗的分發(例如因為worker拒絕或者下線)可以再分發給別的worker。
另外在分散式系統中不可忽略的因素,就是長尾效應,總會存在一些「拖後腿」的任務,進而影響整個query的查詢延遲,所以Nesto還開發了慢請求的檢測和重分發機制,針對straggler task,通過一定機制的判斷,最簡單的模型就是針對最後1%的sub-tasks,如果運行時間超過閾值,則進行speculative execution,也可以叫做duplicate execution,多分發若干的sub-tasks出去,誰先執行完畢,就用誰的結果,進而優化長尾效應。
5) 查詢執行
當sub-tasks都分發出去後,worker會源源不斷通過thrift RPC把結果批量的發送回coordinator,每一批可以看做是一個data chunk。
coordinator在內存中維護一個aggregator數據結構,來merge所有worker返回的data chunk,一個data chunk就是一個批次的結果,data chunk使用某種序列化協議(例如Java原生或者Kyro),data chunk可以支持去重。
一個data chunk包含了一部分結果列,例如select列為reg.raw_attributes.username, reg.raw_attributes.email,則在coordinator會追加累積這些數據,然後再源源不斷的推送給客戶端。
如果結果列包含聚合函數,例如GROUP BY cid,COUNT(userid),那麼worker會做pre-aggregation,把聚合pushdown到worker執行,最終給coordinator的數據就是聚合後的結果,aggregator做combine即可。同樣,常用的LIMIT pushdown也是支持的。
6) 失敗處理
類似Presto對於失敗的態度就是不太容忍,對於分發失敗的任務,在超過重試次數後就failfast整個query。對於worker返回失敗的data chunk,包括丟失響應,或者返回的data chunk非法等,超過一定的重試次數後也failfast整個query。
8.3 Nesto-server之worker
worker和coordinator類似,都存在一個thrift API,用於接收coordinator發送的sub-tasks查詢請求。
worker充分利用了線程池技術,池子里就是一些槽位(slot),每一個sub-task會佔用一個slot,當slot佔滿後就不能執行新的請求了,從而限制worker的計算能力不超負荷。
worker和coordinator維持一個心跳,定期彙報自己的負載信息,包括CPU、IO、slot佔用情況等,供coordinator調度演算法使用。
和Presto類型,抽象出connector的概念,Nesto的worker抽象出了scanner的概念,這是一個可擴展的介面,目前只支持Parquet文件的查詢,後續可以擴展到CarbonData等。
worker要做的工作就是根據Parquet提供的client API讀取文件,解壓縮、解碼文件,在內存中構造出所有row group的行視圖,這個過程是一個流水線式的,保證儘可能低的使用內存。文章最早提到了pipeline中已經存在的predicate lib,使用這個lib,apply到一行,就可以做filter,也就天然實現了filter pushdown的功能,對於符合filter條件的行,取其結果列,在內部維護一個隊列,批量的構造data chunk,序列化後不需要進行shuffle落盤,直接在內存裡面,網路直連的發送回coordinator即可。這種filter pushdown到最底層的方式,避免了落盤和JOIN operator,所以在性能上對於無法剪枝的scan型的場景會非常高效。具體流程如下圖所示。
另外,aggregation pushdown、limit pushdown都是worker已經實現的優化。
8.4 Portal & Cli
Nesto Portal提供了UI,可以方便PM、運營人員提交查詢,Portal截圖如下。例子中使用JSON DSL發送請求。
Nesto cli提供了命令行方式,進行查詢,一次查詢的請求如下。
./nesto-cli -mode i -schema RegUsersAttributes -table_path hdfs://nesto/RegUsersAttributes/sl2-prod-100No JAVA command specified and try to use $JAVA_HOME or just "java" instead. _ _ _ _ _ | | | ___ ___| |_ ___ ___| (_) | | |/ _ / __| __/ _ _____ / __| | | | | | __/\__ || (_) |_____| (__| | | |_| \_|\___||___/\__\___/ \___|_|_|Welcome to Nesto-cli. http://test.com/status will be used as Nesto portalStarting...Coordinator test.hulu.com:58807 will be used to serve callsType help, type bye to quit> SELECT reg.raw_attributes.username, reg.raw_attributes.email> FROM RegUsers AS reg > WHERE > COUNT(> (SELECT w.cid > FROM reg.behaviors AS b > UNNEST b.watch AS w > WHERE w.cid = 100> AND w.ts >= 2018-01-01 00:00:00 AND w.ts < 2018-02-01 00:00:00)> ) > 20> AND> reg.raw_attributes.reg_at >= 2018-01-01 00:00:00 AND reg.raw_attributes.reg_at < 2018-02-01 00:00:00;Querying.........If nesto.query.quite set to false, visit http://test.com:8090/query.jsp?queryid=query_aaa_60066_106 for more info.........ScannedCount: 109586, RowCount: 4 raw_attributes.username.value | raw_attributes.email.value---------------------------------+------------------------------------- Jack | jack@abc.com Richard | richard@abc.com Christy | christy@abc.com Jeanette | jeanetteburel@abc.comTotal records: 4Query done. (costs 2 seconds)
8.5 部署
Nesto在部署上,默認採用YARN來管理,以進程常住的形式部署在YARN Node節點上,通過利用YARN的編程介面,開發Client、AppMaster等程序,就可以管理coordinator和worker常駐進程,做到高可用保障和合理的資源分配。
另外,Nesto也支持本地standalone和pseudo偽分散式部署模式,方便調試和測試。
9. Nesto的數據處理基礎設施
前面介紹了Nesto的查詢引擎,還包括一個數據處理基礎設施。
Nesto支持近實時數據導入,利用了Google MESA提供的數據模型,由Base、Cumulative delta和Singleton delta組成。如下圖所示。
數據處理基礎設施架構圖如下。
其中Base文件定期通過ETL任務導出,由於數據量大(歷史全量數據,1000億行watch行為,300多列),因此這是一個很重的任務,而每天變化的數據量又非常小,因此儘可能低頻率的運行。在圖中體現為exporter組件。
Delta文件的生成通過HBase coprocessor捕獲,並且發布到Kafka中做存儲,多分區,單分區內數據可保序,因此單個用戶不會存在業務上的亂序影響。增量處理程序以小時級的頻率生成delta。在圖中體現為updater組件。
當過多的delta存在的時候,查詢引擎會進行更多的隨機I/O,不利用優化響應延遲,因此存在一個job以若干小時的頻率合併delta為一個大的delta,叫做cumulative delta。在圖中體現為compactor組件。
Updater和compactor會通過controller進行調度運行,他們都是無狀態的,因此通過controller保存和註冊生成的delta,每個delta都要一個version,查詢引擎在做執行計劃生成的時候,可以訪問controller查詢最優的query path。
在上面幾個章節的查詢引擎介紹里都簡化了處理,沒有考慮增量情況,便於讀者理解。
這裡要注意,在查詢引擎中使用delta,就必須保證和base中的row group中的行都是match的,這裡的任務都會按照某個業務主鍵,例如userid,來排序文件,這樣在查詢引擎內部就可以使用基於最小堆的多路歸併排序來做數據的merge了。
10. 總結和未來計劃
目前Nesto在Hulu內部承擔著實時用戶數據OLAP查詢的需求,供PM、運營人員使用。目前線上僅需部署了60個nesto-server節點,最大表的Parquet文件大小在3-4T(單副本壓縮後),對於簡單列的查詢,在秒級完成,對於包含watch行為的嵌套列,在百毫秒以內完成。
未來Nesto的增強點包括使用基於codegen技術的predicate lib(目前內部已完成藉助spark sql catalyst的改造),最大化filter性能。冷熱數據區分,智能表路由,避免粒度過粗的full scan。數據拖敏,更好的安全保障。通用化的平台,支持多租戶的使用。更好的支持嵌套SQL查詢。
目前Nesto由於綁定了內部的predicate lib,同時還利用了Hulu內部的很多基礎實施,有些實現不是非常的通用,所以暫時還無法開源出來。
總結下,本文介紹了Hulu用戶分析平台使用的OLAP引擎Nesto,它是一個提供近實時數據導入,嵌套結構、TB級數據量、秒級查詢延遲的分散式OLAP解決方案,包括一個互動式查詢引擎和數據處理基礎設施。雖然Nesto是一個in-house的內部系統,但是本文還是最大化的展示了一些背後的架構、思想、實現細節,以及自研的tradeoff和原因,希望可以給讀者一些OLAP方面的啟發。現在的OLAP市場百花齊放,但是其思想和技術都是建立在已經成熟或者前人的基礎上的,通過更好的理解和應用這些思想和技術,來滿足組織和業務的需求,才是大家的目的,希望在OLAP領域與讀者共勉一起學習和進步。
摘自作者博客的原文地址。
推薦閱讀: