Python操作rabbitmq系列(二):多個接收端消費消息

今天,我們要逐步開始討論rabbitmq稍微高級點的耍法了。了解這一步,對我們設計高並發的系統非常有用。當然,還可以使用kafka。不過還是算了,有幾個硬性條件不支持,還是用rabbitmq吧。

循環分發:

啟動一個發送端往隊列發消息,此時啟動多個接收端。發送的消息會對接收端一個一個挨著發送消息。如圖:

這就是默認情況下,多個接收端輪流消費消息。隊列發送給消費端後,就立即刪除。那麼問題來了,當某個消費者在處理消息的時候,異常終止了怎麼辦?此時,我們更希望這樣:若是那個消費者掛掉了,消息自動轉給另一個消費者處理。

幸好,rabbitmq就有效確認機制。消費者收到消息後,正常處理完成,此時才通知隊列可以自由刪除。那麼問題又來了,消費者掛掉了連確認消息都發不出,該怎麼辦?rabbitmq維持了消費者的連接信息。消費者掛掉,與server的連接通道會關閉或tcp連接丟失。這時server知道了這個情況,就自動重發消息。

這裡還有個問題,就是server掛掉了怎麼辦?注意: durable=True。這個就是,當server掛了隊列還存在。delivery_mode=2:server掛了消息還存在。若是保證消息不丟,這兩個參數都要設置。

發送端:

import pika

import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))

channel = connection.channel()

# durable:server掛了隊列仍然存在

channel.queue_declare(queue=task_queue, durable=True)

# 使用默認的交換機發送消息。exchange為空就使用默認的。delivery_mode=2:使消息持久化。和隊列名稱綁定routing_key

message = .join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange=,

routing_key=task_queue,

body=message,

properties=pika.BasicProperties(

delivery_mode=2,

))

print(" [x] Sent %r" % message)

connection.close()

接收端:

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))

channel = connection.channel()

channel.queue_declare(queue=task_queue, durable=True)

print( [*] Waiting for messages. To exit press CTRL+C)

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

time.sleep(body.count(b.))

print(" [x] Done")

# 手動對消息進行確認

ch.basic_ack(delivery_tag=method.delivery_tag)

# basic_consume:這個函數有no_ack參數。該參數默認為false。表示:需要對message進行確認。怎麼理解:no設置成false,表示要確認

channel.basic_consume(callback, queue=task_queue)

channel.start_consuming()

公平派遣:

此刻,我們已經知道如何保證消息不丟,那麼問題又來了。有的消費幹得快,有的幹得慢。這樣分發消息,有的累死有的沒事幹。這個問題如何解決?

rabbitmq已經考慮到了。那就是:那個幹完了,通知給server,server就發送給那個。

在上面的接收端的

channel.basic_consume(callback, queue=task_queue)

代碼前加:

channel.basic_qos(prefetch_count=1)

即可

現在,我們的消息都是一個給一個消費者。接下來,我們要討論,向多個消費者發送相同的消息。


推薦閱讀:

與RabbitMQ結合的「吐槽」抓取
RabbitMQ學習心得——發布/訂閱(中)
Python操作rabbitmq系列(四):根據類型訂閱消息
beanstalk和rabbitmq區別?
RabbitMQ ACK 機制的意義是什麼?

TAG:RabbitMQ | 消息隊列 | 高並發 |