LocalMQ:從零構建類 RocketMQ 高性能消息隊列

本文記錄了月前筆者參與阿里雲中間件比賽中,實現的簡要具有持久化功能的消息隊列的設計與實現過程。需要聲明的是,LocalMQ 借鑒了 RocketMQ 在 Broker 部分的核心設計思想,最早的源碼也是基於 RocketMQ 源碼改造而來。本文涉及引用以及其他消息隊列相關資料參考這裡,源代碼放於 LocalMQ 倉庫;另外筆者水平有限,後來因為畢業旅行也未繼續優化,本文很多內容可能存在謬誤與不足,請批評指正。

LocalMQ:從零構建類 RocketMQ 高性能消息隊列

所謂消息隊列,直觀來看有點像蓄水池,能夠在生產者與消費者之間完成解耦,並且平衡生產者與消費者之間的計算量與可計算時間之間的差異;目前主流的消息隊列有著名的 Kafka、RabbitMQ、RocketMQ 等等。在筆者實現的 LocalMQ 中,從簡到復依次實現了 MemoryMessageMQ、EmbeddedMessageQueue 與 LocalMessageQueue 這三個版本;需要說明的是,在三個版本的消息隊列中,都是採取所謂的拉模式,即消費者主動向消息隊列請求拉取消息的模式。在 wx.demo.* 包下提供了很多的內部功能與性能測試用例,

// 首先在這裡:https://parg.co/beX 下載代碼// 然後修改 DefaultProducer 對應的繼承類// 測試 MemoryMessageQueue,則繼承 MemoryProducer;// 測試 EmbeddedMessageQueue,則繼承 EmbeddedProducer;// 默認測試 LocalMessageQueue,注意,需要對 DefaultPullConsumer 進行同樣修改public class DefaultProducer extends LocalProducer// 使用 mvn 運行測試用例,也可以在 Eclipse 或者 Intellij 中打開mvn clean package -U assembly:assembly -Dmaven.test.skip=truejava -Xmx2048m -Xms2048m -cp open-messaging-wx.demo-1.0.jar wx.demo.benchmark.ProducerBenchmark

最簡單的 MemoryMessageQueue 即是將消息數據按照選定主題存放在內存中,其主要結構如下圖所示:

MemoryMessageQueue 提供了同步的消息提交與拉取操作,其利用 HashMap 堆上存儲來緩存所有的消息;並且在內存中維護了另一個所謂的 QueueOffsets 來記錄每個主題對應隊列的消費偏移量。相較於 MemoryMessageQueue 實現的簡單的不能進行持久化存儲的消息隊列,EmbeddedMessageQueue 則提供了稍微複雜點的支持磁碟持久化的消息隊列。EmbeddedMessageQueue 構建了基於 Java NIO 提供的 MappedByteBuffer 的 MappedPartitionQueue。每個 MappedPartitionQueue 對應磁碟上的多個物理文件,並且為上層應用抽象提供了邏輯上的單一文件。EmbeddedMessageQueue 結構如下圖所示:

EmbeddedMessageQueue 的主要流程為生產者同步地像 Bucket Queue 中提交消息,每個 Bucket 可以視作某個主題(Topic)或者隊列(Queue)。而 EmbeddedMessageQueue 還包含著負責定期將 MappedPartitionQueue 中數據持久化寫入到磁碟的非同步線程,該線程會定期地完成 Flush 操作。EmbeddedMessageQueue 假設某個 BucketQueue 被分配給某個 Consumer 之後就被其佔用,該 Consumer 會消費其中全部的緩存消息;每個 Consumer 會包含獨立地 Consumer Offset Table 來記錄當前某個隊列地消費情況。EmbeddedMessageQueue 的缺陷在於:

  • 混合處理與標記位:EmbeddedMessageQueue 僅提供了最簡單的消息序列化模型,無法記錄額外的消息屬性;

  • 持久化存儲到磁碟的時機:EmbeddedMessageQueue 僅使用了一級緩存,並且僅在某個 Partition 寫滿時才進行文件的持久化操作;

  • 添加消息的後處理:EmbeddedMessageQueue 是將消息直接寫入到 BucketQueue 包含的 MappedPartitionQueue 中,無法動態地進行索引、篩選等消息後處理,其可擴展性較差。

  • 未考慮斷續拉取的情況:EmbeddedMessageQueue 中是假設 Consumer 能夠單次處理完某個 BucketQueue 中的單個 Partition 的全部消息,因此記錄其處理值時也僅是記錄了文件級別的位移,如果存在某次是僅拉取了單個 Partition 中部分內容,則下次的起始拉取點還是下個文件首。

EmbeddedMessageQueue 中我們可以在各 Producer 線程中單獨將消息持久化入文件中,而在 LocalMessageQueue 中,我們是將消息統一寫入 MessageStore 中,然後又 PostPutMessageService 進行二次處理。 LocalMessageQueue 的結構如下所示:

LocalMessageQueue 最大的變化在於將消息統一存儲在獨立地 MessageStore 中(類似於 RocketMQ 中的 CommitLog),然後針對 Topic-queueId 將消息劃分到不同的 ConsumeQueue 中;這裡的 queueId 是由對應的 Producer 專屬編號決定的,每個 Consumer 即會被分配佔用某個 ConsumeQueue(類似於 RocketMQ 中的 consumequeue),從而保證某個 Producer 生產的某個主題下的消息被專一的 Consumer 消費。LocalMessageQueue 同樣使用 MappedPartitionQueue 提供底層文件系統抽象,並且構建了獨立的 ConsumerOffsetManager 對消費者的消費進度進行管理,從而方便異常恢復。

設計概要

順序消費

本部分圖來源於分散式開放消息系統(RocketMQ)的原理與實踐

消息產品的一個重要特性是順序保證,也就是消息消費的順序要與發送的時間順序保持一致;在多發送端的情況下,保證全局順序代價比較大,只要求各個發送端的順序有保障即可; 舉個例子 P1 發送 M11, M12, M13,P2 發送 M21, M22, M23,在消費的時候,只要求保證 M11, M12, M13(M21,M22,M23)的順序,也就是說,實際消費順序為: M11, M21, M12, M13, M22, M23 正確; M11, M21, M22, M12, M13, M23 正確 M11, M13, M21, M22, M23, M12 錯誤,M12 與 M13 的順序顛倒了;假如生產者產生了 2 條消息:M1、M2,要保證這兩條消息的順序,最直觀的方式就是採取類似於 TCP 中的確認消息:

不過該模型中如果 M1 與 M2 分別被發送到了兩台不同的消息伺服器上,我們無法控制消息伺服器發送 M1 與 M2 的先後時機;有可能 M2 已經被發送到了消費者,M1 才被發送到了消息伺服器上。針對這個問題改進版的思路即是將 M1 與 M2 發送到單一消息伺服器中,然後根據先到達先消費的原則發送給對應的消費者:

不過在實際情況下往往因為網路延遲或其他問題導致在 M1 發送耗時大於 M2 的情況下,M2 會先於 M1 被消費。因此如果我們要保證嚴格的順序消息,那麼必須要保證生產者、消息伺服器與消費者之間的一對一對應關係。在 LocalMQ 的實現中,我們首先會將消息按照生產者劃分到唯一的 Topic-queueId 隊列中;並且保證同一時刻該消費隊列只會被某個消費者獨佔。如果某個消費者在消費完該隊列之前意外中斷,那麼在保留窗口期內不會將該隊列重新分配;在窗口期之外則將該隊列分配給新的消費者,並且即使原有消費者恢復工作也無法繼續拉取該隊列中包含的消息。

數據存儲

LocalMQ 中目前是實現了基於文件系統的持久化存儲,主要功能實現在 MappedPartition 與 MappedPartitionQueue 這兩個類中,筆者也會在下文中詳細介紹這兩個類的實現。本部分我們討論下數據存儲的文件格式,對於 LocalMessageQueue 而言,其文件存儲如下:

* messageStore * -- MapFile1 * -- MapFile2 * consumeQueue * -- Topic1 * ---- queueId1 * ------ MapFile1 * ------ MapFile2 * ---- queueId2 * ------ MapFile1 * ------ MapFile2 * -- Queue1 * ---- queueId1 * ------ MapFile1 * ------ MapFile2 * ---- queueId2 * ------ MapFile1 * ------ MapFile2

LocalMessageQueue 中採用了消息統一存儲的方案,因此所有的消息實際內容會被存放在 messageStore 目錄下。而 consumeQueue 中則存放了消息的索引,即在 messageStore 中的偏移地址。LocalMQ 中使用 MappedPartitionQueue 來管理某個邏輯上單一的文件,而根據不同的單文件大小限制會自動將其切割為多個物理上獨立的 Mapped File。每個 MappedPartition 使用 offset,即該文件首地址的全局偏移量命名;而使用 pos / position 統一表示單文件中局部偏移量,使用 index 表示某個文件在其文件夾中的下標。

性能優化

在編寫的過程中,筆者發現對於執行流的優化、避免重複計算與額外變數、選擇使用合適的並發策略都會對結果造成極大的影響,譬如筆者從 SpinLock 切換到重入鎖之後,本地測試 TPS 增加了約 5%。另外筆者也統計了消費者工作中不同階段的時間佔比,其中構建(包括消息屬性的序列化)與發送操作(寫入到 MappedFileQueue 中,未使用二級緩存)都是同步進行,二者的時間佔比也是最多。

[2017-06-01 12:13:21,802] INFO: 構建耗時佔比:0.471270,發送耗時佔比:0.428567,持久化耗時佔比:0.100163[2017-06-01 12:25:31,275] INFO: 構建耗時佔比:0.275170,發送耗時佔比:0.573520,持久化耗時佔比:0.151309

代碼級別優化

筆者在實現 LocalMQ 的過程中感觸最深的就是實現相同功能的不同代碼在性能上的差異可能會很大。在實現過程中應該避免冗餘變數聲明與創建、避免額外空間申請與垃圾回收、避免冗餘的執行過程;另外儘可能選用合適的數據結構,譬如筆者在部分實現中從 ArrayList 遷移到了 LinkedList,從 ConcurrentHashMap 遷移到了 HashMap,都帶來了一定的評測指標提升。

非同步 IO

非同步 IO,順序 Flush;筆者發現,如果多個線程進行並發 Flush 操作,反而不如單線程進行順序 Flush。

並發控制

  • 盡量減少鎖控制的範圍。

  • 並發計算優化,將所有的耗時計算放到可以並發的 Producer 中。

  • 使用合理的鎖,重入鎖相較於自旋鎖有近 5 倍的 TPS 提升。

MemoryMessageQueue

源代碼參考這裡

MemoryMessageQueue 是最簡易的實現,不過其代碼能夠反映出某個消息隊列的基本流程,首先在生產者我們需要創建消息並且發送給消息隊列:

// 創建消息BytesMessage message = messageFactory.createBytesMessageToTopic(topic, body);// 發送消息messageQueue.putMessage(topic, message);

在 putMessage 函數中則將消息存入內存存儲中:

// 存放所有消息private Map<String, ArrayList<Message>> messageBuckets = new HashMap<>();// 添加消息public synchronized PutMessageResult putMessage(String bucket, Message message) { if (!messageBuckets.containsKey(bucket)) { messageBuckets.put(bucket, new ArrayList<>(1024)); } ArrayList<Message> bucketList = messageBuckets.get(bucket); bucketList.add(message); return new PutMessageResult(PutMessageStatus.PUT_OK, null); }

而 Consumer 則根據指定的 Bucket 與 queueId 來拉取消息,如果存在多個 Bucket 需要拉取則進行輪詢:

//use Round Robinint checkNum = 0;while (++checkNum <= bucketList.size()) { String bucket = bucketList.get((++lastIndex) % (bucketList.size())); Message message = messageQueue.pullMessage(queue, bucket); if (message != null) { return message; }}

而 MemoryMessageQueue 的 pullMessage 函數則首先判斷目標 Bucket 是否存在,並且根據內置的 queueOffset 中記錄的拉取偏移量來判斷是否拉取完畢。若沒有拉取完畢則返回消息並且更新本地偏移量;

private Map<String, HashMap<String, Integer>> queueOffsets = new HashMap<>();...public synchronized Message pullMessage(String queue, String bucket) { ... ArrayList<Message> bucketList = messageBuckets.get(bucket); if (bucketList == null) { return null; } HashMap<String, Integer> offsetMap = queueOffsets.get(queue); if (offsetMap == null) { offsetMap = new HashMap<>(); queueOffsets.put(queue, offsetMap); } int offset = offsetMap.getOrDefault(bucket, 0); if (offset >= bucketList.size()) { return null; } Message message = bucketList.get(offset); offsetMap.put(bucket, ++offset); ...}

EmbeddedMessageQueue

源代碼參考這裡

EmbeddedMessageQueue 中引入了消息持久化支持,本部分我們也主要討論消息序列化與底層的 MappedPartitionQueue 實現。

消息序列化

EmbeddedMessageQueue 中定義的消息格式如下:

序號消息存儲結構備註長度(位元組數)1TOTALSIZE消息大小42MAGICCODE消息的 MAGIC CODE43BODY前 4 個位元組存放消息體大小值,後 bodyLength 大小的空間存儲消息體內容4 + bodyLength4headers*前 2 個位元組(short)存放頭部大小,後存放 headersLength 大小的頭部數據2 + headersLength5properties*前 2 個位元組(short)存放屬性值大小,後存放 propertiesLength 大小的屬性數據2 + propertiesLength

EmbeddedMessageSerializer 是繼承自 MessageSerializer 的主要負責消息持久化的類,其提供了消息長度的計算函數:

/** * Description 計算某個消息的長度,注意,headersByteArray 與 propertiesByteArray 在發送消息時完成轉換 * @param message * @param headersByteArray * @param propertiesByteArray * @return */public static int calMsgLength(DefaultBytesMessage message, byte[] headersByteArray, byte[] propertiesByteArray) { // 消息體 byte[] body = message.getBody(); int bodyLength = body == null ? 0 : body.length; // 計算頭部長度 short headersLength = (short) headersByteArray.length; // 計算屬性長度 short propertiesLength = (short) propertiesByteArray.length; // 計算消息體總長度 return calMsgLength(bodyLength, headersLength, propertiesLength);}

而 EmbeddedMessageEncoder 的 encode 函數負責具體的消息序列化操作:

/** * Description 執行消息的編碼操作 * @param message 消息對象 * @param msgStoreItemMemory 內部緩存句柄 * @param msgLen 計算的消息長度 * @param headersByteArray 消息頭位元組序列 * @param propertiesByteArray 消息屬性位元組序列*/public static final void encode( DefaultBytesMessage message, final ByteBuffer msgStoreItemMemory, int msgLen, byte[] headersByteArray, byte[] propertiesByteArray) {// 消息體byte[] body = message.getBody();int bodyLength = body == null ? 0 : body.length;// 計算頭部長度short headersLength = (short) headersByteArray.length;// 計算屬性長度short propertiesLength = (short) propertiesByteArray.length;// 初始化存儲空間resetByteBuffer(msgStoreItemMemory, msgLen);// 1 TOTALSIZEmsgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEmsgStoreItemMemory.putInt(MESSAGE_MAGIC_CODE);// 3 BODYmsgStoreItemMemory.putInt(bodyLength);if (bodyLength > 0) msgStoreItemMemory.put(message.getBody());// 4 HEADERSmsgStoreItemMemory.putShort((short) headersLength);if (headersLength > 0) msgStoreItemMemory.put(headersByteArray);// 5 PROPERTIESmsgStoreItemMemory.putShort((short) propertiesLength);if (propertiesLength > 0) msgStoreItemMemory.put(propertiesByteArray);}

對應的反序列化操作則是由 EmbeddedMessageDecoder 完成,其主要從某個 ByteBuffer 中讀取數據:

/** * Description 從輸入的 ByteBuffer 中反序列化消息對象 * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */public static DefaultBytesMessage readMessageFromByteBuffer(ByteBuffer byteBuffer) { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return null; default:// log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return null; } byte[] bytesContent = new byte[totalSize]; // 3 BODY int bodyLen = byteBuffer.getInt(); byte[] body = new byte[bodyLen]; if (bodyLen > 0) { // 讀取並且校驗消息體內容 byteBuffer.get(body, 0, bodyLen); } // 4 HEADERS short headersLength = byteBuffer.getShort(); KeyValue headers = null; if (headersLength > 0) { byteBuffer.get(bytesContent, 0, headersLength); String headersStr = new String(bytesContent, 0, headersLength, EmbeddedMessageDecoder.CHARSET_UTF8); headers = string2KeyValue(headersStr); } // 5 PROPERTIES // 獲取 properties 尺寸 short propertiesLength = byteBuffer.getShort(); KeyValue properties = null; if (propertiesLength > 0) { byteBuffer.get(bytesContent, 0, propertiesLength); String propertiesStr = new String(bytesContent, 0, propertiesLength, EmbeddedMessageDecoder.CHARSET_UTF8); properties = string2KeyValue(propertiesStr); } // 返回讀取到的消息 return new DefaultBytesMessage( totalSize, headers, properties, body );}

消息寫入

EmbeddedMessageQueue 中消息的寫入實際上是由 BucketQueue 的 putMessage/putMessages 函數完成的,這裡的某個 BucketQueue 就對應著 Topic-queueId 這個唯一的標識。這裡以批量寫入消息為例,首先我們從 BucketQueue 包含的 MappedPartitionQueue 中獲取到最新可用的某個 MappedPartition:

mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0);

然後調用 MappedPartition 的 appendMessages 方法,該方法會在下文介紹;這裡則是要討論添加消息的幾種結果對應的處理。如果添加成功,則直接返回成功;如果該 MappedPartition 剩餘空間不足以寫入消息隊列中的某條消息,則需要調用 MappedPartitionQueue 創建新的 MappedPartition,並且重新計算待寫入的消息序列:

...// 調用對應的 MappedPartition 追加消息// 注意,這裡經過填充之後,會逆向地將消息在 MessageStore 中的偏移與 QueueOffset 中偏移添加進去result = mappedPartition.appendMessages(messages, this.appendMessageCallback);// 根據追加結果進行不同的操作switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: this.messageQueue.getFlushAndUnmapPartitionService().putPartition(mappedPartition); // 如果已經到了文件最後,則創建新文件 mappedPartition = this.mappedPartitionQueue.getLastMappedFileOrCreate(0); if (null == mappedPartition) { // XXX: warn and notify me log.warning("創建 MappedPartition 錯誤, topic: " + messages.get(0).getTopicOrQueueName()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } // 否則重新進行添加操作 // 從結果中獲取處理完畢的消息數 int appendedMessageNum = result.getAppendedMessageNum(); // 創建臨時的 LeftMessages ArrayList<DefaultBytesMessage> leftMessages = new ArrayList<>(); // 添加所有未消費的消息 for (int i = appendedMessageNum; i < messages.size(); i++) { leftMessages.add(messages.get(i)); } result = mappedPartition.appendMessages(leftMessages, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}...

邏輯文件存儲

Mapped Partition

某個 MappedPartition 映射物理上的單個文件,其初始化時如下傳入文件名與文件尺寸屬性:

/** * Description 初始化某個內存映射文件 * * @param fileName 文件名 * @param fileSize 文件尺寸 * @throws IOException 打開文件出現異常 */private void init(final String fileName, final int fileSize) throws IOException { ... // 從文件名中獲取到當前文件的全局偏移量 this.fileFromOffset = Long.parseLong(this.file.getName()); ... // 嘗試打開文件 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // 將文件映射到內存中 this.mappedByteBuffer = this.fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);}

初始化階段即打開文件映射,而後在寫入消息或者其他內容時,其會調用傳入的消息編碼回調(即是我們上文中介紹的消息序列化的包裹對象)將對象編碼為位元組流並且寫入:

public AppendMessageResult appendMessage(final DefaultBytesMessage message, final AppendMessageCallback cb) { ... // 獲取當前的寫入位置 int currentPos = this.wrotePosition.get(); // 如果當前還是可寫的 if (currentPos < this.fileSize) { // 獲取到實際的寫入句柄 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // 調整當前寫入位置 byteBuffer.position(currentPos); // 記錄信息 AppendMessageResult result = null; // 調用回調函數中的實際寫入操作 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, message); this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } ...}

MappedPartitionQueue

MappedPartitionQueue 用來管理多個物理上的映射文件,其構造函數如下:

// 存放所有的映射文件private final CopyOnWriteArrayList<MappedPartition> mappedPartitions = new CopyOnWriteArrayList<MappedPartition>();.../** * Description 默認構造函數 * * @param storePath 傳入的存儲文件目錄,有可能傳入 MessageStore 目錄或者 ConsumeQueue 目錄 * @param mappedFileSize * @param allocateMappedPartitionService */public MappedPartitionQueue(final String storePath, int mappedFileSize, AllocateMappedPartitionService allocateMappedPartitionService) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.allocateMappedPartitionService = allocateMappedPartitionService;}{}

這裡以 load 函數為例說明其載入過程:

/** * Description 載入內存映射文件序列 * * @return */public boolean load() { // 讀取存儲路徑 File dir = new File(this.storePath); // 列舉目錄下所有文件 File[] files = dir.listFiles(); // 如果文件不為空,則表示有必要載入 if (files != null) { // 重排序 Arrays.sort(files); // 遍歷所有的文件 for (File file : files) { // 如果碰到某個文件尚未填滿,則返回載入完畢 if (file.length() != this.mappedFileSize) { log.warning(file + " " + file.length() + " length not matched message store config value, ignore it"); return true; } // 否則載入文件 try { // 實際讀取文件 MappedPartition mappedPartition = new MappedPartition(file.getPath(), mappedFileSize); // 設置當前文件指針到文件尾 mappedPartition.setWrotePosition(this.mappedFileSize); mappedPartition.setFlushedPosition(this.mappedFileSize); // 將文件放置到 MappedFiles 數組中 this.mappedPartitions.add(mappedPartition);// log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.warning("load file " + file + " error"); return false; } } } return true;}

非同步預創建文件

處於性能的考慮,MappedPartitionQueue 還會提前創建文件,在 getLastMappedFileOrCreate 函數中,當 allocateMappedPartitionService 存在的情況下則會調用該非同步服務預創建文件:

/** * Description 根據起始偏移量查找最後一個文件 * * @param startOffset * @return*/public MappedPartition getLastMappedFileOrCreate(final long startOffset) { ... // 如果有必要創建文件 if (createOffset != -1) { // 獲取到下一個文件的路徑與文件名 String nextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset); // 以及下下個文件的路徑與文件名 String nextNextFilePath = this.storePath + File.separator + FSExtra.offset2FileName(createOffset + this.mappedFileSize); // 指向待創建的映射文件句柄 MappedPartition mappedPartition = null; // 判斷是否存在創建映射文件的服務 if (this.allocateMappedPartitionService != null) { // 使用服務創建 mappedPartition = this.allocateMappedPartitionService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); // 進行預熱處理 } else { // 否則直接創建 try { mappedPartition = new MappedPartition(nextFilePath, this.mappedFileSize); } catch (IOException e) { log.warning("create mappedPartition exception"); } } ... return mappedPartition; } return mappedPartitionLast;}

這裡的 AllocateMappedPartitionService 則會不間斷地執行創建文件的請求:

@Overridepublic void run() { ... // 循環執行文件分配請求 while (!this.isStopped() && this.mmapOperation()) {} ...}/** * Description 循環執行映射文件預分配 * * @Exception Only interrupted by the external thread, will return false */private boolean mmapOperation() { ... // 執行操作 try { // 取出最新的執行對象 req = this.requestQueue.take(); // 取得待執行對象在請求表中的實例 AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath()); ... // 判斷是否已經存在創建好的對象 if (req.getMappedPartition() == null) { // 記錄起始創建時間 long beginTime = System.currentTimeMillis(); // 構建內存映射文件對象 MappedPartition mappedPartition = new MappedPartition(req.getFilePath(), req.getFileSize()); ... // 進行文件預熱,僅預熱 MessageStore if (mappedPartition.getFileSize() >= mapedFileSizeCommitLog && isWarmMappedFileEnable) { mappedPartition.warmMappedFile(); } // 將創建好的對象回寫到請求中 req.setMappedPartition(mappedPartition); // 異常設置為 false this.hasException = false; // 成功設置為 true isSuccess = true; } ...}

非同步 Flush

EmbeddedMessageQueue 中還包含了某個 flushAndUnmapPartitionServices 用於非同步 Flush 文件並且完成不用映射文件的關閉操作。該服務的核心代碼如下:

private final ConcurrentLinkedQueue<MappedPartition> mappedPartitions = new ConcurrentLinkedQueue<>();...@Overridepublic void run() { while (!this.isStopped()) { int interval = 100; try { if (this.mappedPartitions.size() > 0) { long startTime = now(); // 取出待處理的 MappedPartition MappedPartition mappedPartition = this.mappedPartitions.poll(); // 將當前內容寫入到磁碟 mappedPartition.flush(0); // 釋放當前不需要使用的空間 mappedPartition.cleanup(); long past = now() - startTime;// EmbeddedProducer.flushEclipseTime.addAndGet(past); if (past > 500) { log.info("Flush data to disk and unmap MappedPartition costs " + past + " ms:" + mappedPartition.getFileName()); } } else { // 定時進行 Flush 操作 this.waitForRunning(interval); } } catch (Throwable e) { log.warning(this.getServiceName() + " service has exception. "); } }}

這裡的 mappedPartitions 即是在上文介紹的當添加消息且返回為 END_OF_FILE 時候添加進來的。

LocalMessageQueue

源代碼參考這裡

消息存儲

LocalMessageQueue 中採用了中心化的消息存儲方案,其提供的 putMessage / putMessages 函數實際上會調用內置 MessageStore 對象的消息寫入函數:

// 使用 MessageStore 進行提交PutMessageResult result = this.messageStore.putMessage(message);

而 MessageStore 即是存放所有真實消息的中心存儲,LocalMessageQueue 中支持更為複雜的消息屬性:

序號消息存儲結構備註長度(位元組數)1TOTALSIZE消息大小42MAGICCODE消息的 MAGIC CODE43BODYCRC消息體 BODY CRC,用於重啟時校驗44QUEUEID隊列編號,queueID45QUEUEOFFSET自增值,不是真正的 consume queue 的偏移量,可以代表這個隊列中消息的個數,要通過這個值查找到 consume queue 中數據,QUEUEOFFSET * 12 才是偏移地址86PHYSICALOFFSET消息在 commitLog 中的物理起始地址偏移量87STORETIMESTAMP存儲時間戳88BODY前 4 個位元組存放消息體大小值,後 bodyLength 大小的空間存儲消息體內容4 + bodyLength9TOPICORQUEUENAME前 1 個位元組存放 Topic 大小,後存放 topicOrQueueNameLength 大小的主題名1 + topicOrQueueNameLength10headers*前 2 個位元組(short)存放頭部大小,後存放 headersLength 大小的頭部數據2 + headersLength11properties*前 2 個位元組(short)存放屬性值大小,後存放 propertiesLength 大小的屬性數據2 + propertiesLength

其構造函數中初始化創建的 MappedPartitionQueue 是按照固定大小(默認單文件 1G)的映射文件組:

// 構造映射文件類this.mappedPartitionQueue = new MappedPartitionQueue( ((LocalMessageQueueConfig) this.messageStore.getMessageQueueConfig()).getStorePathCommitLog(), mapedFileSizeCommitLog, messageStore.getAllocateMappedPartitionService(), this.flushMessageStoreService);

構建 ConsumeQueue

不同於 EmbeddedMessageQueue,LocalMessageQueue 並沒有在初次提交消息時就直接寫入按照 Topic-queueId 劃分的存儲內;而是依賴於內置的 PostPutMessageService :

/** * Description 執行消息後操作 */private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { ... // 讀取當前的消息 SelectMappedBufferResult result = this.messageStore.getMessageStore().getData(reputFromOffset); // 如果消息不存在,則停止當前操作 if (result == null) { doNext = false; continue; } try { // 獲取當前消息的起始位置 this.reputFromOffset = result.getStartOffset(); // 順序讀取所有消息 for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 讀取當前位置的消息 PostPutMessageRequest postPutMessageRequest = checkMessageAndReturnSize(result.getByteBuffer()); int size = postPutMessageRequest.getMsgSize(); readSpendTime.addAndGet(now() - startTime); startTime = now(); // 如果處理成功 if (postPutMessageRequest.isSuccess()) { if (size > 0) { // 執行消息寫入到 ConsumeQueue 的操作 this.messageStore.putMessagePositionInfo(postPutMessageRequest); // 修正當前讀取的位置 this.reputFromOffset += size; readSize += size; } else if (size == 0) { this.reputFromOffset = this.messageStore.getMessageStore().rollNextFile(this.reputFromOffset); readSize = result.getSize(); } putSpendTime.addAndGet(now() - startTime); } else if (!postPutMessageRequest.isSuccess()) { ... } } } finally { result.release(); } }}

而在 putMessagePositionInfo 函數中即進行實際的 ConsumeQueue 創建:

/** * Description 將消息的位置放置到 ConsumeQueue 中 * * @param postPutMessageRequest */public void putMessagePositionInfo(PostPutMessageRequest postPutMessageRequest) { // 尋找或者創建 ConsumeQueue ConsumeQueue cq = this.findConsumeQueue(postPutMessageRequest.getTopic(), postPutMessageRequest.getQueueId()); // 將消息放置到 ConsumeQueue 中合適的位置 cq.putMessagePositionInfoWrapper(postPutMessageRequest.getCommitLogOffset(), postPutMessageRequest.getMsgSize(), postPutMessageRequest.getConsumeQueueOffset());}/** * Description 根據主題與 QueueId 查找 ConsumeQueue,如果不存在則創建 * * @param topic * @param queueId * @return*/public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); ... // 判斷該主題下是否存在 queueId,不存在則創建 ConsumeQueue logic = map.get(queueId); // 如果獲取為空,則創建新的 ConsumeQueue if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue(// topic, // 主題 queueId, // queueId LocalMessageQueueConfig.mapedFileSizeConsumeQueue, // 映射文件尺寸 this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); ... } return logic;}

而在 ConsumeQueue 的構造函數中完成實際的文件映射與讀取:

/** * Description 主要構造函數 * * @param topic * @param queueId * @param mappedFileSize * @param localMessageStore */public ConsumeQueue( final String topic, final int queueId, final int mappedFileSize, final LocalMessageQueue localMessageStore) { ... // 當前隊列的路徑 String queueDir = this.storePath + File.separator + topic + File.separator + queueId; // 初始化內存映射隊列 this.mappedPartitionQueue = new MappedPartitionQueue(queueDir, mappedFileSize, null); this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);}

ConsumeQueue 的文件格式則相對簡單:

// ConsumeQueue 文件內存放的單條 Message 尺寸// 1 | MessageStore Offset | int 8 Byte// 2 | Size | short 8 Byte

消息拉取

在 LocalPullConsumer 拉取消息時,設置的批量拉取機制;即一次性從 LocalMessageQueue 拉取多條消息到本地,然後再批次返回給本地進行處理(假設處理也有一定耗時)。在批次拉取的函數中,我們首先需要獲取當前 Consumer 處理的主題與隊列編號對應的 ConsumeQueue 是否包含數據,然後再申請具體的讀取句柄並且佔用該隊列:

/** * Description 批量抓取消息,注意,這裡只進行預抓取,僅當消費者真正獲取後才會修正讀取偏移量 */private void batchPoll() { // 如果是 LocalMessageQueue // 執行預抓取 LocalMessageQueue localMessageStore = (LocalMessageQueue) this.messageQueue; // 獲取當前待抓取的桶名 String bucket = bucketList.get((lastIndex) % (bucketList.size())); // 首先獲取待抓取的隊列和偏移 long offsetInQueue = localMessageStore.getConsumerScheduler().queryOffsetAndLock("127.0.0.1:" + this.refId, bucket, this.getQueueId()); // 如果當前待抓取的 queueId 已經被佔用,則直接切換到下一個主題 if (offsetInQueue == -2) { // 將當前主題設置為 true this.isFinishedTable.put(bucket, true); // 重置當前的 LastIndex 或者 RefOffset,即 queueId this.resetLastIndexOrRefOffsetWhenNotFound(); } else { // 獲取到了有效的隊列偏移量之後,開始嘗試獲取消息 consumerOffsetTable.put(bucket, new AtomicLong(offsetInQueue)); // 設置每次最多抓一個文件內包含的消息數,等價於變相的一次性讀完,注意,這裡的數目還受到單個文件尺寸的限制 GetMessageResult getMessageResult = localMessageStore.getMessage(bucket, this.getQueueId(), this.consumerOffsetTable.get(bucket).get() + 1, mapedFileSizeConsumeQueue / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 如果沒有找到數據,則切換到下一個 if (getMessageResult.getStatus() != GetMessageStatus.FOUND) { // 將當前主題設置為 true this.isFinishedTable.put(bucket, true); this.resetLastIndexOrRefOffsetWhenNotFound(); } else { // 這裡不考慮 Consumer 被惡意幹掉的情況,因此直接更新遠端的 Offset 值 localMessageStore.getConsumerScheduler().updateOffset("127.0.0.1:" + this.refId, bucket, this.getQueueId(), consumerOffsetTable.get(bucket).addAndGet(getMessageResult.getMessageCount())); // 首先從文件系統中一次性讀出所有的消息 ArrayList<DefaultBytesMessage> messages = readMessagesFromGetMessageResult(getMessageResult); // 將消息添加到隊列中 this.messages.addAll(messages); // 本次抓取成功後才開始抓取下一個 lastIndex++; } }}

消費者調度

ConsumerScheduler 為我們提供了核心的消費者調度功能,其內置的 ConsumerOffsetManager 包含了兩個核心存儲:

// 存放映射到內存中private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/*queueId*/, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);// 存放某個 Topic 下面的某個 Queue 被某個 Consumer 佔用的信息private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/*queueId*/, String/*refId*/>> queueIdOccupiedByConsumerTable = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, String>>(512);

分別對應了某個 ConsumeQueue 被消費的進度和被消費者的佔用信息。同時 ConsumerOffsetManager 還提供了基於 JSON 格式的持久化功能,並且通過 ConsumerScheduler 中的定期服務 scheduledExecutorService 進行自動定期持久化。在消息提交階段,LocalMessageQueue 會自動調用 updateOffset 函數更初始化某個 ConsumeQueue 的偏移情況(在恢復時也會使用):

public void updateOffset(final String topic, final int queueId, final long offset) { this.consumerOffsetManager.commitOffset("Broker Inner", topic, queueId, offset);}

而某個 Consumer 在初次拉取時,會調用 queryOffsetAndLock 函數來查詢某個 ConsumeQueue 的可拉取情況:

/** * Description 修正某個 ConsumerOffset 隊列中的值 * * @param topic * @param queueId * @return */public long queryOffsetAndLock(final String clientHostAndPort, final String topic, final int queueId) { String key = topic; // 首先判斷該 Topic-queueId 是否被佔用 if (this.queueIdOccupiedByConsumerTable.containsKey(topic)) { ... } // 如果沒有被佔用,則此時宣告佔用 ConcurrentHashMap<Integer, String> consumerQueueIdMap = this.queueIdOccupiedByConsumerTable.get(key); ... // 真實進行查找操作 ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; } // 默認返回值為 -1 return -1;}

並且在拉取完畢後調用 updateOffset 函數來更新拉取進度。

消息讀取

在某個 Consumer 通過 ConsumerManager 獲取可用的拉取偏移量之後,即從 LocalMessageQueue 中進行真實地消息讀取操作:

/** * Description Consumer 從存儲中讀取數據的介面 * * @param topic * @param queueId * @param offset 下一個開始抓取的起始下標 * @param maxMsgNums * @return */public GetMessageResult getMessage(final String topic, final int queueId, final long offset, final int maxMsgNums) { ... // 根據 Topic 與 queueId 構建消費者隊列 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 保證當前 ConsumeQueue 存在 if (consumeQueue != null) { // 獲取當前 ConsumeQueue 中包含的最小的消息在 MessageStore 中的位移 minOffset = consumeQueue.getMinOffsetInQueue(); // 注意,最大的位移地址即是不可達地址,是當前所有消息的下一個消息的下標 maxOffset = consumeQueue.getMaxOffsetInQueue(); // 如果 maxOffset 為零,則表示沒有可用消息 if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = 0; } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = minOffset; } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = offset; } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = minOffset; } else { nextBeginOffset = maxOffset; } } else { // 根據偏移量獲取當前 ConsumeQueue 的緩存 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; // 設置每次獲取的最大消息數 final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 遍歷所有的 Consume Queue 中的消息指針 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } // 從 MessageStore 中獲取消息 SelectMappedBufferResult selectResult = this.messageStore.getMessage(offsetPy, sizePy); // 如果沒有獲取到數據,則切換到下一個文件繼續 if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.messageStore.rollNextFile(offsetPy); continue; } // 如果獲取到了,則返回結果 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; // 獲取當前內存情況 long memory = (long) (getTotalPhysicalMemorySize() * (LocalMessageQueueConfig.accessMessageInMemoryMaxRatio / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { bufferConsumeQueue.release(); } } else { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = consumeQueue.rollNextFile(offset); log.warning("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed."); } } } else { ... } ...}

注意,這裡返回的其實只是消息在 MessageStore 中的存放地址,真實地消息讀取還需要通過 readMessagesFromGetMessageResult 函數:

/** * Description 從 GetMessageResult 中抓取全部的消息 * * @param getMessageResult * @return */public static ArrayList<DefaultBytesMessage> readMessagesFromGetMessageResult(final GetMessageResult getMessageResult) { ArrayList<DefaultBytesMessage> messages = new ArrayList<>(); try { List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList(); for (ByteBuffer bb : messageBufferList) { messages.add(readMessageFromByteBuffer(bb)); } } finally { getMessageResult.release(); } // 獲取位元組數組 return messages;}/** * Description 從輸入的 ByteBuffer 中反序列化消息對象 * * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */public static DefaultBytesMessage readMessageFromByteBuffer(java.nio.ByteBuffer byteBuffer) { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return null; default: log.warning("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return null; } byte[] bytesContent = new byte[totalSize]; ...}

後記

端午前後即已停止代碼編寫,原以為周把時間可以完成文檔編寫;可惜畢業旅行和畢業聚會一直拖到了七月,最後也是匆匆寫完,也是我個人拖延癌晚期,不由感慨啊。

推薦閱讀:

Kafka,Mq,Redis作為消息隊列使用時的差異?

TAG:Java | 消息队列 | RocketMQ |