分散式時序資料庫 - LinDB

背景

  1. 餓了么對時序資料庫的需求主要來自各監控系統,主要用於存儲監控指標。原來使用graphite,後來慢慢有對指標有多維的需求(主要體現在對一個指標加多個Tag, 來組成Series,然後對Tag進行Filter和Group進行計算),這時graphite基本很難滿足需求。
  2. 業界現在用的比較多的主要有如下幾類TSDB:
  • InfluxDB:很多公司都在用,包括餓了么有部分監控系統也是用InfluxDB。優點,支持多維和多欄位,存儲也根據TSDB的特點做了優化。但開源的部分不支持,很多公司自己做集群化, 但大多基於指標名來,這樣會有單指的熱點問題。現在餓了么也是類似的做法,但熱點問題很嚴重,大的指標已經用了最好的伺服器,但是查詢性能還是不夠理想, 如果做成按Series Sharding那成本還是有一點高;
  • Graphite:根據指標寫入及查詢,計算函數很多,但很難支持多維,包括機房或多集群的查詢,原來餓了么把業務層的監控指標存儲在Graphite中,並工作的很好, 但是多活之後基本已經很難滿足一些需求了,由於其存儲結構的特點,很佔IO,根據目前線上的數據寫放大差不多幾十倍以上;
  • OpenTSDB: 基於HBase,優點存儲層不用自己考慮,做好查詢聚合就可以,也會存在HBase的熱點問題等,在以前公司也弄基於HBase實現的TSDB,來解決OpenTSDB的一些問題, 如熱點,部分查詢聚合下放到HBase等,目的是優化其查詢性能,但依賴HBase/HDFS還是很點重;
  • HiTSDB: 阿里提供的TSDB,存儲也是用HBase,在數據結構及Index上面做了很多優化,具體沒有研究,有興趣的同學可以在阿里雲上試一下;
  • Druid: Druid其實是一個OLAP系統,但也可以用來存儲時間序列數據,但看到它的架構圖時已經放棄了;
  • ES: 也有公司直接用ES來存儲,沒有實際測試,但總覺得ES不是一個真正的TSDB;
  • atlas: Netflix出品,全內存TSDB,最近幾小時數據全在內存中,歷史數據需要外部存儲,具體沒有詳細研究;
  • beringei:facebook出品,全內存TSDB,跟atlas一樣最近的數據在內存,目前應該還在孵化期;

3. 最終我們還是決定自己實現一套分散式時序資料庫,具體需要解決如下問題:

    • 輕量,目前只依賴於Zookeeper;
    • 基於Series進行Sharding,解決熱點,可以真正水平擴展;
    • 實時寫入,實時查詢,由於大多用於監控系統,所以查詢性能要好;
    • 由於餓了么目前是多活,監控系統也是多活,所以要支持單機房寫入,多機房聚合查詢等;
    • 自動的Rollup功能,如用戶可以寫10s的精度,系統自動Rollup到分鐘,小時,天級別,以支持大時間範圍的查詢,如報表等;
    • 支持類SQL的查詢方式;
    • 支持多副本,以提高整個系統的可靠性,正常只要還有一個副本存活就可以正常提供服務,副本數指定;

整體設計

採用計算和存儲分離的架構,分為計算層LinProxy和存儲層LinStorage。

說明:

  1. LinProxy主要做一些SQL的解析,及一些中間結合的再聚合計算,如果不是跨集群,LinProxy可以不需要,對於單集群的每個節點都內嵌了一個LinProxy來提供查詢服務;
  2. LinDB Client主要用於數據的寫入,也有一些查詢的API;
  3. LinStorage的每個節點組成一個集群,節點之進行複製,並有副本的Leader節點提供讀寫服務,這點設計主要是參考Kafka的設計,可以把LinDB理解成類Kafka的數據寫入複製+底層時間序列的存儲層;
  4. LinMaster主要負責database、shard、replica的分配,所以LinStorage存儲的調度,及MetaData(目前存儲Zookeeper中)的管理; 由於LinStorage Node都是對等的,所以我們基於Zookeeper在集群的節點的選一個節點成為Master,每個Node把自身的狀態以心跳的方式上報到Master上,Master根據這些狀態進行調度, 如果Master掛了,自動再選一個Master出來,這個過程基本對整個服務是無損的,所以用戶基本無感知。

寫入

整個寫過程分為如下2部分組成:

  1. WAL複製,這部分設計上參考了Kafka,用戶的寫入只要寫入WAL成功,就認為成功(由於主要用於監控系統,所以對數據的一致性沒有做太多的保證),這樣就可以提供系統的寫入吞吐;
  2. 本地寫入,這個過程是把WAL的數據解析寫入到自己的存儲結構中,只有寫入本地存儲的數據才可以查到;

整個過程不像一些系統在每次寫的過程中完成,我們是把這個過程分2步,並非同步化了;

WAL複製

目前LinDB的replica複製協議採用多通道複製協議,主要基於WAL在多節點之間的複製,WAL在每個節點上的寫入,有獨立的寫操作完成, 所以對於Client寫入對應Leader的WAL成功就認為本次寫操作是成功的,Leader所在的節點負責把相應的WAL複製到對應的follower, 同理寫WAL成功認為複製成功,如下所示:

多通道複製協議

寫入Leader副本成功就算成功以提高了寫入速率,也帶來了以下問題:

  • 數據一致性的問題
  • 數據的丟失問題

以上圖Server1為Leader,3個Replication來複制1-WAL為舉例來說:

  1. 當前Server1是該shard的Leader接受Client的寫入,Server2和Server3都是Follower接受Server1的複製請求,此時1-wal通道作為當前的數據寫入通道, Server2和Server3此時可能落後於Server1。

說明:

  • 整個過程需要注意以下幾個Index;
  1. Client寫入時的Append Index,表示當前Client寫入到哪裡;
  2. 對應每個Follower都會有一個Replica Index,表示對應Follower消費Leader上面同步到哪裡;
  3. Follower的Ack Index,表示Follower已經成功複製到本地的WAL;
  4. 對於Follower的複製請求,其實相當於一個特殊Client的寫入,所以也有一個對應的Append Index;
  • 只有被Ack過的Index,才標示為已經處理完成,對於Leader來說,小於最小的Ack Index的WAL數據是可以被刪除;
  • 在這個過程中,如果Server2或者Server3中有一台出問題,這時對應的Consume Index不會移動,只有等到相應服務恢復之後,繼續處理;
  • 在整個過程中可能出現如下情況的可能;
    1. Leader Replica Index > Follower Append Index,這時需要根據Follower Append Index重置Leader Replica Index,可能存在2種情況,具體情況在複製順序性中描述;
    2. Leader Replica Index < Follower Append Index,也同樣存在2種情況,具體情況在複製順序性中描述;

    假如此時Server1掛了,從Server2和Server3中選出新的Leader,如此時選為Server2為Leader。

    • Server2就會開啟2-wal複製通道,向server1和server3複製,由於當前server1掛了,所以暫時只往Server3複製,此時數據的寫入通道為2-wal。
    • Server1啟動恢復後,Server2會開啟向Server1的2-wal複製通道,同時server1會將1-wal中剩餘的還未向Server2和Server3複製的數據複製給他們。

    對於異常情況,WAL中的數據不能正常由於ACK之後刪除,導致WAL佔用過多磁碟,所以對WAL需要有一個SIZE和TTL的清理過程,一旦因為WAL因為SIZE和TTL清理之後,會導致幾個Index錯亂,具體錯亂情況如上所述。

    多通道複製協議帶來的問題:

    • 每個通道都有對應的index序列,保存每個通道的last index。而單通道複製只需要保存1個last index即可。這個代價其實還好。

    本地寫入

    背景

    • 做到Shard級別的寫入隔離,即每個Shard都會有獨立的線程來負責寫入,不會因為某個資料庫或者某個Shard寫入量具增而導致別的資料庫的寫入, 但可能會因為單機承載的Shard數過多,導致線程數過多,如果遇到這種情況,應該通過擴機器來解決,或者在新建資料庫的時候,合理分配Shard數。
    • 由於是單線程的寫操作,所以在很多情況下,不需要考慮多線程寫帶來的鎖競爭問題。

    數據存儲結構

    說明,以單個資料庫在單節點上的數據結構如例:

    • 一個資料庫在單節點上會存在多個Shard,所有Shard共享一個索引數據;
    • 所有的數據根據資料庫的Interval來計算按時間片來存儲具體的數據包括數據文件和索引文件。
    1. 這樣的設計主要為了方便處理TTL,數據如果過期,直接刪除相應的目錄就可以;
    2. 每個shard下面會存在segment,segment根據interval來存儲相應時間片的數據;
    3. 為什麼每個segment下面又按interval存儲很多個data family?這個主要由於LinDB主要解決的問題是存儲海量的監控數據,一般的監控數據基本是最新時間寫入, 基本不會寫歷史數據,而整個LinDB的數據存儲類似LSM方式,所以為了減少數據文件之間的合併操作,導致寫放大,所以最終衡量下來,再對segment時間片進行分片。

    下面以interval為10s為例說明:

    1. segment按天來存儲;
    2. 每個segment按小時來分data family,每個小時一個family,每個family中的文件再按列存儲具體的數據。

    寫入流程

    說明:

    • 系統會為每一個Shard啟一個寫線程,該線程負責這個Shard的所有寫操作。
    • 首先把measurement, tags, fields對應的數據寫入資料庫的索引文件,並生成相應的measurement id, time series id及field id,主要完成string->int的轉換。 這樣的好處是所有的數據存儲都以數據類型來存儲,從而可以減少整個存儲大小,因為對於每個數據點,measurement/tags/field這樣元數據佔用,如cpu{host=1.1.1.1} load=1 1514214168614, 其實轉換成id之後,cpu => 1(measurement id), host=1.1.1.1 => 1(time series id), load => 1(field id),所以最終的數據存儲為1 1 1514214168614=>1,這個考慮OpenTSDB的設計。
    • 如果寫索引失敗,認為本次寫入失敗,失敗分為2種,一種是數據寫入格式有問題,這類失敗直接標示失敗,另外一種由於內部問題,這時寫入失敗需要重試。
    • 使用根據索引得到的ID,再結合寫入時間和資料庫Interval計算得到需要寫入到哪個segment下的哪個family,寫family的過程,直接寫內存以達到高吞吐量的要求, 內存數據到達內存限制之後,會觸發Flush操作。
    • 整個寫過程先寫內存,再由Flusher線程把內存中的數據dump到相應的文件中,這樣就做到了對一批數據順序寫入,同時對於最近的數據根據Field Type進行Rollup操作,從而進一步減少磁碟IO操作。

    查詢引擎

    LinDB查詢需要解決如下問題:

    1. 解決多個機房之間的查詢;
    2. 高效的流式查詢計算;

    說明:

    • 由於需要支持多機房或者多集群的查詢,所以引入了LinProxy,LinProxy主要負責面向用戶的查詢請求;
    1. SQL Plan負責具體SQL的解析,生成最終的執行計劃及需要計算的中間結果的函數;
    2. 通過Zookeeper中的Metadata,把請求路由給具體的LinDB集群中對應的服務;
    3. 每個LinConnect負責與一個LinDB集群之間的通信,每個LinConnect內部保存了一份對應集群的Metadata,該Metadata信息在每個Metadata變更的時候有Server端推送給LinConnect, 這樣LinConnect基本做到近實時的更新Metadata;
    4. Aggregator Stream主要負責把各個LinConnect的中間結果進行最終的合併計算操作;
    5. 整個LinProxy處理過程都是非同步化,這樣可以利用線程在IO等待的時候可以做計算;
  • 每個Node接收LinConnect過來的請求,在內部查詢計算成中間結果返回給LinConnect,詳細的過程後面要介紹;
  • Node查詢

    說明:

    • 如果所示,Client過來的一個查詢請求,會產生很多小的查詢任務,每個任務所承擔的職責很單一,只做它所自己的任務,然後把結果給下一個任務, 所以需要所有的查詢計算任務都是異常無阻塞處理,IO/CPU任務分離;
    • 整個服務端查詢使用Actor模式來簡化整個Pipeline的處理;
    • 任何一個任務執行完成,如果沒有結果產生,則不會生產下游的任務,所有下游的任務都是根據上游任務是否有結果來決定;
    • 最終把底層結果,通過Reduce Aggregate聚合成最終的結果;

    存儲結構

    倒排索引

    倒排索引,分兩部分,目前索引相關的數據還是存儲在RocksDB中。

    1. 根據Time Series的Measurement+Tags生成對應的唯一ID(類似luence裡面的doc id)。
    2. 根據Tags倒排索引,指向一個ID列表。TSID列表以BitMap的方式存儲,以方便查詢的時候通過BitMap操作來過濾出想要的數據。BitMap使用RoaringBitMap。
    3. 每一類數據都存儲在獨立的RocksDB Family中。

    內存結構

    1. 為了提高寫入性能,把當前一段時間的數據寫入到內存中,內存到達一定限制或者時間後把內存中的數據Dump到文件中。
    2. 內存存儲分為當前可寫和不可寫,當前可寫用於接入正常的數據寫入,不可寫用入Dump到文件中,如果Dump成功,則清空不可寫部分。
    3. 如果可寫部分也到在寫入限制,但不可寫部分還沒有完成Dump,這時寫入會被Block住,直到有可用的內存供數據寫入,目的是為了不會因為佔用過多內存而導致OOM。
    4. MemoryTable內部通過一個Map來存儲Measurement ID->Measurement Store關係,即每個Measurement都存儲在一個獨立的Store中。
    5. 在Measurement Store內存儲對應Measurement下面每個TSID的數據,每個TSID對應的數據用一個Memory Block來存儲,每個Memory Block按TSID的順序存儲在Array List中,把TSID存儲在一個BitMap中,通過TSID在Bitmap中位置來定位Memory Block在Array List中的具體位置,這裡說明一下為什麼不直接使用Map來存儲,因為整個系統是用Java實現的,Java中的Map結構,不適合存儲小對象的數據,存在內存放多倍的存儲。
    6. 由於每個TSID都會對應一個時間線,每個時間線可能會存在多個數據點的情況,如count時只有一個count值,timer時會有count/sum/min/max等多個值。每個數據類型以Chunk的方式存儲。Chunk內部又以堆內和堆外2部分內存來存儲,最近一段時間的數據放在堆內,歷史數據壓縮之後放在堆外,在內存中盡量多放一些最近的數據,因為LinDB的目的主要是存儲一些監控類的數據,而監控類的數據主要關心最近一段時間的數據。

    文件存儲結構

    文件存儲跟內存存儲類似,同一個Measurement的數據以Block的方式存儲在一起,查詢時通過Measurement ID定位到該Measurement的數據存儲在哪個Block中。

    1. Measurement Block後存儲一個Offset Block,即存儲每個Measurement Block所在的Offset,每個Offset以4 bytes存儲。
    2. Offset Block存儲一個Measurement Index Block,按順序存儲每個Measurement ID,以Bitmap的方式存儲。
    3. 文件的尾存儲一個Footer Block,主要存儲Version(2 bytes) + Measurement Index Offset(4 bytes) + Measurement Index Length(4 bytes)。
    4. Data數據塊都是數值,所以使用xor壓縮,參考facebook的gorilla論文;

    Measurement Block:

    • 每個Measurement Block類似Measurement的方式存儲,只是把Measurement ID換成Measurement內的TSID。
    • TS Entry存儲該TSID對應每一列的數據,一列數據對應存儲一段時間的數據點。

    查詢邏輯:

    • DataFile在第一次載入的時候會把Measurement Index放在內存中,查詢輸入Measurement ID通過Measurement Index中的第幾個位置,然後通過這個位置N,在Offset Block查詢具體的Measurement Block的Offset,由於每個Offset都是4 bytes,所以offset position = (N-1) * 4,再讀取4 bytes得到真正的Offset。
    • 同樣的道理可以通過TSID,找到具體的TS Entry,再根據條件過濾具體的列數據,最終得到需要讀取的數據。

    總結

    LinDB從2年前正式慢慢服務於公司的監控系統,從1.0到2.0,已經穩定運行2年多,除了一次rocksdb的問題,幾乎沒出過什麼問題,到現在的3.0性能的大幅提升,我們基本都是站在業界一些成熟方案的基礎上,慢慢演進而來。

    也有人問,LinDB為什麼這麼快,其實我們是參考了很多TSDB的作法,然後取其好的設計,再結果時序的特徵再做一些優化。

    1. 時序一般都是最新寫入,但也是一種隨機寫,我們會先成內存中把隨寫變成循序寫,最終到寫文件都是順序寫,所有數據都是有序,這樣查詢的時候也是順序讀,這一點很關鍵;
    2. 把寫入的measurement/tags/fields都轉化成Int,再生成倒排索引,最終生成一個TSID(類似Luence的doc id),這樣就大大減少了最終的數據量,畢竟指標這樣字元串是占絕對的大頭, 這點很想OpenTSDB,雖然InfluxDB已經把一段時間的按Block來存儲,但還是在block的頭放這些數據,這些都是成本,特別是在compact的時候;
    3. 不像別的TSDB會把timestamp直接存下來,一般timestamp到毫秒基別佔8個節點,雖然根據時間有序的優勢再用delta-encoded,壓縮也是很好,但我們想做到極致,我們是用一個bit來表示時間, 具體的做法就是根據上面的描述,把時間的高位和存儲Interval,把高位的時間放在目錄上,再結合高位算一個delta,把delta以1bit的格式存儲,來表示有沒有數據,因為監控數據絕大部分都是連續的數據, 所以這樣做也是合理的,因此在時間這個數據上的存儲也大大減少空間;
    4. 我們發現對一個指標的多個Field的數據,每個Field的數據相鄰的一些點基本是很相近的,LinDB 2.0存儲直接是用RocksDB,多個Field放在一起存儲,再把相鄰的點進行壓縮,這樣其實壓縮率不會很高, 而且每取查詢取Field的時候都要把所有的數據都讀出來,這也是LinDB 3.0我們考慮自己實現列式存儲,相同列存在一塊,以提高壓縮率,查詢的時候只讀需要的數據。 整個壓縮我們也沒有用gzip/snappy/zlib,因為這些不大適合用於數值類型,我們是直接參考了facebook的gorilla論文的xor的方式來的,這個現在已經被很多TSDB採用;
    5. 基於上面這些基本的順序讀已經不成問題,基於TSID查詢的更不是問題,因為整個設計都是基於TSID->data來設計的,所以還要解決一個根據倒排查出一組TSID對數據的隨機讀,如上我們是把TSID放在Bitmap, 然後通過Bitmap計算出Offset,直接找到數據,通過存儲時的優化,做到TSID查詢精準查找,然不是通過二分查找;
    6. 還有一點就是LinDB在新建資料庫時指定完Interval之後,系統會自己Rollup,不像InfluxDB要寫很多Continue Query,LinDB所有的這一切都是自動化的;
    7. 查詢計算並行流式處理;

    所以用一句話來總結的話就是一個高效的索引外加一堆數值,然後怎麼玩好這堆數值。

    自身監控

    LinDB也自帶了自身的一些監控功能

    Overview

    Dashboard

    未來的展望

    1. 豐富查詢函數;
    2. 優化內存使用率;
    3. 自身監控的提升;
    4. 如果有可能,計劃開源;

    對比測試

    下面是與InfluxDB和LinDB2.0的一些查詢性能對比。 由於InfluxDB集群化要商業版,所以都是單機默認配置下,無Cache的測試。 伺服器配置阿里雲機器:8 Core 16G Memory

    大維度

    Tags: host(40000),disk(4),partition(20),模擬伺服器磁碟的監控,總的Series數為320W,每個Series寫一個數據點

    小維度的1天內的聚合測試

    Tags: host(400),disk(2),partition(10),模擬伺服器磁碟的監控,總的Series數為8K,每個Series寫一天的數據 每個維度每2s寫入1個點,每個維度一天內總共43200個點,所有維度總共43200 * 8000個點,共3 4560 0000即3億多數據

    小維度的7天內的聚合測試

    Tags: host(400),disk(2),partition(10),模擬伺服器磁碟的監控,總的Series數為8K,每個Series寫7天的數據 每個維度每5s寫入1個點,每個維度一天內總共17280個點,所有天數所有維度總共17280 8000 7 個點,即9 6768 0000,9億多個點 這個測試要說明一下,得利於LinDB自動的Rollup,如果InfluxDB開Continue Query的話相信應該也還好。

    引用

    • Time Series. Measurement+Tags 組成一個唯一組合,加上時間最終組全一個series。 ?
    • TSID. 每個time series(measurement + tags)都會生成一個唯一的ID。 ?
    • RoaringBitMap. roaringbitmap.org/ ?
    • Druid. druid.io/docs/0.12.0/de ?
    • InfluxDB TSM. docs.influxdata.com/inf ?
    • Gorilla: A Fast, Scalable, In-Memory Time Series Database. vldb.org/pvldb/vol8/p18 ?
    • beringei. github.com/facebookincu ?

    作者

    黃傑:2015年加入餓了么,現任框架工具部高級開發經理,主要負責餓了么的監控系統及監控系統周邊的工具。

    我們這裡還有坑位,歡迎有同樣愛好的夥伴加入。有興趣的同學請投簡歷到jie.huang@ele.me


    推薦閱讀:

    UCloud雲資料庫團隊誠招分散式資料庫研發
    PhxPaxos架構設計、實現分析

    TAG:分散式資料庫 |