基於AMQP實現的golang消息隊列MaxQ

背景

餓廠此前一直是重度rabbitmq使用者,在使用的過程中遭遇了大量的問題,性能問題、故障排查問題等。Rabbitmq是用erlang開發的,該語言過於小眾,實在無力在其之上再做運維和開發。痛定思痛,我們於是決定自研一個消息隊列,為了降低業務層的接入難度,所以該消息隊列需要兼容AMQP協議,這樣就可以在業務層完全無感知的情況下接入MaxQ。

什麼是AMQP協議?

AMQP(Advanced Message Queuing Protocol),是一套消息隊列的七層應用協議標準,由摩根大通和iMatrix在2004年開始著手制定,於2006年發布規範,目前最新版是AMQP 1.0,MaxQ基於AMQP 0.9.1實現。相比zeroMQ這類無Broker的模型,AMQP是一種有Broker的協議模型,也就是需要運行單獨AMQP中間件服務,生產者客戶端和消費者客戶端通過AMQP SDK與AMQP中間件服務通訊。像kafka、JMS這類以topic為核心的Broker,生產者發送key和數據到Broker,由Broker比較key之後決定給那個消費者;而AMQP中淡化了topic的概念,引入了Exchange模型,生產者發送key和數據到Exchange,Exchange根據消費者訂閱queue的路由規則路由到對應的queue,也就是AMQP解耦了key和queue,即解藕了生產者和消費者,使得生產者和消費者之間的關係更靈活,消費者可自由控制消費關係。另外,AMQP是同時支持消息Push和Pull的模型,對於Push模型,消費者還可通過設置Qos達到流控的目的。

下圖是AMQP 0.9.1規範中給出的AMQP架構模型圖:

下面簡單介紹下AMQP中的一些基本概念:

  • Broker:接收和分發消息的應用,MaxQ就是基於AMQP協議實現的Message Broker。
  • Connection:publisher / consumer和broker之間的TCP連接。斷開連接的操作只會在client端進行,Broker不會斷開連接,除非出現網路故障或broker服務出現問題。
  • Channel:如果客戶端每次和Broker通信都需要建議一條連接,在並大量連接並發的情況下建立TCP Connection的開銷將是巨大的,效率也較低。於是,AMQP引入了channel的概念,channel是connection之上應用層建立的邏輯連接,Broker在實現中可創建單獨的thread/協程來實現channel的並發,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。
  • Virtual host:出於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,類似於網路中的namespace概念。當多個不同的用戶使用同一個Broker提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange / queue等。
  • Exchange:message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue:消息最終會落到queue里,消息會由Broker push給消費者,或者消費者主動去pull queue上的消息;一個message可以被同時分發到多個queue中。
  • Binding:exchange和queue之間的消息路由策略,binding中可以包含routing key。Binding信息被保存到exchange中的路由表中,用於message的分發依據。

MaxQ - AMQP實現架構

MaxQ對AMQP協議的實現,主要做了以下幾件事:

按照協議spec自動生產frame encode/decode,這裡採用了golang的text/template包,將AMQP spec抽象成固定的json和對應的代碼模版,如:

{n "id": 50,n "methods": [{"id": 10,n "arguments": [{"type": "short", "name": "ticket", "default-value": 0},n {"type": "shortstr", "name": "queue", "default-value": ""},n {"type": "bit", "name": "passive", "default-value": false},n {"type": "bit", "name": "durable", "default-value": false},n {"type": "bit", "name": "exclusive", "default-value": false},n {"type": "bit", "name": "auto-delete", "default-value": false},n {"type": "bit", "name": "nowait", "default-value": false},n {"type": "table", "name": "arguments", "default-value": {}}],n "name": "declare",n "synchronous" : true},n {"id": 11,n "arguments": [{"type": "shortstr", "name": "queue"},n {"type": "long", "name": "message-count"},n {"type": "long", "name": "consumer-count"}],n "name": "declare-ok"},n {"id": 20,n "arguments": [{"type": "short", "name": "ticket", "default-value": 0},n {"type": "shortstr", "name": "queue", "default-value": ""},n {"type": "shortstr", "name": "exchange"},n {"type": "shortstr", "name": "routing-key", "default-value": ""},n {"type": "bit", "name": "nowait", "default-value": false},n {"type": "table", "name": "arguments", "default-value": {}}],n "name": "bind",n "synchronous" : true},n {"id": 21,n "arguments": [],n "name": "bind-ok"},n {"id": 30,n "arguments": [{"type": "short", "name": "ticket", "default-value": 0},n {"type": "shortstr", "name": "queue", "default-value": ""},n {"type": "bit", "name": "nowait", "default-value": false}],n "name": "purge",n "synchronous" : true},n {"id": 31,n "arguments": [{"type": "long", "name": "message-count"}],n "name": "purge-ok"},n {"id": 40,n "arguments": [{"type": "short", "name": "ticket", "default-value": 0},n {"type": "shortstr", "name": "queue", "default-value": ""},n {"type": "bit", "name": "if-unused", "default-value": false},n {"type": "bit", "name": "if-empty", "default-value": false},n {"type": "bit", "name": "nowait", "default-value": false}],n "name": "delete",n "synchronous" : true},n {"id": 41,n "arguments": [{"type": "long", "name": "message-count"}],n "name": "delete-ok"},n {"id": 50,n "arguments": [{"type": "short", "name": "ticket", "default-value": 0},n {"type": "shortstr", "name": "queue", "default-value": ""},n {"type": "shortstr", "name": "exchange"},n {"type": "shortstr", "name": "routing-key", "default-value": ""},n {"type": "table", "name": "arguments", "default-value": {}}],n "name": "unbind",n "synchronous" : true},n {"id": 51,n "arguments": [],n "name": "unbind-ok"}n ],n "name": "queue"n},n

代碼生成模版:

type {{$struct}} struct {n {{range .Fields}}n {{.Literal}} {{$.TypeOfFieldLiteral .}}n {{end}}n {{if .Content}}n // Contentn properties *Propertiesn body []byten {{end}}n}n// Name returns the string representation of the Method, implementsn// Method.Name().nfunc (m *{{$struct}}) Name() string {n return "{{.NameLiteral $class.Name}}"n}n// ID returns the AMQP index number of the Method, implements Method.ID().nfunc (m *{{$struct}}) ID() uint16 {n return {{.ID}}n}n// Class returns a instance of the Class of this method, implements Method.Class().nfunc (m *{{$struct}}) Class() Class {n return &{{$classStruct}}{}n}n// String returns the string representation for the Method.nfunc (m *{{$struct}}) String() string {n return {{.StringLiteral $class.Name}}n}n

Vhost API實現:

func (v *VHost) QueueDeclare(node, name string, durable, exclusive, autoDelete bool, args amqp.Table) ...nfunc (v *VHost) QueueInspect(name string) ...nfunc (v *VHost) QueueBind(name, key, exchange string, args amqp.Table) ...nfunc (v *VHost) QueueUnbind(name, key, exchange string, args amqp.Table) ...nfunc (v *VHost) QueuePurge(name string) ...nfunc (v *VHost) QueueDelete(name string, ifUnused, ifEmpty bool) ...nfunc (v *VHost) ExchangeDeclare(name string, exType string, durable bool, autoDelete bool, internal bool, arguments amqp.Table)nfunc (v *VHost) ExchangeDelete(name string, ifUnused bool) ...nfunc (v *VHost) ExchangeBind(destination, key, source string, args amqp.Table) ...nfunc (v *VHost) ExchangeUnbind(destination, key, source string, args amqp.Table) ...nfunc (v *VHost) Publish(exchange, key string, mandatory, immediate bool, props *amqp.Properties, body []byte) n

Exchange介面化,實現4種Exchange路由模式

// Exchange publisherntype publisher interface {n bind(b *Binding) (exists bool)n unbind(b *Binding)n bindingsCount() intn allBindings() []*Bindingn publish(msg *Message, routingKey string) (count int, err error)n}n ntype directPublisher struct {n ...n}n ntype fanoutPublisher struct {n ...n}n ntype topicPublisher struct {n ...n}n ntype headersPublisher struct {n ...n}n

Queue介面化——MaxQ集群

  • Normal Queue: queue功能的具體實現,包括Publish、Consume、Cancel、Ack、Get等,單機版MaxQ會實例化此queue。
  • Master Queue: Normal Queue的超集,集群模式下會實例化此queue,在HA鏡像策略下會與Slave Queue同步消息。
  • Virtual Queue: 負責遠程調用Master Queue的API,主要是用作消息轉發。
  • Slave Queue: Virtual Queue的超集,除了消息轉發,還和Master Queue進行消息同步,在Master Queue down掉後,會被選取為新的Master Queue。

MaxQ - 生產實現架構

如果要將MaxQ應用到生產,還需要更多工作要做:

  1. MaxQ集群化,集群間的元數據通過zookeeper存儲和同步,消息通過grpc進行通信。
  2. 通過四層Proxy,生產者或消費者客戶端可以採用官方或第三方的AMQP SDK與MaxQ集群通訊。
  3. 集群管理,由於集群信息和元數據信息都存儲在zookeeper上,因此通過zookeeper可以實現集群節點管理、擴容縮容和集群切換;

同時MaxQ本身提供了HTTP API管理和統計介面,因此可對集群進行監控統計、資源分配等。

MaxQ相關特性

1. 消息可靠性

  • Publishing可靠性,生產者設置confirm服務端確認機制,確認服務端成功接收到生產者消息。
  • 消息Routing可靠性,生產者設置Publish mandatory,確認消息路由到queue。
  • Consuming可靠性,消費者設置手工Ack,服務端在收到消息Ack後才清除本地消息。
  • Persisting可靠性, 採用RAID1存儲持久化消息;
  • 分散式下的可靠性,設置queue的鏡像模式,啟動Slave Queue,與Master Queue進行消息同步,在aster Queue down掉後,Slave Queue可被選舉為Master Queue。

2. 容錯性

  • zookeeper不可用

1. 元數據已緩存在內存中,不會有任何影響,生產方和消費方仍可正常生產和消費

2. 服務會自動降級,元數據不可變更

3. zookeeper恢復,服務自愈

  • 節點故障

通過zookeeper進行Master Queue選舉:

1.NodeA和NodeB收到NodeC掛掉的事件,NodeA和NodeB成為Master queue的候選節點

2.NodeA和NodeB各自上報同步的offset到zookeeper

3.NodeA和NodeB各自決策,offset最新的NodeA選為Master queue

4.NodeA將Master信息同步至zookeeper

5.NodeB更新新的Master信息,並同步數據

6.NodeC恢復,成為Slave queue,並與新的Master同步數據

  • 網路分區

3. 擴展性

  1. HA、Exchange和Queue動態擴展屬性參數
  2. Exchange、Binding、Queue支持自定義擴展, 如:x-message-ttl、x-expires、x-max-length、x-dead-letter-exchange

使用場景和案例

下面介紹下MaxQ作為消息隊列的經典三種使用場景和使用案例:

1. 非同步解耦

訂單系統與消息通知系統解耦

1.用戶訂單支付成功,直接向MaxQ推送下單成功通知,主流程迅速返回

2.消息通知系統非同步接收通知消息, 發送簡訊通知或應用通知

2. 削峰填谷

SQL-autoreview系統分析優化SQL語句,並將結果落DB,屬於慢消費, 生產高峰期處理能力不夠,可利用MaxQ的堆積能力,勻速消費和處理。

3. 發布訂閱

DC數據變更發布和訂閱

1.DRC將DC的數據變更記錄發布至MaxQ

2.各業務系統訂閱相關的數據變更,並進一步做業務處理

未來的展望

1. Sharding Queue支持Queue的水平擴展,讓單Queue的性能不再成為瓶頸;

2. 支持消息巨量堆積,讓消息堆積不再成為問題;

3. 延時隊列,支持按單消息延時,讓消息延時變的簡單,無需再通過ttl+deadletter exchange做延時推送;

4. 歷史消息trace,追溯查詢已經消費掉的消息,讓生產方和消費方不再因消息是否生產了或消費了而發生扯皮。

作者介紹:

張培培,2015年加入餓了么,現任餓了么框架工具部架構師,負責餓了么消息隊列MaxQ。

參考文檔

1. AMQP 0.9.1 官方協議

2. RabbitMQ與AMQP協議詳解

3. 消息隊列的流派之爭


推薦閱讀:

目前linux進程間通信的常用方法是什麼(pipe?信號量?消息隊列?)?
LocalMQ:從零構建類 RocketMQ 高性能消息隊列
Kafka,Mq,Redis作為消息隊列使用時的差異?

TAG:消息队列 | 分布式系统 | Go语言 |