標籤:

RabbitMQ學習心得——遠程過程調用(RPC)

一、RPC

遠程過程調用(英語:Remote Procedure Call,縮寫為 RPC)是一個計算機通信協議。該協議允許運行於一台計算機的程序調用另一台計算機的子程序,而程序員無需額外地為這個交互作用編程。如果涉及的軟體採用面向對象編程,那麼遠程過程調用亦可稱作遠程調用遠程方法調用

二、實現目標

在工作隊列模式中介紹了如何使用工作隊列(work queue)在多個工作者(woker)中間分發耗時的任務。如果我們需要將一個函數運行在遠程計算機上並且等待從那兒獲取結果,這種模式通常被稱為遠程過程調用(Remote Procedure Call)或者RPC。

為了更好地描述這個問題,我們會使用RabbitMQ來構建一個RPC系統:包含一個客戶端和一個RPC伺服器。我們創建一個模擬RPC服務來返回斐波那契數列。(模擬耗時任務)。

三、回調隊列

一般來說通過RabbitMQ來實現RPC是很容易的。一個客戶端發送請求信息,伺服器端將其應用到一個回複信息中。為了接收到回複信息,客戶端需要在發送請求的時候同時發送一個回調隊列(callback queue)的地址(思考一下為什麼要這麼做?)。實現如下:

result = channel.queue_declare(exclusive=True)ncallback_queue = result.method.queuennchannel.basic_publish(exchange=,n routing_key=rpc_queue,n properties=pika.BasicProperties(n reply_to = callback_queue,n ),n body=request)n

消息屬性

AMQP協議給消息預定義了一系列的14個屬性。大多數屬性很少會用到,除了以下幾個:

  • delivery_mode(投遞模式):將消息標記為持久的(值為2)或暫存的(除了2之外的其他任何值)。工作隊列模式中有介紹。
  • content_type(內容類型):用來描述編碼的mime-type。例如在實際使用中常常使用application/json來描述JOSN編碼類型。
  • reply_to(回複目標):通常用來命名回調隊列。
  • correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。

四、架構描述

  • RPC工作步驟如下:

  1. 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。

  2. 在RPC請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。

  3. 將請求發送到一個 rpc_queue 隊列中。

  4. RPC工作者(又名:伺服器)等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作並且將帶有執行結果的消息發送給reply_to欄位指定的隊列。

  5. 客戶端等待回調隊列里的數據。當有消息出現的時候,它會檢查correlation_id屬性。如果此屬性的值與請求匹配,將它返回給應用。
  • 關聯標識

在實際使用中,客戶端肯定會有多個RPC請求,為了區分這樣請求。我們可能會想到給每個RPC請求新建一個回調隊列(這樣請求與回復一一對應),但是這樣的話很佔資源。

correlation_id的引入使這一情況得到解決。他允許每個客戶端只建立一個獨立的回調隊列,然後給每個請求設置一個獨一無二的值。稍後,當我們從回調隊列中接收到一個消息的時候,我們就可以查看這條屬性從而將響應和請求匹配起來。如果我們接手到的消息的correlation_id是未知的,那就直接銷毀掉它,因為它不屬於我們的任何一條請求。

五、代碼實現

  • rpc_client.py

# coding=utf-8nimport pikanimport uuidnnnclass FibonacciRpcClient(object):n def __init__(self):n self.connection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))n self.channel = self.connection.channel()n # 聲明一個持久性的回調隊列n result = self.channel.queue_declare(exclusive=True)n self.callback_queue = result.method.queuenn # 等待從回調隊列中取伺服器返回的消息n self.channel.basic_consume(self.on_response, no_ack=True, # 不用ack確認n queue=self.callback_queue)nn def on_response(self, ch, method, props, body):n if self.corr_id == props.correlation_id: # 驗證碼核對n self.response = bodynn def call(self, n):n self.response = Nonen self.corr_id = str(uuid.uuid4())n self.channel.basic_publish(exchange=,n routing_key=_rpc_queue,n properties=pika.BasicProperties(n reply_to=self.callback_queue, # 發送返回信息的隊列namen correlation_id=self.corr_id, # 發送uuid 相當於驗證碼n ),n body=str(n))n while self.response is None:n self.connection.process_data_events() # 非阻塞版的start_consumingn return int(self.response)nnfibonacci_rpc = FibonacciRpcClient()nnprint " [x] Requesting fib(30)"nresponse = fibonacci_rpc.call(30)nprint " [.] Got %r" % (response,)n

  • rpc_server.py

# coding=utf-8nimport pikannconnection = pika.BlockingConnection(pika.ConnectionParameters(n host=localhost))nchannel = connection.channel()nnchannel.queue_declare(queue=_rpc_queue)nnndef fib(n):n if n == 0:n return 0n elif n == 1:n return 1n else:n return fib(n-1) + fib(n-2)nnndef on_request(ch, method, props, body):n n = int(body)nn print " [.] fib(%s)" % (n,)n response = fib(n)nn ch.basic_publish(exchange=,n routing_key=props.reply_to,n properties=pika.BasicProperties(correlation_id = n props.correlation_id),n body=str(response))n ch.basic_ack(delivery_tag = method.delivery_tag)nnchannel.basic_qos(prefetch_count=1)nchannel.basic_consume(on_request, queue=_rpc_queue)nnprint " [x] Awaiting RPC requests"nchannel.start_consuming()n

  • 運行

先運行server端(等待處理請求):

再運行client端(發送請求),並得到結果:

server端接到任務:

可以看到客戶端發了一個計算30的斐波那契數列的任務,服務端進行處理並返回了結果。

  • 疑難解惑

重點講解一下客戶端的代碼:

  1. 建立連接、通道並且為回復(replies)聲明獨享的回調隊列。

  2. 我們訂閱這個回調隊列,以便接收RPC的響應。

  3. 「on_response」回調函數對每一個響應執行一個非常簡單的操作,檢查每一個響應消息的correlation_id屬性是否與我們期待的一致,如果一致,將響應結果賦給self.response,然後跳出consuming循環。

  4. 接下來,我們定義我們的主要方法 call 方法。它執行真正的RPC請求。

  5. 在這個方法中,首先我們生成一個唯一的 correlation_id 值並且保存起來,on_response回調函數會用它來獲取符合要求的響應。

  6. 接下來,我們將帶有 reply_to 和 correlation_id 屬性的消息發布出去。

  7. 最後,將響應返回給用戶。

我們可以看到客戶端和服務端都既是訂閱者,也是發布者。那麼對於客戶端的rpc請求,如果 RPC伺服器運行的過慢。那麼我們可以類似「工作隊列」模式,運行多個server進行請求的處理。


推薦閱讀:

與RabbitMQ結合的「吐槽」抓取
Rabbitmq 和 Celery 是怎樣工作的?
beanstalk和rabbitmq區別?
RabbitMQ學習心得——發布/訂閱(中)

TAG:RabbitMQ | Python |