深入python協程的實現,帶你一層一層揭開協程的神秘面紗!
首圖來源 @方正
就在三天前,如果你問我協程是什麼,我可能只會籠統的回答,協程是一種非同步執行方式。使用python yield來實現,如果你還要深入詢問,怎麼實現,底層到底發生了什麼,我可能就懵逼了。但是今天,在閱讀完tornado部分源碼,並自己實現了一套事件循環之後。我就敢大聲的喊出那麼一句,我好像懂了!夠自信么?這是應該的。畢竟就在前天夜裡思考如何實現基於yield from的事件循環這件事的時候,我失眠到下半夜,裝個小逼也算是情有可原吧。
最早知道協程這個名詞大約是在一年半前,我剛學了不到1年python,在一次面試中被面試官問到,當時我的表情是這樣的:
後來去查了一下百度百科:
協程與子常式一樣,協程(coroutine)也是一種程序組件。相對子常式而言,協程更為一般和靈活,但在實踐中使用沒有子常式那樣廣泛。協程源自 Simula 和 Modula-2 語言,但也有其他語言支持。
看完之後,我的表情是這樣的:
用專業辭彙解釋專業辭彙,相當於沒說,百度百科一慣如此。
再後來,因為工作圈子的問題,就沒有再了解過協程相關的知識。在我之後寫爬蟲的這一年多時間裡,我查看scrapy源碼時試圖查看twisted的源碼,但感覺太難直接放棄。這可能是我最早的一次嘗試查看非同步代碼。
到了2017年,聽說了asyncio這個包。我認為我這個人有時候很不理性,總以為自己很屌能似的在對框架整體都沒啥概念時就去查看別人寫的源碼,所以撞南牆也是我自己活該。asyncio貌似部分代碼使用c實現,對於我這種半路出家的盧瑟遇到c相關的東西直接gg。所以再次放棄。
如果非要說我的協程啟蒙教育是從什麼時候開始的,我會說應該是在我讀了<<流暢的python>>這本書之後。裡面有一章關於協程的講解,隨後還講解了asyncio,受益匪淺。
當時讓我明白了什麼叫委派生成器,為什麼一堆生成器套在一起就算是一個協程應用了。但是我始終存在這幾個盲點,關於sleep以及真正的io阻塞是怎麼實現的。對於io這一塊我知道底層肯定是使用了io多路復用。但是究竟是怎麼藉助一個事件循環讓其跑起來的呢?我百撕不得騎姐。
之後,也就是前兩天吧,我閑的蛋疼,不知道觸碰到哪根神經了,突然想看tornado的源碼。這個大名鼎鼎的web非同步框架,據說是使用了協程實現。等等,為什麼用「據說」二字?那是因為在此之前我的電腦從來沒有運行過pip install tornado。作為一個爬蟲專(cai)家(niao),我不否認我之前看過很多框架的源碼,甚至包括一些web框架。什麼django,bottle,flask,但是因為平時工作中我很少寫web,所以看過就忘了。實踐是檢驗真理的唯一標準,在web這方面,我最缺乏的就是實踐。
沒有使用tornado寫過一行代碼的我,一上手就直接開始看tonado源碼。
下面先簡單介紹一下tornado裡面的協程實現。這一部分不想看的可以不看,直接跳到重點,tornado為了兼容多個python版本,實現起來可能比較複雜。
事件循環
tornado中的事件循環,本質上使用io多路復用實現。io多路復用的最主要作用就是為了喚醒事件循環。事件循環可以通過監聽文件描述符來喚醒,所以tornado就自己創建一個管道。監聽管道輸出(READ),並設置了超時時間,此時epoll會在超時時間內掛起,如果有callback加入。則通過向管道寫入任意位元組喚醒,這樣就相當於實現了一個可喚醒的阻塞。喚醒之後,會依次執行callback。然後執行timeouts中的callback,timeouts使用堆來保證時間最近的在最上面。
以一個簡單的調用為例來說明
import tornado.ioloopfrom tornado.gen import coroutinefrom tornado.concurrent import Future@coroutinedef asyn_sum(a, b): print("begin calculate:sum %d+%d"%(a,b)) future = Future() def callback(a, b): print("calculating the sum of %d+%d:"%(a,b)) future.set_result(a+b) tornado.ioloop.IOLoop.instance().add_callback(callback, a, b) result = yield future print("after yielded") print("the %d+%d=%d"%(a, b, result))def main(): future = asyn_sum(2,3) tornado.ioloop.IOLoop.instance().start()main()
程序開始,定義了一個asyn_sum協程,在main函數中第一行調用了該協程
同時tornado.gen.coroutine中調用asyn_sum生成器,獲取第一個yield值也就是個future,並創建了一個Runner對象
try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) # 這一行所示 if stack_context._state.contexts is not orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( stack_context inconsistency (probably caused by yield within a "with StackContext" block)))except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e))except Exception: future.set_exc_info(sys.exc_info())else: _futures_to_runners[future] = Runner(result, future, yielded)
Runner通過__init__調用了其方法handle_yield,並檢查future的完成情況,很明顯在第一個代碼塊中`tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)`剛被加入事件循環,此時事件循環還未啟動,所以callback中的`future.set_result(a+b)`並未被調用,因此future並未done。
# handle_yieldif not self.future.done() or self.future is moment: def inner(f): # Break a reference cycle to speed GC. f = None # noqa self.run() self.io_loop.add_future( self.future, inner) return Falsereturn True
讓我們把注意力移到`self.io_loop.add_future(self.future, inner)`這麼代碼中,這行代碼將inner在future完成之後加入到了事件循環的callbacks中,由下面的代碼可以的看出
def add_future(self, future, callback): """Schedules a callback on the ``IOLoop`` when the given `.Future` is finished. The callback is invoked with one argument, the `.Future`. """ assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
future在開啟事件循環後馬上就會完成,因此,隨後就會調用`self.add_callback(callback, future)`這段代碼將inner加入事件循環,繼而調用了self.run()
run會獲取future的結果,同時發送結果給協程里的生成器
orig_stack_contexts = stack_context._state.contextsexc_info = Nonetry: value = future.result()except Exception: self.had_exception = True exc_info = sys.exc_info()future = Noneif exc_info is not None: try: yielded = self.gen.throw(*exc_info) finally: # Break up a reference to itself # for faster GC on CPython. exc_info = Noneelse: yielded = self.gen.send(value)
相應的,結果就被賦值給了main中的`result = yield future`,繼而結束了整個協程的調用。
future對象
future對象可以當成一次非同步調用的代理人,非同步調用在創建後,加入事件循環中。隨著事件循環進行,非同步調用被執行了,執行一結束,代理人馬上獲取結果,並置done=True,同時代理人會將非同步調用後的全部回調函數執行,其中的一個回調函數就包括Runner.run,其作用是將非同步調用結果賦值給yield左邊的變數,同時繼續執行接下來的代碼,直到下一個yield出現。
對嵌套協程的調用
首先要清楚子協程返回的也一個future對象,因為這個future還沒有完成,返回對於yield左邊的變數會阻塞住。當子協程完成時,通過raise Return或者StopIteration的方式通過一個異常來將結果值傳遞出來,並為result_future也就是子協程的future進行set_result,子協程future完成後,父協程繼續。
總結
對於`a = yield b`這種結構。b是一個Future,那麼a的最終結果值為Future.result(),對於b,若是一個生成器,將其變成協程的方法只是實現了操控生成器在返回一個Future對象的同時,將生成器中每次yield都變成非同步執行。
以上是tornado整個事件循環以及調用的大致流程,因為tornado被要求兼容python2,而python2沒有實現yield from,所以tornado的機制和asyncio的實現機制會不相同。
所以我就想,我能否使用yield from 實現一個事件循環繼而實現一個協程呢。
重點來了
我對tornado的事件循環以及Future對象進行了一些改造,基於python3.3之後的yield from 實現了一個簡單的事件循環及相關方法。算是一個小的協程demo框架吧。
代碼參見:
https://github.com/ShichaoMa/toolkit/blob/master/toolkit/event_loop.py設計思路
EventLoop和torando的IOLoop一樣,使用IO多路復用來阻塞循環,使用一個Waker來喚醒EventLoop,這點我是照抄的torando,不同之處在於EventLoop中沒有回調函數列表,而是擁有一個協程字典。
協程字典是由一堆協程對象作為key組成的。協程類繼承於Torando::Future,本質上協程對象也是一個Future對象,不過更進一步的是,他同時還是一個生成器代理。key對應的value為每次yield返回的值。
協程類是通過一個裝飾器創建的,代碼如下:
def coroutine(func): def __init__(self, *args, **kwargs): super(self.__class__, self).__init__() self.gen = func(*args, **kwargs) def send(self, result): try: return self.gen.send(result) except (GeneratorExit, StopIteration) as e: self.set_result(e.args[0]) raise def throw(self, typ, val=None, tb=None): return self.gen.throw(typ, val=None, tb=None) def __iter__(self): return self.gen def __await__(self): yield from self cls = type(func.__name__, (Future, Coroutine, Iterable), {"__init__": __init__, "send": send, "throw": throw, "__iter__": __iter__, "__await__": __await__}) return cls
和當IOLoop一樣,EventLoop開始事件循環後,每當有協程對象加入到事件循環中,都會被喚醒,執行循環,循環會處理監聽到的文件描述符,調用其handler,處理timout對象。接下來處理的協程字典中的協程是IOLoop所沒有的操作。
處理協程的方法如下:
def process_coroutine(self, coroutine, yielded): while True: try: # 如果當前yield值為Future對象,則判斷Future是否完成,進而調用send方法髮結果,否則中斷本協程的進行。 # 如果當前yield值不是Future對象, 直接send當前值。 # 如果期間發生協程運行完畢,則刪除該協程。 if isinstance(yielded, Future): if yielded.done(): yielded = self.coroutines[coroutine] = coroutine.send(yielded.result()) else: break else: yielded = self.coroutines[coroutine] = coroutine.send(yielded) except (GeneratorExit, StopIteration): del self.coroutines[coroutine] break
為什麼要這麼實現呢?
假設我們現在擁有一個簡單的協程
demo0
# 通過coroutine裝飾器,該協程會被變成一個協程子類。@coroutinedef normal(): a = yield 1 print("Get a: %s" % a) b = yield 2 print("Get b: %s" % b) c = yield 3 print("Get c: %s" % c) return ccoroutine0 = normal() # 創建一個協程對象loop = EventLoop() # 創建事件循環loop.run_until_complete(coroutine0)# 開啟事件循環直到完成print(coroutine0.result()) # 列印輸出
以上代碼大致會經過如下的操作:
- 創建一個協程對象
- 創建事件循環
- 將協程對象添加到協程字典中
- 開啟事件循環
- 事件循環運行到process_coroutine,此時yielded在process_coroutine中由倒數第4行代碼依次變成1, 2, 3(因為協程中的yield返回的不是future類型),並通過send方法賦值給了a, b, c。直到最後拋出StopIteration。並刪除了該協程
- 運行結束result方法可以調用了返回return值
通過這個簡單的協程我們大致了解了整個程序的運行流程。但這種協程並沒有什麼卵用,而且把一件簡單的事情弄的非常複雜。接下來再更進一步介紹一個相對複雜的協程。
我們先看一個阻塞函數:
def sleep(seconds): future = Future() def callback(): future.set_result(None) EventLoop().call_at(time.time() + seconds, callback) yield future
這個函數和tornado的sleep函數差不多,函數創建了一個future對象,並在EventLoop中添加了一個Timeout對象,Timeout對象會在指定時間後後被事件循環調用其callback。在這裡,被callback之後,future被set_result,也就是說Future.done()變為了True(記住這一點,後面會用到,很重要)。
下面再寫一個複雜一點的的協程
demo1
@coroutinedef sum(a, b): print("Sum start. %s + %s" % (a, b)) yield from sleep(1) result = yield a + b print("Sum stop. %s + %s" % (a, b)) return resultcoroutine1 = sum(1, 2) # 創建一個協程對象loop = EventLoop() # 創建事件循環loop.run_until_complete(coroutine1)# 開啟事件循環直到完成print(coroutine1.result()) # 列印輸出
以上代碼大致會經過如下的操作:
- 創建一個協程對象
- 創建事件循環
- 將協程對象添加到協程字典中
- 開啟事件循環
- 事件循環運行到process_coroutine,sleep返回一個Future對象,並且調用其done方法發現沒有完成。於是跳出循環,回到事件循環。
- 事件循環會獲取timeouts中所有Timeout對象的最小延遲時長並做為超時時長掛起。這時候整個世界都停止了。
- 等超時之後,處理timeouts中Timeout對象,發現sleep結束了,調用其callback,future對象這時被置為完成。
- 事件循環繼續前進到達process_coroutine,此時調用future的done方法發現完成了。send future.result(),繼而執行a + b。隨後的情況和第一個例子無異了。
在這個例子中我們定義了一個sleep子協程,並學習到了如何實現阻塞。
再來看第三個更複雜的例子
demo2
@coroutinedef sum(a, b): print("Sum start. %s + %s" % (a, b)) yield from sleep(1) result = yield a + b print("Sum stop. %s + %s" % (a, b)) return result@coroutinedef multi(a, b): print("Multi start. %s x %s" % (a, b)) yield from sleep(2) result = yield a * b print("Multi stop. %s x %s" % (a, b)) return result@coroutinedef aaddbthenmutilc(a, b, c): sum_result = yield from sum(a, b) multi_result = yield from multi(sum_result, c) return multi_resultcoroutine2 = aaddbthenmutilc() # 創建一個協程對象loop = EventLoop() # 創建事件循環loop.run_until_complete(coroutine2)# 開啟事件循環直到完成print(coroutine2.result()) # 列印輸出
剛才說了協程也是一個future,但協程本身還是一個生成器。所以當我們返回協程時,使用yield from 代替yield。
當使用yield from 時生成器和子生器之間就建立起了一個長長的管道,子生器生成的值可以直接返回,無論鏈接多少個生成器,就好像直接調用一個生成器一樣。。
在這裡coroutine2中調用了sum子協程。一但其結束,sum的值會被賦值給sum_result,coroutine2繼續執行multi,直到結束。
寫到現在,可能有同學會問了,如果是真正的IO阻塞,怎麼處理。比如socket。別急,下面馬上給出第4個例子。
在此之前,我又實現了一個類似於sleep的阻塞函數
def get_buffer(socket): future = Future() def callback(fd_obj, events): future.set_result(fd_obj.recv(1024)) EventLoop().add_handler(socket, callback, EventLoop.READ) buffer = yield future return buffer
這個函數是專門用來處理socket讀產生的阻塞的,創建一個future,並創建一個回調函數,添加到事件循環中,EventLoop中有一組對象被稱為handlers,是用來處理文件描述符的。這個和tornado的IOLoop保持一致。通過add_handler可以添加。
在這裡,socket被監聽了READ事件。
demo3
@coroutinedef socket_coroutine(): print("socket_coroutine start. ") import socket from toolkit.event_loop import get_buffer client = socket.socket() client.connect(("", 1234)) buffer = yield from get_buffer(client) client.close() print("socket_coroutine stop. ") return buffercoroutine3 = socket_coroutine() # 創建一個協程對象loop = EventLoop() # 創建事件循環loop.run_until_complete(coroutine3)# 開啟事件循環直到完成print(coroutine3.result()) # 列印輸出
同時我開啟一個server
In [3]: def serve(): ...: server = socket.socket() ...: server.bind(("", 1234)) ...: server.listen(0) ...: client, addr = server.accept() ...: import time ...: time.sleep(5) ...: client.send(b"hello world") ...: client.close() ...: server.close() ...:In [4]: serve()
這個例子和sleep差不多,不同之處在於sleep讓事件循環停止了一段時間,而通過IO多路復用則是在被監聽的文件描述符發生讀事件時激活了事件循環。之後的步驟和demo1一樣。
下面還有最後一個例子,就是多個協程一起運行。
demo4
不貼代碼了,太長,代碼在此!https://github.com/ShichaoMa/toolkit/blob/master/test/test_event_loop.py
裡面有6個協程。
總結
其實關鍵的思路就在於如何處理協程生成器上,我的想法是對於一個協程,無非就是存在兩種情況,阻塞與不阻塞,阻塞的情況是返回的是future同時done為False,這種情況就像sleep ,get_buffer一樣。剛開始返回的都是沒有完成的future,導致該生成器無法被繼續執行,這個時候process_coroutine方法是會直接停止該生成器的執行,繼續執行下一個生成器,生成timeouts對象和添加事件監聽保證了在規定時間內(timout.callback)或者在事件發生時future被置為done=True。對於沒有阻塞的情況(返回的是非Future或者Future已經完成),生成器會一直執行下去直到結束。
終於寫完了。
希望能給您帶一些編程靈感!這個事件循環的所有代碼都被放在我的開發工具包toolkit裡面了。可在通過 pip install toolkity==1.6.0安裝
ShichaoMa/toolkit如果您喜歡同時方便的話,請Start一下,感謝關注。
推薦閱讀:
※為什麼 sqlmap 源碼看起來那麼費勁?
※Python裝飾器
※python里函數作為返回值如何進行比較?
※你可能不知道的 Python Web 部署方式總結
※Python能做什麼?需要什麼學習前提知識,如果要找工作,還需要學習什麼知識?