【RabbitMQ學習記錄】- 消息隊列存儲機制源碼分析

【RabbitMQ學習記錄】- 消息隊列存儲機制源碼分析

來自專欄網易雲社區

本文來自網易雲社區。

RabbitMQ在金融系統,OpenStack內部組件通信和通信領域應用廣泛,它部署簡單,管理界面內容豐富使用十分方便。筆者最近在研究RabbitMQ部署運維和代碼架構,本篇文章主要記錄下RabbitMQ存儲機制相關內容和源碼分析。

一、RabbitMQ進程架構

Erlang是基於Actor模型的一門天然多進程、分散式和高並發的語言。一個Erlang虛擬機對應一個操作系統進程,一個Erlang進程調度器對應一個操作系統線程,一般來說,有多少個CPU核就有多少個調度器。

RabbitMQ是基於Erlang語言實現的一個分散式消息中間件。下圖是RabbitMQ基本的進程模型:

  • tcp_acceptor:負責接受客戶端連接,然後為客戶端連接創建rabbit_reader、rabbit_writer、rabbit_channel進程

  • rabbit_reader:負責解析客戶端AMQP幀,然後將請求發送給rabbit_channel進程

  • rabbit_writer:負責向客戶端返回數據

  • rabbit_channel:負責解析AMQP方法,以及對消息進行路由,然後發送給對應的隊列進程。

  • rabbit_amqqueue_process:rabbit隊列進程,該進程一般在rabbitmq創建隊列時被創建,其主要負責消息的接收/投遞邏輯

  • rabbit_msg_store:存儲伺服器進程,主要負責消息的持久化存儲

上述進程中,tcp_acceptor和rabbit_msg_store只會有一個,rabbit_amqqueue_process進程的數量則和隊列數量保持一致,每個客戶端連接對應一個rabbit_reader和rabbit_writer進程,每一個連接的通道對應一個rabbit_channel進程。

通常來說,客戶端發起一條連接的同時,可以打開多條channel,相對連接的open/close來說,對channel進行open和close的操作開銷會更小。最佳實踐是一個生產者/消費者進程對應一個connection,具體發送一個線程對應一個channel即可。

二、消息存在哪裡

RabbitMQ的消息持久化實際包括兩部分:隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。rabbit_queue_index負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否已經被交付給消費者、是否已被消費者ack等,每個隊列都有一個與之對應的rabbit_queue_index。

rabbit_msg_store以鍵值對的形式存儲消息,每個節點有且只有一個,所有隊列共享。從技術層面講rabbit_msg_store又可以分為msg_store_persistent和msg_store_transient,其中msg_store_persistent負責持久化消息的存儲,不會丟失,而msg_store_transient負責非持久化消息的存儲,重啟後消息會丟失。

通過配置環境變數RABBITMQ_MNESIA_BASE可以指定存儲目錄,一般配置RABBITMQ_MNESIA_BASE=/srv/rabbitmq。

$ cd /srv/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1$ ls -ldrwxr-xr-x 2 nqs nqs 12288 Jun 1 14:43 msg_store_persistentdrwxr-xr-x 2 nqs nqs 4096 Jul 25 2016 msg_store_transientdrwxr-xr-x 4 nqs nqs 4096 Jul 27 2016 queues...

其中msg_store_transient、queues和msg_store_persistent就是實際消息的存儲目錄。

2.1 rabbit_msg_store存儲

RabbitMQ通過配置queue_index_embed_msgs_below可以指定根據消息存儲位置,默認queue_index_embed_msgs_below是4096位元組(包含消息體、屬性及headers),小於該值的消息存在rabbit_queue_index中。

$ ls msg*msg_store_persistent:82680.rdq 97666.rdqmsg_store_transient:0.rdq

經過rabbit_msg_store處理的消息都會以追加的方式寫入到文件中,文件名從0開始累加,後綴是.rdq,當一個文件的大小超過指定的限制(file_size_limit)後,關閉這個文件再創建一個新的文件存儲。 消息以以下格式存在於文件中:

<<Size:64, MsgId:16/binary, MsgBody>>

MsgId為RabbitMQ通過rabbit_guid:gen()每一個消息生成的GUID,MsgBody會包含消息對應的exchange,routing_keys,消息的內容,消息對應的協議版本,消息內容格式。

在進行消息存儲時,RabbitMQ會在ETS表中記錄消息在文件中的位置映射和文件的相關信息。讀取消息的時候先根據消息的msg_id找到對應的文件,如果文件存在且未被鎖住則直接打開文件,如果文件不存在或者鎖住了則發請求到rabbit_msg_store處理。

2.2 索引文件

查看索引信息

$ cd queues/DMX3PGVA4ZG3HHCXA0ULNIM6P$ ls 70083.idx 70084.idx 88155.idx journal.jif

rabbit_queue_index順序存儲段文件,文件編號從0開始,後綴.idx,且每個段文件包含固定的SEGMENT_ENTRY_COUNT條記錄。SEGMENT_ENTRY_COUNT默認是16384,每個rabbit_queue_index從磁碟讀取消息的時候至少讀取一個段文件。

2.3 過期消息刪除

消息的刪除只是從ETS表刪除執行消息的相關信息,同時更新對應的存儲文件的相關信息,並不立即對文件中的消息進程刪除,後續會有專門的垃圾回收進程負責合併待回收消息文件。

當所有文件中的垃圾消息(已經被刪除的消息)比例大於閾值(GARBAGE_FRACTION = 0.5)時,會觸發文件合併操作(至少有三個文件存在的情況下),以提高磁碟利用率。

publish消息時寫入內容,ack消息時刪除內容(更新該文件的有用數據大小),當一個文件的有用數據等於0時,刪除該文件。

三、消息存儲過程源碼分析

消息流轉示意圖:

rabbit_channel進程確定了消息將要投遞的目標隊列,rabbit_amqqueue_process是隊列進程,每個隊列都有一個對應的進程,實際上rabbit_amqqueue_process進程只是提供了邏輯上對隊列的相關操作,他的真正操作是通過調用指定的backing_queue模塊提供的相關介面實現的,默認情況該backing_queue的實現模塊為rabbit_variable_queue。 RabbitMQ隊列中的消息隨著系統的負載會不斷的變化,一個消息可能會處於以下4種狀態:

%% Definitions:%%%% alpha: this is a message where both the message itself, and its%% position within the queue are held in RAM(消息本身和消息位置索引都只在內存中)%%%% beta: this is a message where the message itself is only held on%% disk (if persisted to the message store) but its position%% within the queue is held in RAM.(消息本身存儲在磁碟中,但是消息的位置索引存在內存中)%%%% gamma: this is a message where the message itself is only held on%% disk, but its position is both in RAM and on disk.(消息本身存儲在磁碟中,但是消息的位置索引存在內存中和磁碟中)%%%% delta: this is a collection of messages, represented by a single%% term, where the messages and their position are only held on%% disk.(消息本身和消息的位置索引都值存儲在磁碟中)

對於普通的沒有設置優先順序和鏡像的隊列來說,backing_queue的默認實現是rabbit_variable_queue,其內部通過5個子隊列Q1、Q2、Delta、Q3和Q4來實現這4個狀態的轉換,其關係如下圖所示:

其中Q1、Q4隻包含alpha狀態的消息,Q2和Q3包含Beta和gamma狀態的消息,Delta只包含delta狀態的消息。具體消息的狀態轉換後續會進行源碼分析。

3.1 消息入隊分析

rabbit_amqqueue_process對消息的主要處理邏輯位於deliver_or_enqueue函數,該方法將消息直接傳遞給消費者,或者將消息存儲到隊列當中。

整體處理邏輯如下:

  1. 首先處理消息的mandory標誌,和confirm屬性。mandatory標誌告訴伺服器至少將該消息route到一個隊列中,否則將消息返還給生產者。confirm則是消息的發布確認。
  2. 然後判斷隊列中是否有消費者正在等待,如果有則直接調用backing_queue的介面給客戶端發送消息。
  3. 如果隊列上沒有消費者,根據當前相關設置判斷消息是否需要丟棄,不需要丟棄的情況下調用backing_queue的介面將消息入隊。

deliver_or_enqueue函數代碼:

deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> %% 如果當前消息mandatory欄位為true,則立刻通知該消息對應的rabbit_channel進程 send_mandatory(Delivery), %% must do this before confirms %% 消息隊列記錄要confirm的消息,如果confirm為false,則不記錄要confirm(如果消息需要進行confirm,則將該消息的信息存入msg_id_to_channel欄位中) {Confirm, State1} = send_or_record_confirm(Delivery, State), %% 得到消息特性特性數據結構 Props = message_properties(Message, Confirm, State1), %% 讓backing_queue去判斷當前消息是否重複(rabbit_variable_queue沒有實現,直接返回的false) {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State2 = State1#q{backing_queue_state = BQS1}, case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, State2) of true -> State2; %% 已經將消息發送給消費者的情況 {delivered, State3} -> State3; %% The next one is an optimisation %% 沒有消費者來取消息的情況(discard:拋棄) %% 當前消息沒有發送到對應的消費者,同時當前隊列中設置的消息過期時間為0,同時重新發送的exchange交換機為undefined,則立刻將該消息丟棄掉 {undelivered, State3 = #q{ttl = 0, dlx = undefined, backing_queue_state = BQS2, msg_id_to_channel = MTC}} -> %% 直接將消息丟棄掉,如果需要confirm的消息則立刻通知rabbit_channel進程進行confirm操作 {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; %% 沒有消費者來取消息的情況 {undelivered, State3 = #q{backing_queue_state = BQS2}} -> %% 將消息發布到backing_queue中 BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), %% 判斷當前隊列中的消息數量超過上限或者消息的占的空間大小超過上限 {Dropped, State4 = #q{backing_queue_state = BQS4}} = maybe_drop_head(State3#q{backing_queue_state = BQS3}), %% 得到當前隊列中的消息數量 QLen = BQ:len(BQS4), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an %% expiry ends up at the head of the queue. If the head %% remains unchanged, or if the newly published message %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped, QLen =:= 1, Props#message_properties.expiry} of %% 該情況是頭部沒有變化,同時消息隊列消息樹立不為一,則不管當前加入的消息是否設置有超時時間,都不執行drop_expired_msgs函數 {false, false, _} -> State4; %% 有丟棄消息,同時當前隊列中只有當前這個新的消息,同時消息自己的特性過期時間沒有定義,則不檢查消息過期 %% 此時消息的頭部有變化,但是消息隊列中只有一個消息,該消息還沒有設置超時時間,則不執行drop_expired_msgs函數 {true, true, undefined} -> State4; %% 當向隊列中插入消息後需要做檢查消息過期,同時設置定時器的操作只有三種情況 %% 1.當消息頭部根據隊列上限有變化,同時消息插入後當前隊列消息數量為一,且該消息設置有過期時間,則需要做一次操作(該情況是消息頭部有刪除消息,都會進行一次消息過期檢查) %% 2.當消息頭部根據隊列上限有變化,同時消息插入後當前隊列消息數量不為一,且該消息設置有過期時間,則需要做一次操作(該情況是消息頭部有刪除消息,都會進行一次消息過期檢查) %% 3.當消息頭部根據隊列上限沒有變化,同時消息插入後當前隊列消息數量為一,不管消息有沒有過期時間,都要做一次操作(該情況下是當前隊列進入第一條消息) %% 最重要的是只要消息隊列的頭部消息有變化,則立刻執行drop_expired_msgs函數,將隊列頭部超時的消息刪除掉 {_, _, _} -> drop_expired_msgs(State4) end end.

如果調用到該方法的BQ:publish則說明當前隊列沒有消費者正在等待,消息將進入到隊列。backing_queue實現了消息的存儲,他會儘力會durable=true的消息做持久化存儲。初始默認情況下,非持久化消息直接進入內存隊列,此時效率最高,當內存佔用逐漸達到一個閾值時,消息和消息索引逐漸往磁碟中移動,隨著消費者的不斷消費,內存佔用的減少,消息逐漸又從磁碟中被轉到內存隊列中。

消息在這些Queue中傳遞的"一般"過程q1->q2->delta->q3->q4,一般負載較輕的情況消息不需要走完每個Queue,大部分都可以跳過。rabbit_variable_queue中消息的入隊介面源碼如下:

%% 消息的發布介面publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, _Flow, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, in_counter = InCount, durable = IsDurable, unconfirmed = UC }) -> %% 只有持久化隊列和消息持久化才會對消息進行持久化 IsPersistent1 = IsDurable andalso IsPersistent, %% 組裝消息狀態(該數據結構是實際存儲在隊列中的數據) MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), %% 如果隊列和消息都是持久化類型,則將消息內容和消息在隊列中的索引寫入磁碟 {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), %% 將消息狀態數據結構存入內存(如果Q3隊列不為空,則將新消息存入Q1隊列,如果為空則將新消息存入Q4隊列) State2 = case ?QUEUE:is_empty(Q3) of %% 如果Q3隊列不為空,則將當前的消息寫入Q1隊列 false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; %% 如果Q3隊列為空,則將當前的消息寫入Q4隊列 true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, %% 進入隊列中的消息數量加一 InCount1 = InCount + 1, %% 如果消息需要確認,將該消息加入unconfirmed欄位 UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), %% 更新隊列進程中的狀態信息 State3 = stats({1, 0}, {none, MsgStatus1}, %% 更新下一個消息在消息中的位置 State2#vqstate{ next_seq_id = SeqId + 1, in_counter = InCount1, unconfirmed = UC1 }), %% RabbitMQ系統中使用的內存過多,此操作是將內存中的隊列數據寫入到磁碟中 a(reduce_memory_use(maybe_update_rates(State3))).

消息入隊時先判斷Q3是否為空,如果Q3為空,則直接進入Q4,否則進入Q1,這裡思考下為什麼?

假如Q3為空,Delta一定為空,因為假如Delta不為空,那麼Q3取出最後一個消息的時候Delta已經把消息轉移到Q3了,這樣Q3就不是空了,前後矛盾因此Delta一定是空的。同理可以推測出Q2、Q1都是空的,直接把消息放入Q4即可。

消息入隊後,需要判斷內存使用,調用reduce_memory_use函數:

reduce_memory_use(State = #vqstate { ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, rates = #rates { in = AvgIngress, out = AvgEgress, ack_in = AvgAckIngress, ack_out = AvgAckEgress } }) -> State1 = #vqstate { q2 = Q2, q3 = Q3 } = %% 得到當前在內存中的數量超過允許在內存中的最大數量的個數 case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of 0 -> State; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress)) of %% ack操作進入的流量大於消息進入的流量,則優先將等待ack的消息寫入磁碟文件 true -> [ %% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁碟文件) fun limit_ram_acks/2, %% 將Quota個alphas類型的消息轉化為betas類型的消息(Q1和Q4隊列都是alphas類型的消息) fun push_alphas_to_betas/2 ]; %% 消息進入的流量大於ack操作進入的消息流量,則優先將非等待ack的消息寫入磁碟文件 false -> [ %% 將Quota個alphas類型的消息轉化為betas類型的消息(Q1和Q4隊列都是alphas類型的消息) fun push_alphas_to_betas/2, %% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁碟文件) fun limit_ram_acks/2 ] end, %% 真正執行轉化的函數 {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) end, {S1, State}, Funs), State2 end, %% 當前beta類型的消息大於允許的beta消息的最大值,則將beta類型多餘的消息轉化為deltas類型的消息 case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of S2 when S2 >= ?IO_BATCH_SIZE -> %% 將S2個betas類型的消息轉化為deltas類型的消息 push_betas_to_deltas(S2, State1); _ -> State1 end.%% 將Quota個alphas類型的消息轉化為betas類型的消息(Q1和Q4隊列都是alphas類型的消息)push_alphas_to_betas(Quota, State) -> %% 將Q1隊列中消息轉化為betas類型的消息 %% 如果磁碟中沒有消息,則將Q1中的消息存儲到Q3隊列,如果磁碟中有消息則將Q3隊列中的消息存儲到Q2隊列(將Q1隊列頭部的元素放入到Q2或者Q3隊列的尾部) {Quota1, State1} = push_alphas_to_betas( fun ?QUEUE:out/1, fun (MsgStatus, Q1a, %% 如果delta類型的消息的個數為0,則將該消息存入存入Q3隊列 State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; %% 如果delta類型的消息個數不為0,則將該消息存入Q2隊列 (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } end, Quota, State #vqstate.q1, State), %% 將Q4隊列中消息轉化為betas類型的消息(Q4 -> Q3)(將Q4隊列尾部的元素不斷的放入到Q3隊列的頭部) {Quota2, State2} = push_alphas_to_betas( fun ?QUEUE:out_r/1, fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}.%% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁碟文件)limit_ram_acks(0, State) -> {0, State};limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> {Quota, State}; false -> %% 拿到隊列索引最大的消息 {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), %% 內存不足,強制性的將等待ack的SeqId消息內容寫入磁碟 {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), %% 如果成功的將消息寫入磁碟,則將內存中的消息體欄位清空 MsgStatus2 = m(trim_msg_status(MsgStatus1)), %% 更新存儲在磁碟中等待ack的消息欄位disk_pending_ack,將剛才從存儲在內存中等待ack的消息欄位ram_pending_ack中的SeqId存儲到disk_pending_ack欄位中 DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), %% 更新隊列狀態,同時更新最新的ram_pending_ack和disk_pending_ack欄位 limit_ram_acks(Quota - 1, %% 主要是更新內存中保存的消息大小(ram_bytes減去當前寫入磁碟的消息的大小) stats({0, 0}, {MsgStatus, MsgStatus2}, State1 #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1 })) end.

每次入隊消息後,判斷RabbitMQ系統中使用的內存是否過多,此操作是嘗試將內存中的隊列數據寫入到磁碟中.

內存中的消息數量(RamMsgCount)及內存中的等待ack的消息數量(RamAckIndex)的和大於允許的內存消息數量(TargetRamCount)時,多餘數量的消息內容會被寫到磁碟中.

3.2 消息出隊源碼分析

獲取消息:

  1. 嘗試從q4隊列中獲取一個消息,如果成功,則返回獲取到的消息,如果失敗,則嘗試通過試用fetch_from_q3/1從q3隊列獲取消息,成功則返回,如果為空則返回空;
  2. 注意fetch_from_q3從Q3獲取消息,如果Q3為空,則說明整個隊列都是空的,無消息,消費者等待即可。

取出消息後:

  1. 如果Q4不為空,取出消息後直接返回;
  2. 如果Q4為空,Q3不為空,從Q3取出消息後,判斷Q3是否為空,如果Q3為空,Delta不為空,則將Delta中的消息轉移到Q3中,下次直接從Q3消費;
  3. 如果Q3和Delta都是空的,則可以任務Delta和Q2的消息都是空的,此時將Q1的消息轉移到Q4,下次直接從Q4消費即可。

%% 從隊列中獲取消息queue_out(State = #vqstate { q4 = Q4 }) -> %% 首先嘗試從Q4隊列中取得元素(Q4隊列中的消息類型為alpha) case ?QUEUE:out(Q4) of {empty, _Q4} -> %% 如果Q4隊列為空則從Q3隊列中取得元素(如果Q3也為空,則直接返回空) case fetch_from_q3(State) of {empty, _State1} = Result -> Result; {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end.%% 從隊列Q3中讀取消息fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4 }) -> %% 先從Q3隊列中取元素(如果為空,則直接返回為空) case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, MsgStatus}, Q3a} -> State1 = State #vqstate { q3 = Q3a }, State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% 當這兩個隊列都為空時,可以確認q2也為空,也就是這時候,q2,q3,delta,q4都為空,那麼,q1隊列的消息可以直接轉移到q4,下次獲取消息時就可以直接從q4獲取 %% q3 is now empty, it wasnt before; %% delta is still empty. So q2 must be %% empty, and we know q4 is empty %% otherwise we wouldnt be loading from %% q3. As such, we can just set q4 to Q1. %% 當Q3隊列為空,且磁碟中的消息數量為空,則斷言Q2隊列為空 true = ?QUEUE:is_empty(Q2), %% ASSERTION %% 當Q3隊列為空,且磁碟中的消息數量為空,則斷言Q4隊列為空 true = ?QUEUE:is_empty(Q4), %% ASSERTION %% 從Q3隊列中取走消息後發現Q3隊列為空,同時磁碟中沒有消息,則將Q1隊列中的消息放入Q4隊列 State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; {true, false} -> %% 從Q3隊列中取走消息後發現Q3隊列為空,q3空,delta非空,這時候就需要從delta隊列(內容與索引都在磁碟上,通過maybe_deltas_to_betas/1調用)讀取消息,並轉移到q3隊列 maybe_deltas_to_betas(State1); {false, _} -> %% q3非空,直接返回,下次獲取消息還可以從q3獲取 %% q3 still isnt empty, weve not %% touched delta, so the invariants %% between q1, q2, delta and q3 are %% maintained State1 end, {loaded, {MsgStatus, State2}} end.

轉移Delta消息到Q3源碼分析:

%% 從磁碟中讀取隊列數據到內存中來(從隊列消息中最小索引ID讀取出一個索引磁碟文件大小的消息索引信息)%% 從隊列索引的磁碟文件將單個磁碟文件中的消息索引讀取出來%% 該操作是將單個隊列索引磁碟文件中的deltas類型消息轉換為beta類型的消息maybe_deltas_to_betas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState, ram_msg_count = RamMsgCount, ram_bytes = RamBytes, ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA, disk_read_count = DiskReadCount, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd } = Delta, %% 根據delta中的開始DeltaSeqId得到存在索引磁碟的最小的磁碟索引號 DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), %% 從隊列索引中讀取消息索引(從隊列索引的磁碟文件將單個磁碟文件中的消息索引讀取出來) {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), %% 過濾掉從rabbit_queue_index中讀取過來的消息隊列索引(如果該消息不是持久化的則需要刪除掉),最後得到當前內存中準備好的消息個數以及內存中的消息的總的大小 {Q3a, RamCountsInc, RamBytesInc, IndexState2} = %% RabbitMQ系統關閉以前非持久化消息存儲到磁碟中的索引信息再從磁碟讀取出來的時候必須將他們徹底從RabbitMQ系統中刪除 betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState1), %% 更新隊列消息索引結構,內存中隊列中的消息個數,隊列內存中消息占的大小,以及從磁碟文件讀取的次數 State1 = State #vqstate { index_state = IndexState2, ram_msg_count = RamMsgCount + RamCountsInc, ram_bytes = RamBytes + RamBytesInc, disk_read_count = DiskReadCount + RamCountsInc}, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold %% 如果讀取的當前消息隊列索引磁碟文件中的操作項為空,則繼續讀下一個消息索引磁碟文件中的操作項 maybe_deltas_to_betas( State1 #vqstate { delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> %% 將從索引中讀取出來的消息索引存儲到Q3隊列(將新從磁碟中讀取的消息隊列添加到老的Q3隊列的後面) Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% 如果讀取出來的長度和隊列索引的總長度相等,則delta信息被重置為消息個數為0,同時q2中的消息轉移到q3隊列 %% delta is now empty, but it wasnt before, so %% can now join q2 onto q3 State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, %% 如果磁碟中已經沒有消息,則將Q2隊列中的消息放入Q3隊列 q3 = ?QUEUE:join(Q3b, Q2) }; N when N > 0 -> %% 得到最新的隊列消息磁碟中的信息 Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }), %% 更新最新的q3隊列和磁碟信息結構 State1 #vqstate { delta = Delta1, q3 = Q3b } end end.

問題1:為什麼Q4,Q3空,隊列就為空?

消費Q3最後一條消息的時候,會調用函數maybe_deltas_to_betas,將磁碟上Delta狀態的消息轉移到Q3,現在Q3是空的,那麼Delta狀態的消息一定是空的,否則消息會轉移到Q3;

Delta消息是空的,上述代碼中:

State1 #vqstate { q2 = ?QUEUE:new(),delta = ?BLANK_DELTA,%% 如果磁碟中已經沒有消息,則將Q2隊列中的消息放入Q3隊列q3 = ?QUEUE:join(Q3b, Q2) };

會將Q2隊列的消息轉移到Q3,現在Q3是空的,那麼Q2中消息肯定是空的;

現在Q2、Q3、Delta和Q4都是空的,看代碼:

State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> true = ?QUEUE:is_empty(Q2), true = ?QUEUE:is_empty(Q4), %% 從Q3隊列中取走消息後發現Q3隊列為空,同時磁碟中沒有消息,則將Q1隊列中的消息放入Q4隊列 State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };

會將Q1消息轉移到Q4,現在Q4是空的,Q1肯定沒有消息了。

綜上所述,Q3和Q4都是空的,那該隊列無消息!

問題2:為什麼q4,q3,delta為空的時候,q2必空?

在問題1中已經分析了,Delta消息為空的時候會將Q2放入Q3中,現在Q3是空的,可以反向推出Q2肯定是空的。

問題3:為什麼Q4、Q3和delta為空的時候,q1不為空會直接轉移到q4?

根據定義Q1和Q4存儲的消息是處於內存中的alpha狀態的消息,這時候直接從Q1轉到Q4就不需要經過磁碟,減少IO延遲;

rabbit_variable_queue.erl源碼關於轉換狀態還有很多細節,這裡不再介紹。後續深入學習源碼後再分析。

四、總結

節點消息堆積較多時,這些堆積的消息很快就會進入很深的隊列中去,這樣會增加處理每個消息的平均開銷,整個系統的處理能力就會降低。因為要花更多的時間和資源處理堆積的消息,後流入的消息又被擠壓到很深的隊列中了,系統負載越來越惡化。

因此RabbitMQ使用時一定要注意磁碟佔用監控和流控監控,這些在控制台上都可以看到,一般來說如果消息堆積過多建議增加消費者或者增強每個消費者的消費能力(比如調高prefetch_count消費者一次收到的消息可以提高單個消費者消費能力)。

參考文章:

RabbitMQ源碼分析 - 隊列機制

RabbitMQ實戰指南

RabbitMQ官方文檔

本文已由作者李海燕授權網易雲社區發布。


推薦閱讀:

圖解Redis之對象篇
redis需要讀寫分離嗎?
redis的簡單操作
Redis 設置密碼登錄和刪除挖礦進程
Linux安裝redis,並設置訪問許可權,及使用可視化工具

TAG:RabbitMQ | Redis | ZeroMQ |