標籤:

RabbitMQ ACK 機制的意義是什麼?

首先,ACK機制可以保證消費者如果拿了隊列的消息,處理出錯了,那麼隊列中還有這個消息,仍然可以給下個機子來跑。

但是,個人覺得一般處理消息出錯都是因為代碼邏輯或者出bug,即使 隊列中後來仍然保留該消息,然後再給某一個消費者消費,不還是報錯嗎?

Ps:當然,如果一個機子宕掉,消息還有,還可以給另外的機子用,這種情景下 ACK 是很有用的。但是個人覺得這種應該是少數情況吧。

另外,不知道能不能不要 ACK 機制,然後把出錯的消息存庫,方便以後查bug或者重新執行。 這種解決思路是參考 Quartz 定時任務調度,因為Quartz可以讓失敗的任務重新執行一次,或者不管,或者怎麼怎麼樣,但是 RabbitMQ 好像缺了這一點。

不知道大家一般都是怎麼用 RabbitMQ 的 ACK 的?


最近也在學 RabbitMQ,如果有什麼錯誤歡迎指正。

首先,ACK機制可以保證消費者如果拿了隊列的消息,處理出錯了,那麼隊列中還有這個消息,仍然可以給下個機子來跑。

首先你弄錯了 acknowledgment 的目的。acknowledgment 是 consumer 告訴 broker 當前消息是否成功 consume,至於 broker 如何處理 NACK,取決於 consumer 是否設置了 requeue:如果 requeue=False,那麼 NACK 後 broker 是會刪除消息的。看看 RabbitMQ 官方的解釋。Consumer 做一個 ACK,是為了告訴 Broker 這條消息已經被成功處理了(transaction committed)。只要沒收到 consumer 的 acknowledgment,broker 就會一直保存著這條消息(但不會 requeue,更不會分配給其他 consumer,直到當前 consumer 發生斷開連接之類的異常)。RabbitMQ 之所以是 guaranteed delivery,這是一個關鍵。換言之,你的 consumer 代碼必須能夠處理各種異常,確保只要收到一條消息,最終一定能夠執行一條 ACK / NACK(當然也沒人阻止你設置 no_ack=True,乾脆不用 acknowledgment 機制,這個視業務需求而定)。

處理消息時出錯了,一般不應該讓別的 consumer 再去處理這個消息,因為多半還是要出錯。

但是,個人覺得一般處理消息出錯都是因為代碼邏輯或者出bug,即使 隊列中後來仍然保留該消息,然後再給某一個消費者消費,不還是報錯嗎?

Ps:當然,如果一個機子宕掉,消息還有,還可以給另外的機子用,這種情景下 ACK 是很有用的。但是個人覺得這種應該是少數情況吧。

Consumer 出錯的原因遠不止是自身的 bug 這麼簡單:

如果我的 consumer 需要驗證消息內容,那麼不合法的消息會導致 consumer 出錯。

如果我的 consumer 需要將消息保存到資料庫,那麼資料庫伺服器掛了會導致 consumer 出錯。

如果我的 consumer 要根據消息內容發送一個 HTTP 請求,那麼 HTTP 服務掛了會導致 consumer 出錯。

……

對於第 1 個情況,讓別的 consumer 再處理一次不合法的消息毫無意義(反正還是要出錯)。

對於 2、3,我個人的方法是讓當前的 consumer 進行 N 次嘗試,全部失敗之後 NACK 這條消息。

另外,不知道能不能不要 ACK 機制,然後把出錯的消息存庫,方便以後查bug或者重新執行。

RabbitMQ 自帶 dead lettering 機制,基本上就是 天龍的回答(basic.nack 或者 basic.reject)。我沒有用過這個機制,而是手動在 consumer 裡面寫了一小段 publisher 的邏輯來轉發 dead letter 到另一個 queue,這樣我可以在消息的 header 里加一些有用的 log 信息,方便我 debug。


ack 機制可以用來告訴 mq 你的消費者程序已經完成這個消息的處理了。

「個人覺得一般處理消息出錯都是因為代碼邏輯或者出bug,即使 隊列中後來仍然保留該消息,然後再給某一個消費者消費,不還是報錯嗎?」

這個可以在創建 queue 的時候,設置dead letter 相關屬性。

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received "" + message + """);
try{
doWork(message);
System.out.println(" [x] Done");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}catch(Exception e){
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("出現異常, reject,不重新入隊");
}

出現異常的時候,reject,且不重新入隊,即可進入死信隊列


1、消費者處理時由於程序本身bug或者宕機沒有ack,那麼rabbit會將該信息丟給另外的消費者進行處理。並且rabbit不會再給該消費者丟消息。

2、如果是消費者認為該消息本身有問題而不需要處理。可以使用basic.reject命令。(2.0.0更高版本支持。)reject命令里requeue設置成true的話,rabbit會將該消息丟給其他消費者,設置成false的話則不會再發送。

3、在訂閱隊列的時候可以設置auto-ack為true 這樣消費者收到消息後直接ack(並不代表一定消費成功)。

最後...在創建隊列或者消費者訂閱隊列的時候可以設置屬性no-ack。(不過先確認你需要這個機制嗎...)

最後的最後,將來的版本rabbitmq會支持「死信」(dead letter)隊列,就是存放2提到的requeue為false的消息。便於定位問題。


剛試了下,開兩個消費者,其中一個消費者在處理某條消息時中途宕機,伺服器無法得到該條消息ack信息.

在channel斷開連接後,rabbitmq伺服器會把還未ack的該條消息發給其它消費者,防止了數據丟失.

接收到id為165的消息,但是宕機了,該消息會由另一個消費者消費


最近也在研究,官網文檔看了一遍demo也都跑了一遍,但是我存在一個疑問就是,消費者一直未ack的話 是不是會分配給其他消費者,又或者是否會重試。


可以配置死信,將你失敗的消息放到死信隊列當中


一句話:告知rabbitmq代理,消費者的狀態。

rmq可通過ack回執知道消費者是斷掉channel,還是掛掉,還是吞吐量過大,延遲銷毀msg

可以看看《rabbitmq實戰》


推薦閱讀:

Rabbitmq 和 Celery 是怎樣工作的?

TAG:RabbitMQ |