Python操作rabbitmq系列(三):多個接收端消費消息
05-11
接著上一章。這一章,我們要將同一個消息發給多個客戶端。這就是發布訂閱模式。直接看代碼:
發送端:
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))channel = connection.channel()# 原則上,消息,只能有交換機傳到隊列。就像我們家裡面的交換機道理一樣。
# 有多個設備連接到交換機,那麼,這個交換機把消息發給那個設備呢,就是根據# 交換機的類型來定。類型有:direct opicheadersfanout# fanout:這個就是,所有的設備都能收到消息,就是廣播。# 此處定義一個名稱為logs的fanout類型的exchangechannel.exchange_declare(exchange=logs, exchange_type=fanout)# 將消息發送到名為log的exchange中# 因為是fanout類型的exchange,所以無需指定routing_keymessage = .join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange=logs,
routing_key=, body=message)print(" [x] Sent %r" % message)connection.close()接收端:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))channel = connection.channel()# 這裡需要和發送端保持一致(習慣和要求)channel.exchange_declare(exchange=logs,exchange_type=fanout)
# 類似的,比如log,我們其實最想看的,當連接上的時刻到消費者退出,這段時間的日誌# 有些消息,過期了的對我們並沒有什麼用# 並且,一個終端,我們要收到隊列的所有消息,比如:這個隊列收到兩個消息,一個終端收到一個。# 我們現在要做的是:兩個終端都要收到兩個# 那麼,我們就只需做個臨時隊列。消費端斷開後就自動刪除result = channel.queue_declare(exclusive=True)# 取得隊列名稱queue_name = result.method.queue# 將隊列和交換機綁定一起channel.queue_bind(exchange=logs,
queue=queue_name)print( [*] Waiting for logs. To exit press CTRL+C)def callback(ch, method, properties, body): print(" [x] %r" % body)# no_ack=True:此刻沒必要回應了channel.basic_consume(callback, queue=queue_name,no_ack=True)
channel.start_consuming()效果:
推薦閱讀: