MQ(1)—— 從隊列到消息中間件

MQ(1)—— 從隊列到消息中間件

來自專欄 Beautiful Java41 人贊了文章

前言

好久不見。

從這篇文章開始,我將帶大家走進消息中間件的世界。

消息中間件本質上就是一種很簡單的數據結構——隊列,但是一條隊列肯定是當不成中間件的,你必須要考慮性能、容災、可靠性等等因素。這也給我的寫作提供了一些思路,我將從隊列開始,給你演示一條隊列是如何進化成一個靠譜的中間件的

消息中間件的實現有很多,有新貴Kafka、RocketMq,也有老牌勁旅RabbitMq和ActiveMq,不過我最後選擇了Nsq來講解,因為它極簡清爽,用起來舒服,講起來也好理解,更重要的是,通過對Nsq的學習,我們很容易擴展到消息中間件的通用層面,對學習其他Mq,乃至優化和設計自己的Mq都有很大幫助。

這一系列的文章,將有以下這些topics:

  1. 為什麼要使用消息中間件
  2. 如何從一條隊列,進化成一個靠譜的消息中間件:這一節,將帶你從演化的視角,認識Nsq的各個組件
  3. Bringing It All Together:從微觀層面了解完Nsq後,我們再從宏觀的視角,把上一節學的東西串起來,看一條消息,是如何從生產到被消費的
  4. 一些實現細節:從近到遠的看完了Nsq,現在讓我們再一次把鏡頭拉近,看看Nsq在處理一些細節問題上的智慧,這對我們理解消息中間件,將有很大幫助
  5. 如何設計一個消息中間件:由近到遠,由遠及近,我們已經把Nsq看了個遍,是時候嘗試總結一下,如何設計一個消息中間件了
  6. Nsq的不足:作為一個極簡的Mq,Nsq肯定做不到的面面俱到,那麼Nsq有哪些不足呢?
  7. Nsq vs Kafka:Nsq的那些不足,Kafka幾乎都給解決了,畢竟人家是重量級Mq,功能自然強大的多。雖然大多數業務,使用Nsq已經可以解決問題,但還是了解一下,Kafka是怎麼解決那些Nsq不屑於解決的問題吧~
  8. 有贊自研版Nsq:基於Nsq的一些不足,和對Kafka實現思路的借鑒,有贊對Nsq進行了自研開發,這一節,讓我們了解一下有贊的改進思路

為什麼要使用消息中間件

假設我們在淘寶下了一筆訂單後,淘寶後台需要做這些事情:

  1. 消息通知系統:通知商家,你有一筆新的訂單,請及時發貨
  2. 推薦系統:更新用戶畫像,重新給用戶推薦他可能感興趣的商品
  3. 會員系統:更新用戶的積分和等級信息

於是一個創建訂單的函數,至少需要取調用三個其他服務的介面,就像這樣:

寫成偽碼:

createOrder(...) { doCreateOrder(...); // 調用其他服務介面 sendMsg(...); updateUserInterestedGoods(...); updateMemberCreditInfo(...);}

這樣的做法,顯然很挫,至少有兩個問題:

  1. 過度耦合:如果後面創建訂單時,需要觸發新的動作,那就得去改代碼,在原有的創建訂單函數末尾,再追加一行代碼
  2. 缺少緩衝:如果創建訂單時,會員系統恰好處於非常忙碌或者宕機的狀態,那這時更新會員信息就會失敗,我們需要一個地方,來暫時存放無法被消費的消息

我們需要一個消息中間件,來實現解耦和緩衝的功能。

消息中間件的實現很多,比較常見的有kafka、rocketmq以及我們今天要講的nsq。

相比於前面兩個mq,nsq可以說是非常輕量級的,理解了它,也有助於學習kafka和rocketmq。所以本文以Nsq為例,來講解消息中間件的一些實現細節。

首先,讓我們從消息中間件的最原始形態開始,一種常見的數據結構 —— 隊列。

Nsq 1.0 —— 我是一條隊列

我們在訂單系統和其他系統的中間,引入了一個消息中間件,或者說,引入了一條隊列。

當訂單系統創建完訂單時,它只需要往隊列里,塞入(push)一條topic為「order_created」的消息。

接著,我們的nsq1.0,會把這條消息,再推送給所有訂閱了這個topic的消息的機器,告訴他們,「有新的訂單,你們該幹嘛幹嘛」。

這樣一個簡單的隊列,就解決了上面的兩個問題:

  • 解耦:如果後面有新的動作,需要在創建訂單後執行,那麼只需要讓新同學自己去訂閱topic為「order_created」的消息即可
  • 緩衝:如果會員系統現在很忙,沒空處理消息,那麼只需跟nsq說,「我很忙,不要再發消息過來了」,那麼nsq就不會給它推送消息,或者會員系統出了故障,消息雖然推送過去了,但是它給處理失敗了,那麼也只需給nsq回復一個「requeue」的命令,nsq就會把消息重新放入隊列,進行重試。

Nsq 2.0 —— channel

作為一個靠譜的中間件,你必須做到:高效、可靠、方便。

上面這個使用一條簡單的隊列來實現的消息中間件,肯定是不滿足這三點的。

首先,假設我的會員系統,部署了三台實例,他們都訂閱了topic為「order_created」的消息,那麼一旦有訂單創建,這三台實例就都會收到消息,並且去更新會員積分信息,而其實我只需要更新一次就ok了。

這就涉及到一個消費者組(Comsumer Group)的概念。消費者組是Kafka里提到的,在Nsq,對應的術語是channel

會員系統的三個實例,當它們收到消息時,要做的事情是一樣的,並且只需要有有一個實例執行,那麼它們就是一個消費者組裡面的,要標識為同一個channel,比如說叫「update_memeber_credit」的channel,而簡訊系統和推薦系統,也要有自己的channel,用來和會員系統作區分,比如說叫「send_msg」和「update_user_interesting_goods」

當nsq收到消息時,會給每個channel複製一份消息,然後channel再給對應的消費者組,推送一條消息。消費者組裡有多個實例,那麼要推給誰呢?這就涉及到負載均衡,比如有一個消費者組裡有ABC三個實例,這次推給了A,那麼下次有可能是推送給B,再下次,也許就是C …

nsq官網上的一張動圖,非常好的解釋了這個過程:

稍微解釋一下,圖中,nsq上有一個叫」clicks「的topic,」clicks「下面有三條channel,也就是三個消費者組,其中channel名稱為」metrics「的,有三個實例。消息A來到nsq後,被複制到三條channel,接著,在metrics上的那個A,被推送到了第二個實例上。接著,又來了一個叫B的消息,這一次,B被推送給了第一個實例進行處理。

Nsq 3.0 —— nsqlookup

上面講過,nsq收到生產者生產的消息後,需要將消息複製多份,然後推送給對應topic和channel的消費者。

那麼,nsq怎麼知道哪些消費者訂閱了topic為「order_created」的消息呢?

總不能在配置文件里寫死吧?ip為10.12.65.123的,埠8878,這個消費者的topic是xxx,channel是xxx,…

因此,我們需要一個類似於微服務裡頭的註冊中心的模塊,來實現服務發現的功能,這就是nsqlookup.

nsqlookup提供了類似於etcd、zookeeper一樣的kv存儲服務,裡面記錄了topic下面都有哪些nsq。

nsqlookup提供了一個/lookup介面,比如你想知道哪些nsq上面,有topic為test的消息,那麼只需要調一下:

curl http://127.0.0.1:4161/lookup?topic=test

nsqlookup就會給你返回對應topic的nsq列表:

{ "channels": [ "xxx" ], "producers": [ { "remote_address": "127.0.0.1:52796", "hostname": "hongzeyangdeMacBook-Pro", "broadcast_address": "127.0.0.1", "tcp_port": 4150, "http_port": 4151, "version": "1.0.0-compat" } ]}

接著消費者只需要遍歷返回的json串里的producers列表,把broadcast_address和tcp_port或者http_port拼起來,就可以拿到要建立連接的url地址。

消費者會和這些nsq,逐個建立連接。nsq收到對應topic的消息後,就會給和他們建立連接的消費者,推送消息。

這個過程,可以從nsq的消費者客戶端實現的代碼中,很清楚的看出來。

我這裡用nsq的Java 客戶端實現brainlag/JavaNSQClient作為例子。

首先,調用/lookup介面,獲取擁有對應topic的nsq列表。注意看代碼,裡面是遍歷了nsqlookup的列表,然後把所有lookup的返回結構,進行合併。

com.github.brainlag.nsq.lookup.DefaultNSQLookup#lookup:

畫紅框的地方,正是之前講的拼湊邏輯。

接著和舊的nsq列表比較,進行刪除和新增,保證本地的nsq列表數據是最新的。

com.github.brainlag.nsq.NSQConsumer#connect:

當然,這個過程不會只在消費者啟動時才執行,而是定期去執行,不斷去獲取最新的nsq列表。

Nsq 4.0 —— nsqd集群

作為一個靠譜的中間件,你必須支持集群部署,這樣才能實現可靠、高效。

nsq的集群部署非常簡單,官方推薦一個生產者對應的部署一個nsqd:

What is the recommended topology for nsqd?

We strongly recommend running an nsqd alongside any service(s) that produce messages.

這也能解釋,為什麼上面的/lookup介面,返回的屬性是叫producers,而不是叫nsqs,因為nsq認為一個producer,就對應一個nsq。

當然這樣的做法有不少壞處,如果生產者對應的nsq掛掉了,那它就生產不了消息了。而且每個生產者都要部署一個nsq,未免有些奢侈。

不過對於大多數業務來說,這樣的nsq已經夠用。如果你像有贊一樣,擁有一群Go語言大神,那也不妨對nsq做一下改造。一個簡單的思路,就是模仿消費者側的代碼,通過nsqlookup來動態獲取有效的nsq地址,然後往其中一個nsq發布消息。

小結

這篇文章主要從一個演化的視角,介紹了一條隊列,如何逐步進化成一個消息中間件,也介紹了Nsq的幾大模塊:

  • nsq: 存儲消息的地方,每個topic可以有一個或者多個的channel
  • nsqlookup:實現服務發現的模塊
  • nsqadmin:文章中沒有提及,主要是進行可視化管理的web ui工具

下一講,我將把這一次學到的東西串起來,了解一下,一條消息,是如何從生產到被消費的。

參考

  • Nsq官方文檔

推薦閱讀:

TAG:消息隊列 | 編程 | 中間件 |