標籤:

RabbitMQ學習心得——發布/訂閱(中)

在上一篇的教程里,我們介紹了「發布/訂閱」模式,使用的是扇形交換機。扇形交換機的特點是把消息推送給與之綁定的所有隊列。如果我們想把某些消息投送到特定的隊列,換句話說就是消費者能夠只訂閱消息的一個子集。那麼可以使用直連交換機實現。

一、實現目標

還是實現一個日誌系統,消費者能夠只訂閱消息的一個子集。例如,我們把嚴重的錯誤日誌信息輸出到一個控制台中,但同時仍然把所有的日誌信息輸出到另一個控制台中。

二、架構描述

  • 綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做綁定鍵(binding key)。

  • 不同的隊列使用不同的綁定鍵(binding key)與直連交換機綁定。
  • 扇型交換機(fanout exchanges)會忽略這個值。

  • 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定消息該分發到哪個隊列。(路由演算法)

三、準備

  • 在本地啟動rabbit-server
  • 安裝rabbitmq客戶端。選用pika。

四、代碼實現

  • emit_log_direct.py

# coding=utf-8nimport pikanimport sysnn# 連接伺服器nconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nn# 聲明直連交換機nchannel.exchange_declare(exchange=direct_logs,n type=direct)nn# 提取綁定鍵nseverity = sys.argv[1] if len(sys.argv) > 1 else infon# 提取發送的消息nmessage = .join(sys.argv[2:]) or Hello World!nn# 進行消息的發布,帶上了路由鍵nchannel.basic_publish(exchange=direct_logs,n routing_key=severity,n body=message)nprint " [x] Sent %r:%r" % (severity, message)nconnection.close()n

  • receive_logs_direct.py

# coding=utf-8nimport pikanimport sysnimport timennconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nnchannel.exchange_declare(exchange=direct_logs,n type=direct)nnresult = channel.queue_declare(exclusive=True)nqueue_name = result.method.queuenn# 獲取綁定鍵nseverities = sys.argv[1:]nif not severities:n print >> sys.stderr, "Usage: %s [info] [warning] [error]" % n (sys.argv[0],)n sys.exit(1)nn# 一個隊列與一個交換機可以用多個綁定鍵進行綁定nfor severity in severities:n channel.queue_bind(exchange=direct_logs,n queue=queue_name,n routing_key=severity)nnprint [*] Waiting for logs. To exit press CTRL+Cnnndef callback(ch, method, properties, body):n print " [x] %r:%r" % (method.routing_key, body,)n time.sleep(6)nchannel.basic_consume(callback,n queue=queue_name,n no_ack=True)nnchannel.start_consuming()n

  • 運行

接受消息

發布消息

第一個消費者綁定鍵為:"warning"和"error",第二個消費者的綁定鍵為:"warning"、"info"和"error"。發布者發布了兩條消息,第一條消息的路由鍵為"error",第二條消息的路由鍵為"info"。

  • 疑難解惑

channel.queue_bind(exchange=direct_logs,n queue=queue_name,n routing_key=severity)n

再次重申一下:

綁定(binding)是指交換機(exchange)和隊列(queue)的關係。可以簡單理解為:這個隊列(queue)對這個交換機(exchange)的消息感興趣。

綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做綁定鍵(binding key)

另註:

多個隊列使用相同的綁定鍵是合法的。我們添加一個X和Q1之間的綁定,使用black綁定鍵。同樣,X和Q2之間也使用black綁定。這樣一來,直連交換機就和扇型交換機的行為一樣,會將消息廣播到所有匹配的隊列。帶有black路由鍵的消息會同時發送到Q1和Q2。


推薦閱讀:

RabbitMQ ACK 機制的意義是什麼?
beanstalk和rabbitmq區別?

TAG:RabbitMQ | Python |