RabbitMQ學習心得——工作隊列
一、實現目標
生產者會發布一些耗時的任務到工作隊列(Work Queue),讓多個工作者(Worker)去執行。
二、架構描述
工作隊列(又稱:任務隊列——Task Queues)是為了避免等待一些佔用大量資源、時間的操作。當我們把任務(Task)當作消息發送到隊列中,一個運行在後台的工作者(worker)進程就會取出任務然後處理。當你運行多個工作者(workers),任務就會在它們之間共享。
三、準備
- 在本地啟動rabbit-server
- 為了更好的表現耗時複雜的任務,我們用time.sleep()函數來模擬這種情況。我們在字元串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。比如"Hello..."就會耗時3秒鐘。
四、代碼實現
- new_task.py(用來發布任務)
# coding=utf-8nimport pikanimport sysnn# 連接伺服器nconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nn# 聲明隊列nchannel.queue_declare(queue=task_queue, durable=True)nn# 模擬任務的複雜度nmessage = .join(sys.argv[1:]) or "Hello World!"nchannel.basic_publish(exchange=,n routing_key=task_queue,n body=message,n properties=pika.BasicProperties(n delivery_mode=2, # make message persistentn ))nprint " [x] Sent %r" % (message,)nconnection.close()n
- worker.py(用來執行任務)
# coding=utf-8nimport pikanimport timennconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nnchannel.queue_declare(queue=task_queue, durable=True)nprint [*] Waiting for messages. To exit press CTRL+Cnnndef callback(ch, method, properties, body):n print " [x] Received %r" % (body,)n # 沉睡幾秒,模擬任務n time.sleep(body.count(.))n print " [x] Done"n # 給予消息的響應n ch.basic_ack(delivery_tag=method.delivery_tag)nn# 收到消息後,在沒做出響應之前,不要再給我發消息,即acknchannel.basic_qos(prefetch_count=1)nchannel.basic_consume(callback,n queue=task_queue)nnchannel.start_consuming()n
- 運行
打開三個終端,一個用來運行new_task.py(發布任務),另兩個用來運行worker.py(用來執行任務)。
首先打開兩個終端運行worker.py,等待執行任務。
再打開一個終端用來運行new_task.py,發布任務。(運行時要加參數)
我們發布了7條複雜度不一樣的任務,看一下任務的執行情況:可以看到任務被執行。默認來說,RabbitMQ會按順序得把消息發送給每個消費者(consumer)。平均每個消費者都會收到同等數量得消息。這種發送消息得方式叫做——輪詢(round-robin)。試著添加三個或更多得工作者(workers)。
- 疑難解惑
def callback(ch, method, properties, body):n print " [x] Received %r" % (body,)n # 沉睡幾秒,模擬任務n time.sleep(body.count(.))n print " [x] Done"n # 給予消息的響應n ch.basic_ack(delivery_tag=method.delivery_tag)nn# 收到消息後,在沒做出響應之前,不要再給我發消息,即acknchannel.basic_qos(prefetch_count=1)nchannel.basic_consume(callback,n queue=task_queue)n
在上一篇教程中我們弱化了對「no_ack參數」的講解。在Hello World工作方式中我們的代碼如下:
channel.basic_consume(callback,n queue=hello,n no_ack=True)n
這個參數代表消息響應,消息響應默認是開啟的(no_ack=False)。在Hello World工作方式中我們把它關閉了。在關閉的狀態下,消息被RabbitMQ發送給消費者(consumers)之後,馬上就會在內存中移除。這對於前面的例子沒什麼影響。但對於工作隊列來說,意義非同凡響。設想當RabbitMQ把一個任務分配給一個Worker去執行,而這個Worker在執行一半時掛掉了,任務未完成。而此時RabbitMQ已經把任務從內存中移除了。這是多麼的操蛋呀。
而消息響應開啟後,消費者會通過一個ack響應
ch.basic_ack(delivery_tag=method.delivery_tag)n
告訴RabbitMQ已經收到並處理了某條消息,然後RabbitMQ就會釋放並刪除這條消息。如果消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然後重新發送給其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失消息。(可以驗證一下,運行兩個worker.py,在一個worker.py收到耗時較長的消息後立馬關閉它,會發現RabbitMQ會把這個任務重新分發給另一個worker。)
# 聲明隊列nchannel.queue_declare(queue=task_queue, durable=True)n
關於聲明隊列,參數的問題,在RabbitMQ簡介(下)中有介紹。Durable這個參數代表把隊列聲明為持久化。這時候,我們就可以確保在RabbitMq重啟之後queue_declare隊列不會丟失。另外,我們需要把我們的消息也要設為持久化——將delivery_mode的屬性設為2。如下所示:
channel.basic_publish(exchange=,n routing_key=task_queue,n body=message,n properties=pika.BasicProperties(n delivery_mode=2, # make message persistentn ))n
# 公平調度nchannel.basic_qos(prefetch_count=1)n
當我們注釋掉這段代碼後,會發現兩個工作者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。沒有實現公平調度。我們可以使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,在同一時刻,不要發送超過1條消息給一個工作者(worker),直到它已經處理了上一條消息並且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的工作者(worker)。
推薦閱讀:
※day9-哈夫曼樹
※Python 遠程訪問系列之分散式RPC(上)
※selenium爬蟲被檢測到 該如何破?
※零基礎小白學編程多久能達到接私活的水平?