tornado, pika, async/await 以及 asynchronous iterator
本文首發於: md-blog/pika-and-async.md at master · lexdene/md-blog · GitHub
pika 是一個非同步的 amqp 庫, 我用它來連接 rabbitmq 並非同步接收一些消息.
callbacks
它的代碼看起來是這樣的(節選):
class Consumer: def connect(self): self._connection = pika.adapters.TornadoConnection( pika.URLParameters(self._server_url), self._on_connection_open ) def _on_connection_open(self, *argv): self._channel = self._connection.channel( on_open_callback=self._on_channel_open ) def _on_channel_open(self, *argv): self._channel.exchange_declare( self._on_exchange_declare_ok, self._exchange, self._exchange_type, durable=True, ) def _on_exchange_declare_ok(self, *argv): self._channel.queue_declare( self._on_queue_declare_ok, self._queue, durable=True, ) def _on_queue_declare_ok(self, method_frame): self._channel.queue_bind( self._on_bind_ok, queue=self._queue, exchange=self._exchange, routing_key=self._routing_key, ) def _on_bind_ok(self, method_frame): self._consumer_tag = self._channel.basic_consume( self._on_message, self._queue, ) def _on_message(self, channel, deliver, properties, body): body = body.decode(properties.content_encoding) data = json.loads(body) # do something with data channel.basic_ack(deliver.delivery_tag)
大致分為 5 個步驟:
- 連接
- 開啟 channel
- 聲明 queue
- 綁定 queue
- 接收消息
每一步都靠 callback 連接. 如果加上各種處理關閉以及處理錯誤, 那滿屏都會是 callback .
coroutine
試想一下, 如果代碼變成這樣:
class Consumer: @coroutine def connect(self, url, exchange, exchange_type, queue, routing_key): connection = yield AsyncConnection.connect(url) channel = yield connection.channel() yield channel.exchange_declare( exchange=exchange, exchange_type=exchange_type, durable=True, ) yield channel.queue_declare( queue=queue, durable=True, ) yield channel.queue_bind( queue=queue, exchange=exchange, routing_key=routing_key, ) channel.channel.basic_consume( self._on_message, queue, ) def _on_message(self, channel, deliver, properties, body): body = body.decode(properties.content_encoding) data = json.loads(body) # do something with data channel.basic_ack(deliver.delivery_tag)
雖然 _on_message 依然是個 callback , 但是整體有沒有清晰好多?
在 tornado 中, 我們可以靠 coroutine 來實現 AsyncConnection. 只需要在每個函數中返回一個 future 對象, 並正確處理它的 result , 就可以了.
AsyncConnection 代碼如下:
class AsyncConnection: similar to TornadoConnection but every method returns a Future object def __init__(self, conn): self.conn = conn @classmethod def connect(cls, url): f = Future() def on_open(conn): f.set_result(cls(conn)) def on_open_error(conn, err): f.set_exception(AMQPConnectionError(err)) TornadoConnection( URLParameters(url), on_open_callback=on_open, on_open_error_callback=on_open_error, ) return f def channel(self): f = Future() def on_open(channel): f.set_result(AsyncChannel(channel)) _channel = self.conn.channel(on_open_callback=on_open) return fclass AsyncChannel: similar to pika.channel.Channel but every method returns a Future object def __init__(self, channel): self.channel = channel def exchange_declare(self, *argv, **kwargs): return Task(self.channel.exchange_declare, *argv, **kwargs) def queue_declare(self, *argv, **kwargs): return Task(self.channel.queue_declare, *argv, **kwargs) def queue_bind(self, *argv, **kwargs): return Task(self.channel.queue_bind, *argv, **kwargs)
async 與 await
在 Python 3.5 中,引入了 2 個新的關鍵字: async 和 await. 這比使用 coroutine 和 yield, 在語義上更清晰.
connect 函數可以改寫如下:
async def connect(self, url, exchange, exchange_type, queue, routing_key): connection = await AsyncConnection.connect(url) channel = await connection.channel() await channel.exchange_declare( exchange=exchange, exchange_type=exchange_type, durable=True, ) await channel.queue_declare( queue=queue, durable=True, ) await channel.queue_bind( queue=queue, exchange=exchange, routing_key=routing_key, ) channel.channel.basic_consume( self._on_message, queue, )
asynchronous iterator
目前只剩下 _on_message 仍然是個 callback, 能不能把它也用 future 來實現呢?
不能. 因為一個 future 只能被設置 result 一次. 而 _on_message 會在每次有數據到達的時候被調用.
在 tornado 以及其它幾個非同步庫中, 一個 future 被多次設置 result 是一個 undefined behavior
那麼有沒有其它辦法可以解決呢?
有的. 在 Python 3.5.2 中,我們可以使用 asynchronous iterator 來實現 _on_message .
其實 Python 3.5 中就有 asynchronous iterator, 不過 Python 3.5.2 的時候, 它的使用方法發生了更改(見 pep 492). 這裡我們以 Python 3.5.2 中的用法為準
調用的地方使用 async for 來非同步迭代:
async def consume(self, channel, queue): async for data in channel.data(queue): # do something with data
那麼如何實現 channel.data() 呢?
class AsyncChannel: # 剛才寫過的那些函數省略 def data(self, queue): return AsyncData(self.channel, queue)class AsyncData: an async data consumer which implements __aiter__ for asynchronous iterator. def __init__(self, channel, queue): self.channel = channel self.queue = queue self._future = None self.channel.basic_consume( self._on_message, self.queue ) def _on_message(self, channel, deliver, properties, body): body = body.decode(properties.content_encoding) data = json.loads(body) if self._future: self._future.set_result(data) channel.basic_ack(deliver.delivery_tag) def __aiter__(self): return self async def __anext__(self): self._future = Future() data = await self._future self._future = None return data
demo
consumer.py
全文完
推薦閱讀:
※知乎在Tornado開發中的Database Driver是什麼?
※tornado 裡面如何使用Future中的數據?
※對於Tornado非同步庫使用的困惑?
※知乎為什麼要選擇用Tornado做為web開發框架,非同步非阻塞模式在此起到了作用?
※python多進程數量對應核數?