標籤:

VLCP協程框架簡介

最近我在github上開源的Python SDN控制器項目vlcp(github.com/hubo1016/vlc)經歷0.9,0.10,0.11,0.12版本之後終於迎來了1.0版本,初步具備了實用性。最近在整理文檔的同時,想寫一系列文章簡單介紹一下這個開源項目,順便求一些star(2個也太慘了……)

雖然本職工作是SDN控制器,由於內嵌了一個獨立設計的協程框架,作為非同步編程框架來用也是很不錯的,實際上我也用它搭建Web伺服器,在公司內部跑得很歡。這篇文章主要介紹這個基於協程的非同步編程框架,後續的文章中詳細介紹與SDN有關的部署和配置方法。

第一篇文章,不想講太多的技術問題,邀請各種對Python有一些了解的朋友們,無論是資深開發者,還是初學者,都可以簡單上手玩一下,讓我們回到初心,想起在Python的Interactive Shell上面測試各種代碼的愉悅心情,以下主要都是一些可以跟著做的例子,隨著例子講解一些框架中的概念。

協程概述

現在在Python中說到協程,其實指的不一定是同一個東西,而且至少有三種不同的含義:

  1. 基於greenlet的協程,使用C實現協程調度
  2. Python 3.5開始支持的原生async方法(內部使用生成器機制)
  3. 基於生成器(generator)的協程,如asyncio(Python3.5之前)和tornado

VLCP使用的是第三種,也就是說既不依賴於greenlet,又不要求使用Python3。跟tornado相比,VLCP提供了一種比Future回調更加簡單易用的編程模型——甚至在許多時候比Stackless Python(以及Go語言)的channel模型更加簡單,它是專門為了SDN控制器這樣有大量並發過程同時進行的任務設計的。除了最核心的調度器以外,幾乎所有的功能代碼本身也都是基於協程模型編寫的。

安裝

閑話不多說,直接進入動手玩的階段。有Linux的環境是最好的,因為主要的測試在Linux上進行,不過Windows也可以兼容,所以這次的實驗兩者都可以,不過在Windows下你可能得小心一些並且忍受許多在Linux上不會出現的bug,比如說Ctrl+C目前不是很好使。當然你得先裝好Python。

vlcp支持的Python版本很全,包括Python 2.6+, Python 3.4+以及PyPy,基本上涵蓋了你有可能使用到的任何環境。一般對於開發來說Python 3是最友好的,對於運行效率來說則是PyPy略勝一籌,對於懶得折騰的人來說,各大Linux發行版自帶的Python,比如Centos 7自帶的是Python 2.7,Centos 6.5+自帶的是Python 2.6,也都可以用。

安裝vlcp是比較簡單的, 最簡單的方法莫過於使用pip:

pip install vlcpn

如果不想安裝到系統路徑的話,可以使用virtualenv、或者直接從github上下載源碼並解壓到某個目錄中都是可以的(要使用pip安裝nstruct依賴庫,這也是我的開源項目之一(github.com/hubo1016/nam),以後有時間介紹)

啟動控制台

安裝好之後, 用以下命令啟動vlcp shell

python -m vlcp.service.debugging.consolen

這是vlcp內置的調試控制台模塊,除了多打了一些信息出來以外,跟Python原生的控制台看上去沒有多大區別,實則暗藏玄機。在控制台的背後,有一個獨立的線程在運行vlcp調度器,這樣即便前台看上去是普通的interactive shell,後台的協程運行也是不受影響的。這個控制台模塊是複雜程序調試的利器,它可以載入到任意的基於vlcp框架的程序中,隨時查看當前的運行狀態,追蹤程序邏輯,以至於動態添加新的程序邏輯、中斷執行過程甚至進入pdb等等,對於不應該直接以控制台方式啟動的程序,則支持用telnet協議登入來使用控制台。今天要用的是只是其中一些微小的功能。

第一個協程

我們首先看一下如何在vlcp框架中書寫一個合法的協程。最基礎的來說,協程必須是一個生成器,所以一定要有yield語句。於是我們寫出第一個協程方法:

如果你用的是Python 3,請注意自行將所有的print替換成print()函數。後面不再重複這句話。

def first_routine():n print Hello world!n if False:n yieldn

按照宇宙慣例我們使用爛俗的Hello world作為開始。後面的if False: yield是為了讓這個方法變成一個生成器。現在我們來啟動這個協程:

>>> subroutine(first_routine())n<routine <generator object first_routine at 0x02FAD468> at 0x0000000002FAD468>Hello world!n

Hello world!前面的文本是subroutine方法的返回值——一個協程對象,如果你使用r = subroutine(first_routine()) 就不會列印到控制台中了。subroutine是vlcp控制台的內置方法,有關它的詳細介紹放到更後面一些的地方,目前我們只要記得它的作用是啟動一個協程就好。注意subroutine(first_routine())而不是subroutine(first_routine),傳入的是生成器對象而不是生成器函數,關於生成器的知識這裡就不詳細介紹了,不太了解的可以簡單查一下資料學習一個。

計時循環

前面的協程看上去跟普通的Python程序沒有任何區別,現在我們來加入一個帶延遲的循環,來證明它的確是並行運行的:

def routine_loop():n for i in range(0,10):n print Hello world!,in for m in container.waitWithTimeout(1):n yield mn

啟動這個協程:

>>> subroutine(routine_loop())n<routine <generator object routine_loop at 0x03B1D440> at 0x0000000003B1D440>Hello world! 0nn>>> Hello world! 1nHello world! 2nHello world! 3nHello world! 4nHello world! 5nHello world! 6nHello world! 7nHello world! 8nHello world! 9n

這個協程每一秒會輸出一行文本。

這裡要重點解釋一下container的概念。大部分情況下——雖然不是必須,每個協程都有一個與它相關聯的container,它是RoutineContainer類的一個實例,它其實沒有什麼複雜的作用,只是用來保存或傳遞協程運行過程中需要的一些臨時值。通過關聯到RoutineContainer對象,不同的協程之間可以比較簡單地共享數據。多個協程可以使用相同的RoutineContainer,也可以使用不同的,比如說剛才的協程,移到一個獨立的RoutineContainer當中也是可以的:

from vlcp.event import RoutineContainernnclass MyContainer(RoutineContainer):n def routine_loop(self):n for i in range(0,10):n print Hello world!,in for m in self.waitWithTimeout(1):n yield mnnc = MyContainer(server.scheduler)nndef start_routine():n c.subroutine(c.routine_loop())n if False:n yieldnnsubroutine(start_routine())n

我們在另一個協程中調用c.subroutine()方法來啟動一個新的協程,這個協程跟一個獨立的RoutineContainer關聯,實際上它也是這個子類的一個方法。注意其中等待1秒的步驟使用了self來替代之前的container,因為與協程關聯的container發生了變化。

vlcp控制台中的內置變數container是一個預先創建好的RoutineContainer對象,它是控制台中默認的container。subroutine通過以一種「安全」的方式調用container.subroutine來啟動協程。通過控制台內置的subroutine方法啟動的協程會關聯到container變數。

引入container概念的主要原因是Python 2不支持yield from,引入container作為一個臨時存儲對象可以簡化許多中間過程。不過它順便也可以幫助你組織程序,將協程進行合理的分組。

調用過程與啟動協程

接下來,如果我的一個協程需要調用另一個協程過程呢?這裡說的調用,是指讓另一個協程方法的邏輯在本協程當中執行,類似於方法調用。

def routine_call_1():n print routine 1 startn if False:n yieldn print routine 1 endnndef routine_call_2():n print routine 2 startn for m in routine_call_1():n yield mn print routine 2 endn

執行subroutine(routine_call_2())來查看效果。我們可以看到,調用另一個協程方法,就是通過for...yield循環執行這個方法。在Python 3當中也可以使用yield from,不過for...yield是Python2/3兼容的寫法。調用一個協程過程跟調用普通過程差別並不大,只要牢記需要使用for...yield的語法就行了。如果忘記的話……就多記幾次。

另一個概念是從協程中啟動另一個協程,新啟動的協程會和當前協程並行執行。啟動新的協程也非常簡單:

def routine_loop(mark):n for i in range(0,10):n print mark,in for m in container.waitWithTimeout(1):n yield mnndef routine_start_test():n container.subroutine(routine_loop(subroutine))n for m in routine_loop(call):n yield mn

執行結果:

>>> subroutine(routine_start_test())n<routine <generator object routine_start_test at 0x031CBE40> at 0x00000000031CBE40>calln 0n>>> subroutine 0ncall 1nsubroutine 1ncall 2nsubroutine 2ncall 3nsubroutine 3ncall 4nsubroutine 4ncall 5nsubroutine 5ncall 6nsubroutine 6ncall 7nsubroutine 7ncall 8nsubroutine 8ncall 9nsubroutine 9n

可以看到,兩個協程的確是並行執行的。

container.subroutine的參數(注意不是控制台中的subroutine,而是container的方法)可以微調分支出去的協程與當前協程的執行順序。協程並不是真正的並行執行,準確來說其實是「按順序執行」的。在默認參數下,subroutine的協程是延遲啟動的,只有在當前協程進入下一個yield從而交還執行權的時候,新的協程才會開始執行;修改第二個參數asyncStart的值可以修改這個行為,可以試一下:

def routine_loop(mark):n for i in range(0,10):n print mark,in for m in container.waitWithTimeout(1):n yield mnndef routine_start_test():n container.subroutine(routine_loop(subroutine), False)n for m in routine_loop(call):n yield mn

結果:

>>> subroutine(routine_start_test())n<routine <generator object routine_start_test at 0x031CBE40> at 0x00000000031CBE40>subroutinen0n>>> call 0nsubroutine 1ncall 1nsubroutine 2ncall 2nsubroutine 3ncall 3nsubroutine 4ncall 4nsubroutine 5ncall 5nsubroutine 6ncall 6nsubroutine 7ncall 7nsubroutine 8ncall 8nsubroutine 9ncall 9n

subroutine(xxx, False)的執行策略略有不同,它會現在當前協程中執行新協程直到新協程暫停,然後恢復原協程的執行,這樣看上去就是新協程在subroutine執行時立即開始執行了。如果新協程一次暫停都沒有,甚至可以在subroutine結束時立即執行結束。如果新協程在第一次暫停前拋出了異常,這個異常會被subroutine的調用方接收到,可能導致意想不到的問題。大部分情況下默認參數會更安全一些。

異常處理

協程模型最大的好處之一是很方便異常的處理。在協程中可以正常使用try...except...finally來處理運行中的異常,如果異常未被處理而被調度器接收,調度器會列印出出現異常的協程名稱、異常的堆棧信息日誌和導致異常出現的事件(後面提到),方便查找問題。一個協程異常退出不會影響其他協程,最大程度保障了系統運行的穩定性。注意except子句中可以使用yield,但finally中不行。通過subroutine創建的新協程是獨立的,除了前面提到的asyncStart = False的特例以外,新協程中拋出的異常不會影響到啟動它的協程。

def exception_routine_1(tag):n raise ValueError(exception in routine 1, tag = + repr(tag))n if False:n yieldnndef exception_routine_2():n container.subroutine(exception_routine_1(subroutine))n try:n for m in exception_routine_1(call):n yield mn except ValueError as exc:n print Exception caught in routine 2, message: + str(exc)nnsubroutine(exception_routine_2())n

協程通信——事件模型

vlcp的協程通信機制基於一種Pub/Sub的Event,關於這個詳細機制我計劃以後專門寫一篇文章來討論,不過今天可以先看一眼大概。定義一個自定義的Event類型只需要創建vlcp.event.Event的一個子類,使用vlcp.event.withIndices註解添加索引。協程可以使用container.waitForSend方法來發送事件,用event.createMatcher來創建匹配器,用yield語句返回一個或多個匹配器來等待一個新的事件。一個事件在發送後可以被多個協程接收並處理。

from vlcp.event import Event, withIndicesn@withIndices(type)nclass MyEvent(Event):n passnndef routine_1():n # A matcher without index specified matches all this type of eventsn matcher = MyEvent.createMatcher()n while True:n yield (matcher,)n print routine 1, container.event.type, container.event.messagenndef routine_2():n # Filtern # A matcher with index specified matches onlyn # events with specified index valuen matcher1 = MyEvent.createMatcher(type = 1)n matcher2 = MyEvent.createMatcher(type = 2)n while True:n # A yield with multiple matchers returns when an event matchesn # any of the matchersn yield (matcher1, matcher2)n # The matched event is stored at container.event,n # And the matcher that matched this event is stored at container.matchern # Use is to test which matcher has matched the eventn if container.matcher is matcher1:n print routine 2, 1, container.event.messagen else:n print routine 2, 2, container.event.messagenndef routine_sender():n # use asyncStart = False to make sure they are waiting for eventsn r1 = container.subroutine(routine_1(), False)n r2 = container.subroutine(routine_2(), False)n try:n # Extra data can be stored in the event with keyword argumentsn for m in container.waitForSend(MyEvent(type = 1, message = event1)):n yield mn for m in container.waitWithTimeout(1):n yield mn for m in container.waitForSend(MyEvent(type = 2, message = event2)):n yield mn for m in container.waitWithTimeout(1):n yield mn for m in container.waitForSend(MyEvent(type = 3, message = event3)):n yield mn for m in container.waitWithTimeout(1):n yield mn for m in container.waitForSend(MyEvent(type = 1, message = event1)):n yield mn for m in container.waitWithTimeout(1):n yield mn finally:n # Use terminate to stop a routinen container.terminate(r1)n # or use close()n r2.close()nnsubroutine(routine_sender())n

運行結果:

>>> r = subroutine(routine_sender())n>>> routine 2 1 event1nroutine 1 1 event1nroutine 2 2 event2nroutine 1 2 event2nroutine 1 3 event3nroutine 2 1 event1nroutine 1 1 event1n

高級介面演示

講了太多枯燥的內容,現在我們來玩一下比較高級的應用,演示一下HTTP Client的使用。內置的WebClient類可以用於HTTP/HTTPS的訪問,我們來簡單用WebClient抓一下百度的首頁:

from vlcp.utils.webclient import WebClientnwc = WebClient(True)nndef url_load():n for m in wc.urlgetcontent(container, http://www.baidu.com/):n yield mn print repr(container.retvalue)n

執行結果就不貼了……

下一篇文章中我們會接著這次的實驗方式,在vlcp控制台中創建一個非常簡單的WebServer出來,然後簡單講解vlcp的配置管理與模塊管理功能。敬請期待。


推薦閱讀:

Python練習第五題,找出HTML里的正文
Kaggle HousePrice : LB 0.11666(前15%), 用搭積木的方式(2.實踐-特徵工程部分)
【談談智商稅】我究竟該怎麼學Python
從零到搭建一個能提供API介面的網站,過程是怎樣的?
一個網站用兩種或以上的後端編程語言會出現什麼情況?為什麼?

TAG:Python | SDN | 协程 |