rocketmq怎麼保證隊列完全順序消費?
再rocketmq的java client里,在MessageListenerOrderly模式里,是把數據取出放入線程池裡去消費,在一條數據並沒有消費結束的情況下(也就是沒有commit),會繼續從隊列里拿出數據放入線程池裡消費。就是是說隊列並沒有真正的按順序消費。
實際是這樣的。這和我最初的預想是不對應的,因為我的消息消費必須保證嚴格的順序。請問rocketmq原生的client是否支持我要求的模式。也就是拿出一條隊列數據,鎖住隊列,消費完消息再解鎖。不然我就得自己碼了,嗚嗚...下面是我的測試代碼
實際上,RocketMQ是支持順序消費的。
但這個順序,不是全局順序,只是分區順序。要全局順序只能一個分區。
之所以出現你這個場景看起來不是順序的,是因為發送消息的時候,消息發送默認是會採用輪詢的方式發送到不通的queue(分區)。如圖:
而消費端消費的時候,是會分配到多個queue的,多個queue是同時拉取提交消費。
如圖:
但是同一條queue裡面,RocketMQ的確是能保證FIFO的。那麼要做到順序消息,應該怎麼實現呢——把消息確保投遞到同一條queue。
rocketmq消息生產端示例代碼如下:
按照這個示例,把訂單號取了做了一個取模運算再丟到selector中,selector保證同一個模的都會投遞到同一條queue。
即: 相同訂單號的---&>有相同的模---&>有相同的queue。
最後就會類似這樣:
這樣同一批你需要做到順序消費的肯定會投遞到同一個queue,同一個queue肯定會投遞到同一個消費實例,同一個消費實例肯定是順序拉取並順序提交線程池的,只要保證消費端順序消費,則大功告成!
如何保證順序消費? 如果是使用MessageListenerOrderly則自帶此實現,如果是使用MessageListenerConcurrently,則需要把線程池改為單線程模式。
(這裡假設觸發了重排導致queue分配給了別人也沒關係,由於queue的消息永遠是FIFO,最多只是已經消費的消息重複而已,queue內順序還是能保證)
但的確會有一些異常場景會導致亂序。如master宕機,導致寫入隊列的數量上出現變化。
如果還是沿用取模的seletor,就會一批訂單號的消息前面散列到q0,後面的可能散到q1,這樣就不能保證順序了。除非選擇犧牲failover特性,如master掛了無法發通接下來那批消息。
從消費端,如果想保證這批消息是M1消費完成再消費M2的話,可以使用MessageListenerOrderly介面,但是這樣的話會有以下問題:
1. 遇到消息失敗的消息,無法跳過,當前隊列消費暫停
2. 目前版本的RocketMQ的MessageListenerOrderly是不能從slave消費消息的。
更多分析請參考:
RocketMQ--角色與術語詳解 - 薛定諤的風口豬
RocketMQ--水平擴展及負載均衡詳解 - 薛定諤的風口豬
順序消費,跟重複消費,兩個問題常結伴出現。
分散式的MQ由於存在網路分區,生產ack、消費ack,要做到FIFO,exactly once的消費是不可能的。一般能保證FIFO,但需要客戶端參與才能不重複消費
消息去重的最佳實踐一般是消息body里寫入key,業務拿key做冪等處理
對於這個問題我認為是這樣的:
1.ConsumeMessageOrderlyService類的start()方法,如果是集群消費,則啟動定時任務,定時向broker發送批量鎖住當前正在消費的隊列集合的消息,具體是consumer端拿到正在消費的隊列集合,發送鎖住隊列的消息至broker,broker端返回鎖住成功的隊列集合。consumer收到後,設置是否鎖住標誌位。 這裡注意2個變數:
consumer端的RebalanceImpl里的ConcurrentHashMap
processQueueTable,是否鎖住設置在ProcessQueue里。 broker端的RebalanceLockManager里的ConcurrentHashMap mqLockTable,這裡維護著全局隊列鎖。2.ConsumeMessageOrderlyService.ConsumeRequest的run方法是消費消息,這裡還有個 MessageQueueLock。messageQueueLock,維護當前consumer端的本地隊列鎖。保證當前只有一個線程能夠進行消費。
3.拉到消息存入ProcessQueue,然後判斷,本地是否獲得鎖,全局隊列是否被鎖住,然後從ProcessQueue里取出消息,用MessageListenerOrderly進行消費。 拉到消息後調用ProcessQueue.putMessage(final List msgs) 存入,具體是存入TreeMap msgTreeMap。 然後是調用ProcessQueue.takeMessags(final int batchSize)消費,具體是把msgTreeMap里消費過的消息,轉移到TreeMap
msgTreeMapTemp。4.本地消費的事務控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(掛起一會再消費),在此之前還有一個變數ConsumeOrderlyContext context的setAutoCommit()是否自動提交。 當SUSPEND_CURRENT_QUEUE_A_MOMENT時,autoCommit設置為true或者false沒有區別,本質跟消費相反,把消息從msgTreeMapTemp轉移回msgTreeMap,等待下次消費。
當SUCCESS時,autoCommit設置為true時比設置為false多做了2個動作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),
commitOffset, false);
ProcessQueue.commit() :本質是刪除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消費時從msgTreeMap轉移過來的。this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本質是把拉消息的偏移量更新到本地內存中,然後定時更新到broker。那麼少了這2個動作會怎麼樣呢,隨著消息的消費進行,msgTreeMapTemp里的消息堆積越來越多,消費消息的偏移量一直沒有更新到broker導致consumer每次重新啟動後都要從頭開始重複消費。 就算更新了offset到broker,那麼msgTreeMapTemp里的消息堆積呢?不知道這算不算bug。 所以,還是把autoCommit設置為true吧。
理論上 該MQ 是不支持 完全順序消費的 !
用戶使用文檔中 指出過改情況, 文檔上指出了一種解決該問題的 方案。簡單概括就是 更具期隊列數量 比如 TopicTest 創建該 Topic的時候 會默認創建 4個隊列,如果你設置4個消費者來消費 那麼分配到的就是 4 - 4 如果你使用 1個消費者 那麼就是 1-4 。隊列中是可以設置是從頭還是從最後消費的。說明隊列中的 數據 是完全有序的。可以看出 解決辦法就是 Topic 的時間 就設置期最大隊列數 消費者的數量和隊列數相等。
文檔上推薦 在業務上解決該問題 如 新增 狀態彷彿 NewOrder Tag 標籤中。 修改 updateOrder Tag標籤中 。關於MQ 批量拉取模式下 線程池消費 可能會照成 順序消費混亂的問題
其他樓 說的 consumeThreadMax和consumeThreadMin 設置線程池最大線程 是可以解決。但是 消費性能就會下降啊! 如果按照4-4 默認情況下是 80個線程在處理 設置成最大線程=1 那就是4個線程了。
你的情況是想
拉取 消息就鎖定改消息 --》 消費消息--》解鎖消息鎖定--》同步消費狀態推薦你詳細看一下 push 消費-順序消費消息 的流程 和生命周期
RocketMQ 自己的整理和理解這是 我 整理 和 拷貝 大神 看看估計就能解惑了。這是因為你的客戶端沒有配置最大並行消費線程consumeThreadMax吧試試把consumeThreadMax和consumeThreadMin都配上
那其實就是單線程啊,著樓上都設置成1試下
推薦閱讀:
※阿里巴巴馬雲和facebook扎克伯格為什麼關係這麼好?
※如何看待蝦米音樂 macOS 客戶端在注釋中稱部分用戶「窮逼VIP」?
※阿里在線筆試為什麼推薦使用Chrome、Firefox?
※如何看待阿里巴巴副董事長提醒特朗普不要破壞中美關係?
※阿里巴巴為什麼要私有化銀泰商業,會帶來哪些影響?