基於TableStore構建簡易海量Topic消息隊列

前言

消息隊列,通常有兩種場景,一種是發布者訂閱模式,一種是生產者消費者模式。發布者訂閱模式,即發布者生產消息放入隊列,多個監聽的消費者都會收到同一份消息,也就是每個消費者收到的消息是一樣的。生產者消費者模式,生產者生產消息放入隊列,多個消費者同時監聽隊列,誰先搶到消息就會從隊列中取走消息,最終每個消息只會有一個消費者擁有。

在大數據時代,傳統的生產者消費者隊列模式中的Topic數目可能從少量的幾個變為海量topic。例如要實現一個全網爬蟲抓取任務調度系統,每個大型的門戶,SNS都會成為一個topic。在topic內部也會有海量的子網頁需要抓取。在實現這樣的一個任務分發調度系統時可能會遇到以下一些問題:

  1. 海量的topic,意味著我們可能會有海量的隊列。針對爬蟲場景,根據網頁類型,一類網站對應到一個任務隊列,不同的任務隊列會有自己的生產者和消費者。
  2. 生產者和消費者會有多個,在業務峰值期間,產生較大並發訪問,消息總量也是海量。針對爬蟲任務消息總量可能就是全網的網頁地址數量。
  3. 任務可能會有優先順序,為了實現優先順序高的任務優先調度,我們可能會在一個topic下再細分子隊列。
  4. 消息消費不能丟失,如果是作為任務的調度消息,我們的消息丟失失零容忍的。
  5. 消費者模式中如果消費者因為種種原因處理失敗或者超時,需要支持消息被重新調度。
  6. 在保證消息一定會被處理的前提下,我們也要避免少量消息因為各種原因處理堆積,而影響整個系統的吞吐。因為消息讀區往往是輕量級,消息的處理是資源密集型。我們不希望因為消息讀區堆積導致處理資源閑置。

解決方案

基於TableStore(表格存儲)的跨分區高並發,主鍵自增列這個特性又很好的適配到我們的隊列特性。支持海量,不同分區鍵下使用各自的自增主鍵,可以很好的實現海量隊列。具體我們給出如下方案:

需要設計以下表:

  1. 任務消息表
  2. 消息消費checkpoint表
  3. 全量消息表

在介紹表設計之前,先做一些名詞解釋。

  1. 每個任務消息,我們假設已有一個唯一的id。
  2. 任務優先順序,我們假設優先順序範圍是固定並且已經知道,如果任務優先順序過多,可以分層,例如優先順序1~100的映射到層級1。這裡如果我們的任務沒有優先順序,那可以根據任務數據量級做一個簡單的分桶,然後輪訓抓取每個分桶中的任務。
  3. 兩個游標,對應到每個topic的每個優先層級,我們需要記錄2個游標位移點。一個是抓取掃描游標,一個是完成游標。掃描游標的定義是指當前任務當前優先層級下,被掃描到的最大位移位置。完成位移點表示改任務當前優先層級下,最大的抓取完成位移點,之前的任務都已經完成抓取。

表設計

任務消息表

這裡,每一個子任務都會被插入這張表,任務可能由不同的爬蟲端抓取後產生子任務,在子任務產生的同時,任務的訪問地址,訪問優先順序已經被固定。我們根據一個分層演算法進行映射。所以主鍵前三列已經確定,插入TableStore(表格存儲)後,id會自增生成,用於後續消費者讀任務用。

消息消費checkpoint表

這張表用於消息消費的checkpoint。下面會結合schema具體說下checkpoint的內容。

這張表屬性列上會有兩列,一列用來表示抓取掃描位移點,一列記錄完成位移點。這裡checkpoint的記錄需要使用條件更新,即我們只會確保原來值小於待更新的值才會更新。

全量消息表

我們用全量消息表存放我們的消息id以及對應屬性,一個消息任務是否重複處理也通過這張表做判斷。

在全網信息表中,有一列屬性用來表示任務處理狀態,消費者在拿到任務id時需要條件更新這張表對應的這個key,對應行不存在可以直接插入。如果已經存在,需要先讀狀態為非結束狀態,版本為讀到版本情況下再做更新。更新成功者意味著當前id的任務被這個消費者搶佔。其中行不存在表示第一次爬取,如果存在非結束狀態,表示之前的任務可能已經失敗。

任務消費處理流程

下面我們用爬蟲抓取全網網頁做為例子來看下具體如何基於TableStore(表格存儲)做消息隊列並最終實現任務的分發:

這張圖展現了我們的整個爬蟲框架,爬蟲具體流程如下

  1. 不同的爬蟲端會根據自身爬取進度定時從TableStore的爬蟲任務表進行拉取爬蟲任務,這裡一般我們單線程GetRange訪問TableStore,我們認為這裡的任務讀區速率會遠大於抓取消費者的速度,從TableStore讀區到的任務數據進入爬蟲內存隊列,然後進行下一輪任務消息讀區。直到當前內存隊列滿後等待下輪喚醒繼續抓取,如果有特殊需求可以並發拉取不同優先順序。
  2. 初始對於每個任務的各個priority,他們的默認checkpoint都對應於TableStore的一個flag即Inf_Min,也就是第一行。
  3. GetRange拉取到當前任務各優先順序抓取任務後(例如我們可以設置從優先順序高到低,一次最多200條,抓夠200條進行一次任務搶佔),爬蟲會先根據具體優先順序排序,然後按照優先順序從高到低嘗試更新網頁信息表,進行爬取任務搶佔,搶佔成功後,該任務會被放進爬蟲的內存任務隊列給抓取線程使用。搶佔成功同時我們也會更新一下爬蟲任務表中的狀態,和當前的時間,表示任務最新的更新時間,後續的任務狀態檢驗線程會查看任務是否已經過期需要重新處理。注意這裡假如有一個爬蟲線程比較leg,是上一輪搶佔任務後卡了很久才嘗試更新這個時間,也沒有問題。這種小概率的leg可能會帶來重複抓取,但是不會影響數據的一致性。並且我們可以在內存中記錄下每一步的時間,如果我們發現每一步內存中的時間超時也可以結束當前任務,進一步減少小概率的重複抓取。
  4. 當一輪的任務全部填充後,我們會根據當前拿到的最大任務表id+1(即爬蟲任務表第三個主鍵,也就是自增主鍵)進行嘗試當前任務對應優先順序checkpoint表的更新(這裡更新頻率可以根據業務自由決定),更新的原則是新的id要大於等於當前id。如果更新成功後,可以使用當前更新值繼續拉取,如果更新失敗,意味著有另一個爬蟲已經取得更新的任務,需要重新讀一下checkpoint表獲得最新的checkpoint id值,從該id繼續拉取。
  5. 除了任務抓取線程以為,每個爬蟲端可以有一個頻率更低的任務進行任務完成掃描,這個任務用來最新的完成任務游標。掃描中getrange的最大值為當前拉取的起始位置,掃描的邏輯分以下幾種:
    1. 掃描到該行已經更新為完成,此時游標可以直接下移
    2. 掃描到任務還是initial狀態,一個任務沒有被任何人設置為running,切被拉去過,原因是這個任務是一個重複抓取的任務,此時可以去url表中檢查這個url是否存在,存在直接跳過。
    3. 掃描到任務是running,不超時認為任務還在執行,結束當輪掃描。如果檢查時間戳超時,檢查url表,如果內容已經存在,則有可能是更新狀態回任務表失敗,游標繼續下移。如果內容也不存在,一種簡單做法是直接在表對應優先順序中put一個新任務,唯一的問題是如果是並發檢查可能會產生重複的任務(重複任務通過url去重也可以解決)。另一種做法也是通過搶任務一樣更新url表,更新成功者可以新建任務下移坐標。其餘的人停止掃描,更新checkpoint為當前位置。更新成功者可以繼續下移掃描直至尾部或者任務正常進行位置,然後更新checkpoint。

6. 爬蟲抓取每個任務完成後,會更新全網url表中的狀態以及對應爬蟲任務表中的狀態,其中全網url的狀態用來給後續抓取任務去重使用,爬蟲任務表中的狀態給上面步驟5的完成游標掃描線程使用判斷一個任務是否已經完成。

整個寫入子任務和讀取我們可以抽象出下面這張圖

新任務會根據優先順序並發寫入不同的隊列,其中圖中編號就對應表格存儲中的自增列,用戶按照上面設計表結構的話,不需要自己處理並發寫入的編號,表格存儲服務端會保證唯一且自增,即新任務在對應隊列末尾。爬蟲讀取任務的游標就是圖中紅色,藍色對應完成的任務列表。兩個游標在響應優先順序下獨立維護。

下面我們舉個例子,如果一個爬蟲任務拉取線程假如設置一次拉2個任務為例,

我們的爬蟲任務表會從上面切換成下圖,task1 priority=3的掃描游標更新到了10011,priority=2的掃描游標更新到10006。也就意味著掃描優先順序3的下次會從10011開始,優先順序2的會從10006開始。

並發處理

  1. 多爬蟲拉取任務有重複,這部分我們通過條件更新大表決定了同一個網頁不會同時被抓取。
  2. 多爬蟲條件更新checkpoint表決定了我們整個拉取任務不會漏過當前拉到的一批任務,如果checkpoint更新如果條件失敗任務繼續進行,其他類型可重試錯誤會繼續重試(例如服務短時間不可用,leg等。)這裡只有可能導致其他爬蟲喚醒後拉到重複數據,但是抓取因為搶佔失敗也不會重複拉取,並且新喚醒的客戶端也會更新更大的游標,保證系統不會因為一個客戶端leg而任務掃描游標滯後。
  3. 任務判定完成邏輯我們可以做分散式互斥,同時只有一個進程在判斷。也可以在判斷任務失敗的時候進行條件更新原表,更新成功後再新插入一條新任務。

總結

最後我們再來看下整個設計中幾個關鍵的問題是否滿足

  1. 海量topic,TableStore(表格存儲)天然的以一個分區鍵做為一個隊列的能力使得我們可以很容易的實現海量的隊列,數量級可以在億級別甚至更多。
  2. 優先順序,優先順序對應一個主鍵列,依照優先順序進行分層優先順序高的會被優先getrange獲得。
  3. 系統吞吐,整個系統中兩個游標的設計,使得我們任務掃描游標每輪掃描後都會快速向下走,長尾任務不會阻礙對新任務的掃描。另一方面我們任務會在url大表上做搶佔,避免不必要的重複抓取。
  4. 子任務不丟失,自增列的保證了新任務會用更大的值即排在當前隊列末尾。另外有一個完成掃描線程,會確保新任務全部完成後才會更新,這個游標代表了最後整個任務是否完成。這個游標也保證了任務不會丟失。這個任務會對長尾的任務重新建一個任務並插入隊列,新任務會被新爬蟲端重新觸發,也避免了因為一個客戶端卡住而餓死的問題。

原文

更多技術乾貨敬請關注云棲社區知乎機構號:阿里云云棲社區 - 知乎


推薦閱讀:

CODING 代碼託管架構升級之路
雙十一絲般順滑體驗背後:阿里雲洛神網路虛擬化系統揭秘
走過第六個雙11,雙11阿里雲技術負責人楊旭說:大考亦從容
【逐雲】阿里巴巴通用計算平台負責人關濤:讓計算平台成為阿里的「水電煤」

TAG:架构 | 表格存储 | 数据库 |