tornado如何實現非同步websocket推送?
在實現一個websocket的推送,主要就是後端推送pub/sub了redis的信息到前端。然而自己實現的推送開了兩個chrome tab就有一個一直pending,可我明明在redis阻塞的部分用了gen.coroutine,一直不明白。想請問一下如何改進?
主要實現代碼:
class MessageHandler(websocket.WebSocketHandler):
def check_origin(self, origin):
return True@gen.coroutine
def handler(self):def cb(it, callback):
try:
value = it.next()
except:
value = None
callback(value)conn = redis.StrictRedis(**REDIS_CONFIG)
ps = conn.pubsub()
ps.subscribe(channel)
while True:
item = yield gen.Task(cb, ps.listen())
print item
if item:
if item[type] == message:
self.write_message(item["data"])
else:
breakdef open(self):
print "New WebSocket connection"
ioloop.IOLoop.current().spawn_callback(self.handler)def on_message(self, message):
passdef on_close(self):
print("WebSocket close")
self.close()app = web.Application([
(r/ws, MessageHandler),
])if __name__ == __main__:
app.listen(8888)
ioloop.IOLoop.current().start()
&> 可我明明在redis阻塞的部分用了gen.coroutine
這樣並不能使阻塞操作變得不阻塞。你需要用 https://github.com/leporo/tornado-redis 這種東西,或者用線程池(tornado.gen.run_on_executor)。Comzyh解釋的很好。
首先要理解gen.coroutine的意義,這個裝飾器其實並不能讓阻塞代碼變成非阻塞,而是將本身需要非同步回調的代碼寫成同步風格。它並不能改變什麼,而是讓你可以換一種代碼風格增強代碼可讀性。整個tornado生態是基於它IOLoop的,所以不是拿到任何非同步庫都能直接使用,我們都知道非同步是基於事件的,而事件的通知管理則是依靠事件輪詢設施eventloop的它通常構建於select、epoll、kq,這個IOloop就是事件輪詢設施,一切不能被它接納的非同步庫都是無法集成到tornado的。所以如果你想把redis集成到tornado就必須是IOLoop能夠接納的非阻塞redis——https://pypi.python.org/pypi/tornado-redis/2.4.2
其實您的需求也不一定需要redis作為消息中間件這麼重型的解決方案,如果您尋求服務端通知,zeromq是個非常輕量化的解決方案。在下不才,自己開發過一套tornado集成zeromq的集群完整解決方案——GitHub - nikoloss/rose-dewdrop: websocket消息發布集群 RoseDewdrop is a solution of web socket cluster涵蓋tornado網關,消息發布中心,消息發布sdk三大核心組件,屆時如果有需要可以仿造sdk開發多語言的sdk。sdk給消息發布中心inform headquarter(ihq)下達主題消息,客戶端連接tornado可以發送訂閱某主題。性能還行,普通i5的機子在沒有tcp參數調優的情況下能扛起兩萬左右連接數。注釋中的todo,您如果有能力優化的話,相信性能又可以提升不少。 value = it.next()
這裡 it.next() 仍然是阻塞的,Tornado 的非同步模型應該是在你的 while True 裡面 item = yield 一個future 才對,雖然你 yield 了一個 Task,但是這個 Task 本身不是非同步的(等待的時候不能被打斷,不能將控制權交給IOLoop)。
如果你要非阻塞的listen我能想到的有三種方案- 使用非阻塞的 Redis 客戶端
- 一個 listen 開個線程,拿到數據塞到隊列里去,讓 Tornado 去 get,因為你要非同步 get, 要用 tornado 的 Queue,但是這個Queue 貌似本身不是線程安全的,所以你還要自己加鎖
- 用一個 Redis 連接 SUB 所有消息,然後分發,但是程序會阻塞在 SUBSCRIBE 上,你新來的WebSocket 連接都不能處理 open了
總的來說用個非阻塞的 Redis 客戶端比較靠譜,
比如這個例子:tornado-redis/app.py at master · leporo/tornado-redis · GitHub推薦閱讀: