RabbitMQ學習心得——發布/訂閱(中)
一、實現目標
還是實現一個日誌系統,消費者能夠只訂閱消息的一個子集。例如,我們把嚴重的錯誤日誌信息輸出到一個控制台中,但同時仍然把所有的日誌信息輸出到另一個控制台中。
二、架構描述
- 綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做綁定鍵(binding key)。
- 不同的隊列使用不同的綁定鍵(binding key)與直連交換機綁定。
- 扇型交換機(fanout exchanges)會忽略這個值。
- 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定消息該分發到哪個隊列。(路由演算法)
三、準備
- 在本地啟動rabbit-server
- 安裝rabbitmq客戶端。選用pika。
四、代碼實現
- emit_log_direct.py
# coding=utf-8nimport pikanimport sysnn# 連接伺服器nconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nn# 聲明直連交換機nchannel.exchange_declare(exchange=direct_logs,n type=direct)nn# 提取綁定鍵nseverity = sys.argv[1] if len(sys.argv) > 1 else infon# 提取發送的消息nmessage = .join(sys.argv[2:]) or Hello World!nn# 進行消息的發布,帶上了路由鍵nchannel.basic_publish(exchange=direct_logs,n routing_key=severity,n body=message)nprint " [x] Sent %r:%r" % (severity, message)nconnection.close()n
- receive_logs_direct.py
# coding=utf-8nimport pikanimport sysnimport timennconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nnchannel.exchange_declare(exchange=direct_logs,n type=direct)nnresult = channel.queue_declare(exclusive=True)nqueue_name = result.method.queuenn# 獲取綁定鍵nseverities = sys.argv[1:]nif not severities:n print >> sys.stderr, "Usage: %s [info] [warning] [error]" % n (sys.argv[0],)n sys.exit(1)nn# 一個隊列與一個交換機可以用多個綁定鍵進行綁定nfor severity in severities:n channel.queue_bind(exchange=direct_logs,n queue=queue_name,n routing_key=severity)nnprint [*] Waiting for logs. To exit press CTRL+Cnnndef callback(ch, method, properties, body):n print " [x] %r:%r" % (method.routing_key, body,)n time.sleep(6)nchannel.basic_consume(callback,n queue=queue_name,n no_ack=True)nnchannel.start_consuming()n
- 運行
接受消息
發布消息
第一個消費者綁定鍵為:"warning"和"error",第二個消費者的綁定鍵為:"warning"、"info"和"error"。發布者發布了兩條消息,第一條消息的路由鍵為"error",第二條消息的路由鍵為"info"。
- 疑難解惑
channel.queue_bind(exchange=direct_logs,n queue=queue_name,n routing_key=severity)n
再次重申一下:
綁定(binding)是指交換機(exchange)和隊列(queue)的關係。可以簡單理解為:這個隊列(queue)對這個交換機(exchange)的消息感興趣。
綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做綁定鍵(binding key)
另註:
推薦閱讀: