Tornado 非同步非阻塞淺析
[以下代碼基於 Tornado 3.2.1 版本講解]
[主要目標:講解 gen.coroutine、Future、Runner 之間的關係]這裡是示例運行代碼
#!/usr/bin/pythonn# coding: utf-8n"""nFile: demo.pynDate: 2017-08-28 22:59nDescription: demon"""nimport tornadonfrom tornado import gen, webn@gen.coroutinendef service_method():n raise gen.Return("abc")nclass NoBlockHandler(tornado.web.RequestHandler):n @web.asynchronousn @gen.coroutinen def get(self):n result = yield service_method()n self.write(result)n self.finish()nclass Application(tornado.web.Application):n def __init__(self):n settings = {n "xsrf_cookies": False,n }n handlers = [n (r"/api/noblock", NoBlockHandler),n ]n tornado.web.Application.__init__(self, handlers, **settings)nif __name__ == "__main__":n Application().listen(2345)n tornado.ioloop.IOLoop.instance().start()n
講解從 coroutine 修飾器入手,這個函數實現了簡單的非同步,它通過 generator 中的 yield 語句使函數暫停執行,將中間結果臨時保存,然後再通過 send() 函數將上一次的結果送入函數恢複函數執行。
def coroutine(func):n @functools.wraps(func)n def wrapper(*args, **kwargs):n future = TracebackFuture()n if callback in kwargs:n print("gen.coroutine callback:{}".format(kwargs[callback]))n callback = kwargs.pop(callback)n IOLoop.current().add_future(n future, lambda future: callback(future.result()))n try:n print("gen.coroutine run func:{}".format(func))n result = func(*args, **kwargs)n except (Return, StopIteration) as e:n result = getattr(e, value, None)n except Exception:n future.set_exc_info(sys.exc_info())n return futuren else:n if isinstance(result, types.GeneratorType):n def final_callback(value):n deactivate()n print("gen.coroutine final set_result:{}".format(value))n future.set_result(value)n print("gen.coroutine will Runner.run() result:{}".format(result))n runner = Runner(result, final_callback)n runner.run()n return futuren print("@@ gen.coroutine will set_result and return:{}".format(result))n future.set_result(result)n return futuren return wrappern
首先創建一個Future實例,然後執行被修飾的函數,一般函數返回的是一個生成器對象,接下來交由 Runner 處理,如果函數返回的是 Return, StopIteration 那麼表示函數執行完成將結果放入 future 中並 set_done() 返回。
下面是Future的簡版:
class Future(object):n def __init__(self):n self._result = Nonen self._callbacks = []n def result(self, timeout=None):n self._clear_tb_log()n if self._result is not None:n return self._resultn if self._exc_info is not None:n raise_exc_info(self._exc_info)n self._check_done()n return self._resultn def add_done_callback(self, fn):n if self._done:n fn(self)n else:n self._callbacks.append(fn)n def set_result(self, result):n self._result = resultn self._set_done()n def _set_done(self):n self._done = Truen for cb in self._callbacks:n try:n cb(self)n except Exception:n app_log.exception(Exception in callback %r for %r, cb, self)n self._callbacks = Nonen
在tornado中大多數的非同步操作返回一個Future對象,這裡指的是 Runner 中處理的非同步返回結果。我們可以將該對象抽象成一個佔位對象,它包含很多屬性和函數。一個 Future 對象一般對應這一個非同步操作。當這個對象的非同步操作完成後會通過 set_done() 函數去處理 _callbacks 中的回調函數,這個回調函數是在我們在做修飾定義的時候傳入 coroutine 中的。
下面的代碼是在 coroutine 中定義的,用來添加對非同步操作完成後的回調處理。
if callback in kwargs:n print("gen.coroutine callback:{}".format(kwargs[callback]))n callback = kwargs.pop(callback)n IOLoop.current().add_future(n future, lambda future: callback(future.result()))n
這裡是 IOLoop 中的 add_future 函數,它是來給 future 對象添加回調函數的。
def add_future(self, future, callback):n assert isinstance(future, Future)n callback = stack_context.wrap(callback)n future.add_done_callback(n lambda future: self.add_callback(callback, future))n
然後說 Runner 都做了什麼。在 3.2.1 版本中 Runner 的作用更重要一些。那麼 Runner() 的作用是什麼?
它主要用來控制生成器的執行與終止,將非同步操作的結果 send() 至生成器暫停的地方恢復執行。在生成器嵌套的時候,當 A 中 yield B 的時候,先終止 A 的執行去執行 B,然後當 B 執行結束後將結果 send 至 A 終止的地方繼續執行 A。
class Runner(object):n def __init__(self, gen, final_callback):n self.gen = genn self.final_callback = final_callbackn self.yield_point = _null_yield_pointn self.results = {}n self.running = Falsen self.finished = Falsen def is_ready(self, key):n if key not in self.pending_callbacks:n raise UnknownKeyError("key %r is not pending" % (key,))n return key in self.resultsn def set_result(self, key, result):n self.results[key] = resultn self.run()n def pop_result(self, key):n self.pending_callbacks.remove(key)n return self.results.pop(key)n def run(self):n try:n self.running = Truen while True:n next = self.yield_point.get_result()n self.yield_point = Nonen try:n print("gen.Runner.run() will send(next)")n yielded = self.gen.send(next)n print("gen.Runner.run() send(next) done.")n except (StopIteration, Return) as e:n print("gen.Runner.run() send(next) throw StopIteration or Return done.")n self.finished = Truen self.yield_point = _null_yield_pointn self.final_callback(getattr(e, value, None))n self.final_callback = Nonen returnn if isinstance(yielded, (list, dict)):n yielded = Multi(yielded)n elif isinstance(yielded, Future):n yielded = YieldFuture(yielded)n self.yield_point = yieldedn self.yield_point.start(self)n finally:n self.running = Falsen def result_callback(self, key):n def inner(*args, **kwargs):n if kwargs or len(args) > 1:n result = Arguments(args, kwargs)n elif args:n result = args[0]n else:n result = Nonen self.set_result(key, result)n return wrap(inner)n
實例化 Runner() 的時候將生成器對象和生成器執行結束時的回調函數傳入,然後通過 run() 函數去繼續執行生成器對象。
run() 函數的處理首先包了一層 while 循環,因為在生成器對象中可能包含多個 yield 語句。
yielded = self.gen.send(next)
,在第一次 send() 恢復執行的時候默認傳入 None ,因為函數第一次執行並沒有結果。然後將第二次執行的結果 yielded (返回的是一個 Future 對象),包裝成一個 YieldFuture 對象,然後通過 start() 函數處理:
def start(self, runner):n if not self.future.done():n self.runner = runnern self.key = object()n self.io_loop.add_future(self.future, runner.result_callback(self.key))n else:n self.runner = Nonen self.result = self.future.result()n
首先判斷 future 是否被 set_done(),如果沒有則註冊一系列回調函數,如果完成則保存結果,以供下一次恢復執行時將結果送入生成器。
在 Runner.run() 執行完成後此時的 coroutine 中的 future 對象已經是被 set_done 的,然後直接返回 future 對象,最後被 外層的 @web.asynchronous 修飾器消費。
推薦閱讀: