Python操作rabbitmq系列(二):多個接收端消費消息
今天,我們要逐步開始討論rabbitmq稍微高級點的耍法了。了解這一步,對我們設計高並發的系統非常有用。當然,還可以使用kafka。不過還是算了,有幾個硬性條件不支持,還是用rabbitmq吧。
循環分發:
啟動一個發送端往隊列發消息,此時啟動多個接收端。發送的消息會對接收端一個一個挨著發送消息。如圖:
這就是默認情況下,多個接收端輪流消費消息。隊列發送給消費端後,就立即刪除。那麼問題來了,當某個消費者在處理消息的時候,異常終止了怎麼辦?此時,我們更希望這樣:若是那個消費者掛掉了,消息自動轉給另一個消費者處理。
幸好,rabbitmq就有效確認機制。消費者收到消息後,正常處理完成,此時才通知隊列可以自由刪除。那麼問題又來了,消費者掛掉了連確認消息都發不出,該怎麼辦?rabbitmq維持了消費者的連接信息。消費者掛掉,與server的連接通道會關閉或tcp連接丟失。這時server知道了這個情況,就自動重發消息。
這裡還有個問題,就是server掛掉了怎麼辦?注意: durable=True。這個就是,當server掛了隊列還存在。delivery_mode=2:server掛了消息還存在。若是保證消息不丟,這兩個參數都要設置。
發送端:
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))channel = connection.channel()# durable:server掛了隊列仍然存在channel.queue_declare(queue=task_queue, durable=True)# 使用默認的交換機發送消息。exchange為空就使用默認的。delivery_mode=2:使消息持久化。和隊列名稱綁定routing_keymessage = .join(sys.argv[1:]) or "Hello World!"channel.basic_publish(exchange=,routing_key=task_queue,
body=message, properties=pika.BasicProperties( delivery_mode=2, ))print(" [x] Sent %r" % message)connection.close()接收端:
import pika
import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))channel = connection.channel()
channel.queue_declare(queue=task_queue, durable=True)print( [*] Waiting for messages. To exit press CTRL+C)def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b.)) print(" [x] Done") # 手動對消息進行確認 ch.basic_ack(delivery_tag=method.delivery_tag)公平派遣:
此刻,我們已經知道如何保證消息不丟,那麼問題又來了。有的消費幹得快,有的幹得慢。這樣分發消息,有的累死有的沒事幹。這個問題如何解決?
rabbitmq已經考慮到了。那就是:那個幹完了,通知給server,server就發送給那個。
在上面的接收端的
channel.basic_consume(callback, queue=task_queue)
代碼前加:
channel.basic_qos(prefetch_count=1)
即可
現在,我們的消息都是一個給一個消費者。接下來,我們要討論,向多個消費者發送相同的消息。
推薦閱讀:
※與RabbitMQ結合的「吐槽」抓取
※RabbitMQ學習心得——發布/訂閱(中)
※Python操作rabbitmq系列(四):根據類型訂閱消息
※beanstalk和rabbitmq區別?
※RabbitMQ ACK 機制的意義是什麼?