延遲隊列
隊列常見的使用場景:非同步處理、系統解耦、數據同步、流量削峰,常見的種類有任務隊列、消息隊列、請求隊列。
現在考慮這樣一個問題:某某服務,用戶需要在48小時之內評分;否則會自動評價為5星。
預備知識:某某服務必有訂單,這些訂單會存儲到資料庫里。一個常見的做法是開啟定時任務,每隔一段時間去輪詢,發現訂單超過48小時而沒有評分,那麼會自動評分。
這種做法有一些問題:
1)輪詢間隔不好控制。如果太頻繁,對CPU不友好;如果不及,則時效性太差。
2)訂單可能有很多,全部查詢出來效率很低(即使通過分頁查詢優化,也需要一個循環)
經過分析,可知這是一個延遲消息的功能。開源的MQ都不支持延遲消息,所以需要我們自己去實現一個。
先提供兩種思路:
1)java.util.concurrent.DelayQueue
缺點是這個隊列是基於內存的,容量有限,而且重啟之後會丟失消息;
2)redis 有序集合
zadd key 1513674550287 member,其中score是時間戳。有序集合按照score逆序排序。這個需要一個線程去輪詢,但是成本很低,因為只需要查詢集合第一個元素即可,況且redis 響應神速。
現在介紹的第三種方案:環形的任務隊列,由數組實現,數組中元素是Set<Task>,數組長度是3600。
Task結構中有兩個核心屬性:
- Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務
- Task-Function:需要執行的任務指針
啟動一個Timer,每個一秒鐘在移動一個slot,那轉一圈正好需要一個小時。
如圖,當前Current Index指向第一格,當有延時消息到達之後,例如希望3610秒之後,觸發一個延時消息任務,只需:
- 計算這個Task應該放在哪一個slot,現在指向1,3610秒之後,應該是第11格,所以這個Task應該放在第11個slot的Set<Task>中
- 計算這個Task的Cycle-Num,由於環形隊列是3600格,這個任務是3610秒後執行,所以應該繞3610/3600=1圈之後再執行,於是Cycle-Num=1
Current Index不停的移動,每秒移動到一個新slot,遍歷slot中對應的Set<Task>,每個Task看Cycle-Num是不是0:
- 如果不是0,說明還需要多移動幾圈,將Cycle-Num減1
- 如果是0,說明馬上要執行這個Task了,取出Task-Funciton執行(可以用單獨的線程來執行Task),並把這個Task從Set<Task>中刪除。
Netty中的工具類HashedWheelTimer的原理與這種環形的延遲隊列相似。
參考資料:1分鐘實現「延遲消息」功能
推薦閱讀:
※Spring 整合JMS 基於ActiveMQ 實現消息的發送接收
※Kafka基礎概念
※調用:RPC MQ
※消息隊列怎樣不丟消息?
※螞蟻消息中間件 (MsgBroker) 在 YGC 優化上的探索
TAG:消息隊列 |