標籤:

延遲隊列

隊列常見的使用場景:非同步處理、系統解耦、數據同步、流量削峰,常見的種類有任務隊列、消息隊列、請求隊列。

現在考慮這樣一個問題:某某服務,用戶需要在48小時之內評分;否則會自動評價為5星。

預備知識:某某服務必有訂單,這些訂單會存儲到資料庫里。

一個常見的做法是開啟定時任務,每隔一段時間去輪詢,發現訂單超過48小時而沒有評分,那麼會自動評分。

這種做法有一些問題:

1)輪詢間隔不好控制。如果太頻繁,對CPU不友好;如果不及,則時效性太差。

2)訂單可能有很多,全部查詢出來效率很低(即使通過分頁查詢優化,也需要一個循環)

經過分析,可知這是一個延遲消息的功能。開源的MQ都不支持延遲消息,所以需要我們自己去實現一個。

先提供兩種思路:

1)java.util.concurrent.DelayQueue

缺點是這個隊列是基於內存的,容量有限,而且重啟之後會丟失消息;

2)redis 有序集合

zadd key 1513674550287 member,其中score是時間戳。有序集合按照score逆序排序。這個需要一個線程去輪詢,但是成本很低,因為只需要查詢集合第一個元素即可,況且redis 響應神速。

現在介紹的第三種方案:環形的任務隊列,由數組實現,數組中元素是Set<Task>,數組長度是3600。

Task結構中有兩個核心屬性:

  1. Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務
  2. Task-Function:需要執行的任務指針

啟動一個Timer,每個一秒鐘在移動一個slot,那轉一圈正好需要一個小時。

如圖,當前Current Index指向第一格,當有延時消息到達之後,例如希望3610秒之後,觸發一個延時消息任務,只需:

  1. 計算這個Task應該放在哪一個slot,現在指向1,3610秒之後,應該是第11格,所以這個Task應該放在第11個slot的Set<Task>中
  2. 計算這個Task的Cycle-Num,由於環形隊列是3600格,這個任務是3610秒後執行,所以應該繞3610/3600=1圈之後再執行,於是Cycle-Num=1

Current Index不停的移動,每秒移動到一個新slot,遍歷slot中對應的Set<Task>,每個Task看Cycle-Num是不是0:

  1. 如果不是0,說明還需要多移動幾圈,將Cycle-Num減1
  2. 如果是0,說明馬上要執行這個Task了,取出Task-Funciton執行(可以用單獨的線程來執行Task),並把這個Task從Set<Task>中刪除。

Netty中的工具類HashedWheelTimer的原理與這種環形的延遲隊列相似。

參考資料:1分鐘實現「延遲消息」功能

推薦閱讀:

Spring 整合JMS 基於ActiveMQ 實現消息的發送接收
Kafka基礎概念
調用:RPC MQ
消息隊列怎樣不丟消息?
螞蟻消息中間件 (MsgBroker) 在 YGC 優化上的探索

TAG:消息隊列 |