計算與存儲分離實踐—swift消息系統

摘要: swift是搜索事業部自主研發分散式消息系統,它的主要存儲基於分散式文件系統,資源需求基於分散式調度系統。swift能支持每秒數億的消息傳遞,支持PB級消息的存儲。

原文:click.aliyun.com/m/4184

1. 相關背景

搜索事業部與計算平台事業部目前使用消息隊列主要有以下三種場景:

1. 每天有上萬張表需要通過Build Service來構建索引。這些表主要來自主搜索,IGRAPH,Rank Service等業務,且每個表包含的文檔數差別很大。總數據量為PB級別,總文檔數達萬億級。文檔的大小不一,小到幾十Byte大到幾百KB。在Build Service內部,文檔處理與索引構建需要一個消息隊列來傳送消息。因此在build時,容易產生突發大流量(幾百G/秒,幾千萬條/秒)持續消息寫入與讀取。

2. 搜索的在線服務如主搜索查詢服務,RankService打分服務或IGRAPH服務需要毫秒級的實時文檔更新。這些服務引擎基本上是多行多列結構,即每一行是一個完整的服務單元,由多台機器組成,多行提升服務的總能力。大的服務通常包含數百行,所以一條實時消息通常會被消費數百次,在線同時實時消費的機器規模也達上萬台。

3. 在線的實時消息主要來自離線實時模型訓練,用戶的實時瀏覽、點擊、加購行為或者商家的增刪改寶貝等。離線訓練任務會同時啟動幾十萬個worker對上千張表產生實時消息,寫請求每秒達千萬次。

對於這幾種場景,傳統的消息隊列(如Kafka等)要同時滿足,至少需要成百上千台物理機,且系統還需要做改造來適用於每天上萬個topic的增減和幾十萬的生產者與消費者同時讀寫消息。另外這些機器的failover管理也是個大問題。現實中,搜索團隊所有的機器都是由調度系統統一管理和復用,沒有專門的物理機可給消息系統獨佔使用。

本文將介紹搜索事業部目前使用消息系統swift。主要介紹系統結構和消息可靠傳遞機制這兩方面。最後介紹下swift系統在今年雙十一期間的表現。

2. Swift介紹

傳統的消息隊列通常為消息的安全性,一般先要求消息落盤到本機後才返回成功。這限制的機器的遷移,擴展和復用。因為消息數據只存一兩台機器,機器遷移必然導致數據的遷移。傳統消息隊列要有較高的性能,通常先要解寫磁碟的毛刺等io問題。特別是機器與其它應用復用時IO問題並不好解決。所以傳統的消息隊列一般要求機器獨佔使用。

計算與存儲分離一直是最近年來研究與應用的熱點。計算與存儲分離帶來最大的好處是機器的遷移與調度不再受到數據存儲大小與位置的限制。計算資源在調度系統的管理下可以近無限的擴展。存儲系統如HDFS,PANGU能提供的PB級的存儲空間以及百萬級的文件讀寫。

計算與存儲的分離在消息中間件系統中的應用還比較少,主要的問題是傳統的分散式文件系統的讀寫響應latency遠大於本機的磁碟。如果要保證消息先落盤,導致整個請求的latency就會飆升,影響整個消息系統的吞吐率。但隨著硬體水平的提升如25G/100G網路和RDMA等新技術的出現,分散式文件系統也有了質的飛躍,例如集團盤古2.0等系統為低延時高可靠的存儲提供很好的存儲平台。

2.1 Swift系統結構

Swift消息系統是在計算與存儲分離上的一次嘗試。它主要有以下特點:

首先,Swift的每個計算結點都是無狀態的,即每個worker上除log記錄,不存儲任何消息系統相關數據。在swift消息系統中,系統的狀態數據存儲在zookeeper上,消息的內容則存儲在分散式文件系統如HDFS, PANGU等上。

其次,Swift的每個計算結點都是等價的,只要消息系統需要計算資源,就可以通過調度系統不停的申請並提升整個消息系統的服務能力。目前Swift可以跑在Hippo或者Yarn上面。 Swift消息系統每次申請的資源粒度也比較小,可以充當調度系統的碎片利用者。

最後,Swift自身的client與server的消息讀寫協議,能夠保證消息高效可靠的傳遞。

圖 1 SWIFT系統結構圖

圖1是SWIFT系統結構,其主要分成2種worker: Admin和Broker。Admin和Broker的資源分配與啟動都是基於調度系統。目前支持Hippo與Yarn這兩種調度系統,這2種worker都會有很多個實例,Broker worker都是等價的,Admin worker則有一個leader,其餘的等價,這些worker一般在Docker容器中工作。

Admin角色主要負責:1. Topic的增刪改 2. Topic對應物理partition與broker調度 3. Client讀寫數據時物理partition的定位。4.資源的調整,如broker個數的增減等。

Broker角色主要負責:1.partiton相關的消息的讀寫 2. Partition相關數據的管理如過期數據的清理等。

2.2 Swift Topic介紹

Swift系統中的topic與其它消息系統的類似,它是一堆相關消息的集合,通常由業務自定義。如圖二所示,在swift中topic是由65536個邏輯分區組成,編號是[0 - 65535]。在Swift消息系統內部,topic是由partition組成的,每個partition負責一個range的邏輯partition讀寫。

在用戶層面,用戶看不到Swift的物理partition,寫消息時要麼需要提供一個hash欄位(由swift client自動映射到相應的邏輯分區)要麼提供一個0-65535的邏輯編號。 swift根據topic下每個partition的服務range,把消息寫入相應partition的writer中。Writer可以通過同步與非同步方式把消息append到對應的物理partition中。

Topic里物理Partition個數的多少影響整個topic的讀寫能力,通過邏輯partition與物理partition映射,當topic的服務能力不足時,可以動態的擴展物理partition來提升讀寫能力。另外,物理partition是Swift的基本調度單元,admin會根據每個broker worker負載,盡可以平衡的調度partition。

圖 2 swift topic數據寫入示意圖

3. 可靠消息讀寫機制

先前提到傳統的消息系統為了保證消息的可靠性,在寫消息時需要先落盤,以防機器掛掉時,消息不丟失。Swift也提供類似的模式,但落盤的對象是分散式文件系統如HDFS。這種模式下正常寫落盤消息延時的毫秒級,當HDFS壓力大時,會變成秒級,所以其性能不太穩定。Swift 設計了一種client與broker之間,broker與HDFS之間的消息寫入與確認協議來保證消息高效可靠的寫入與持久化,其機制類似TCP的滑動窗口協議。

圖3是消息非同步安全發送的示意圖。Broker在分配到partition進行服務時,會生成一個標記,其由partition的版本號(V),broker載入partition時間戳(S)以及消息持久化的checkpoint (C)組成。Client在向admin定位到partition所在broker的時也會獲取partition的版本號(V)。版本號V主要在topic屬性發生變化時(例如partition的個數等)會更新。時間戳每次partition發生重載入或調度都會發生變化。

圖 3 SWIFT非同步安全發送消息示意圖

非同步安全寫消息工作流程如下:

1. 用戶通過客戶端寫入一條消息,client定位到寫哪個物理partition,同時把消息寫入到對應的buffer中。用戶寫消息時,還可以給每條消息設置一個遞增編號,swift client會自動映射寫消息進度與編號的關係。在非同步模式的,client會有專門的提交線程與broker進行通信。

2. Client第一次向partition發送消息時,broker會驗證partition的版本V0, 匹配後才會接受消息,同時會把三元組(V,S, C)返回。client收到accept消息後,會更新已接受消息的游標和協議的三元組信息。

3. 客戶端可以持續的寫入消息,同時broker那能partition中的消息做非同步持久化,當持久化成功時,會更新持久化信息(Ca)。持久化成功的消息在內存中不會馬上刪除,只有內存不足時才會被回收。

4. Client的後台發送線程繼續工作,發送消息b,同時請求帶上了(V0,S0)。

5. Broker端驗證(V0,S0),收受消息b,順便把持久化信息也返回(V0,S0,Ca), client接收到accept信息後,更新已發送的游標到b,同時更新已接受的游標到a。消息a已經持久化成功,在使用的內存將會被writer回收。Writer更新checkpoint (Ca)給用戶層,表示消息a已經持久化。

6. 同3一樣,client繼續寫消息c,broker繼續持久化消息b。

7. 此時partition發生了調度(例如被分配到了其它機器),其HDFS上的文件消息馬上可以讀取到,但內存中的消息會被清空。此時partition載入時間戳變成了S1。Client向admin重新定位到partition的服務broker寫入的消息c和(V0,S0)。

8. Broker檢查client發送的(V0,S0)與自身的(V0,S1)不相等,將拒絕此次消息的寫入。主要基於消息在partition內要求保序考慮。此時client還不知道b是否被序列化成功,partition重新被載入b是否被序列化成功的信息也會被丟棄(無狀態),所以它也不知道。因Broker返回(V0,S1,C0),要求client重新發送未持久化的所有消息。

9. Client 重置已發送游標到b之前,更新S1並重新發送消息b和c。

10. Broker檢驗client的(V0,S1)並收受消息b和c,這時消息b會被再次持久化化到HDFS上。Client重新更新已發送游標到c。如果此後無新消息的寫入,且buffer中的消息還有未被持久化的,client會發起一次空寫操作獲取最新的持久化信息。

步驟1-10是非同步消息寫入的工作方式,用戶層可以獲取到當前持久化消息的checkpoint,可以自己記錄發送進度以便回滾。如果不方便記錄發送進度,可以在寫完一段數據後,調用flush方法強制把數據從client的buffer放到broker的buffer中。此時消息雖然沒有被持久化,但在client與partition各存一份。所以只有在broker與client同時掛掉才出現消息丟失,因此我們認為這種方法也是比較安全的。

Swift partition的寫buffer緩存所有寫入的消息,只有當空間不足時,消息內存空間才會被回收。對文件上的消息讀取,也會以塊buffer的方式做緩存。Partition之間的buffer各文件cache buffer都是共享存儲,由統一的內存回收模塊管理。其保證冷門的partition基本不消耗資源,熱門的partition可以充分利用資源。正常情況下,swift的內存可以緩存數分鐘的消息,所以消費消息時基本上從內存讀取,讀的性能也會很高效。在這個協議下,分散式文件系統偶爾抖動也不會影響消息的時效性,實際上文件系統在數分鐘內的掛機也不影響消息的實時傳遞。

4. Swift IN雙十一

2017年swift消息系統開始在搜索事業部與計算平台事業部大規模應用,主要場景除主搜索外,還包括Porsche,K-monitor,IGraph, DII, OpenSearch,RankService等業務。另外,Swift在螞蟻金服,阿里媽媽和神馬事業部也有多套swift機群的部署。

雙十一當天,同時服務的topic個數均值近萬個, partition個數達10萬。當天創建與刪除的topic近2萬個,其主要來自IGRAPH, RankService,DII等業務索引的重建,平時也差不多是這個數量級。 當天Swift消息系統總共寫入數萬億條消息,讀取數十萬億條消息,讀寫比3:1。總讀寫消息內容位元組大小數PB,消息的內容是經過Swift client壓縮,一般消息壓縮率是原始大小的1/4 - 1/2之間。讀寫的最大QPS與均值都超億條/秒,讀寫請求的峰值與均值超千萬次/秒。在線與離線讀寫消息的worker超20萬個。 另外,swift日常處理的數據與雙十一的數值相差不大。

5. 總結與展望

Swift消息系統經過一年多的不斷改進與優化,目前每天能處理PB級與萬億級的消息,但在不久的將來還需要解一些問題:

1. 超大量client如百萬級的client寫入,涉及到的partition定位與worker的連接問題。當client達百萬時,首先碰到的一個問題是連接數不夠用,目前離線的一個client寫數據會對所有相關載入partition的worker產生連接,如果worker有N個,partition有M個,其連接數達N*M個。其次partition發生調度時,partition的定位瞬間能打爆Admin。

2.每秒百億級別的消息讀寫時,如何減少系統處理消息量。Swift目前有client主動合併消息的優化,但能合併的消息數量並不多,能否在broker端進行消息合併與存儲。在大規模消息讀寫時如何降低對HDFS的壓力。Swift目前提供內存topic等來儘可能的減少消息落盤,是否有更好的機制也需要探索。

6. 相關職位招聘

崗位描述:

參與阿里巴巴集團內部實時消息系統開發,支持每秒萬億級別消息讀寫,提供高可靠、高性能、高伸縮、低延時的服務,支撐電商、金融、物流、文娛、大數據、人工智慧、搜索、廣告等各種業務場景。

崗位要求:

1. 精通C/C++語言和數據結構,演算法和數據結構基礎紮實

2. 學習能力較強,有很好的邏輯思維能力,善於主動思考,對技術有強烈激情

3. 具有優秀的分析和解決實際問題的能力和態度,追求編寫優雅的代碼,從技術趨勢和思路上能影響技術團隊;

4. 符合以下條件之一者優先。

(1) 有互聯網中間件(數據、消息、服務等)開發經驗者優先。

(2) 對hbase/hadoop/cassandra/elasticsearch/rocksdb等開源存儲產品的一種或多種熟悉者優先

(3) 對linux內核原理或伺服器硬體熟悉者優先

職位招聘

更多技術乾貨敬請關注云棲社區知乎機構號:阿里云云棲社區 - 知乎


推薦閱讀:

高性能隊列——Disruptor
Kafka,Mq,Redis作為消息隊列使用時的差異?
Kafka入門簡介
LocalMQ:從零構建類 RocketMQ 高性能消息隊列
目前linux進程間通信的常用方法是什麼(pipe?信號量?消息隊列?)?

TAG:分布式系统 | HDFS | 消息队列 |