Python並發學習筆記:從協程到GEVENT(一)
直覺上,最容易實現並發的兩種途徑應該是多進程和多線程。這兩種方法各有利弊。多進程可以充分利用現在處理器本身多核多進程的特點,提高了CPU的使用效率,而且各子進程間相互隔離,一個處理事務的子進程掛了,並不會連帶把其他子進程也掛掉。但其缺點也是不容忽視的。進程開銷很大,各進程間信息共享比較麻煩。多線程的情況則反過來了,各線程間共享同一塊內存空間,共享信息會很簡便,而且線程的實現是在同一進程內,系統開銷比多進程小很多。但使用多線程就要考慮加鎖的問題,鎖的處理會大大增加代碼的複雜程度,同時,共享內存資源容易導致整個進程內的線程被一個線程的崩潰而全部掛斷。具體到Python,多線程還有一個巨大的GIL全局編譯器鎖橫亘在面前。這種粗粒度的全局鎖,事實上導致了Python線程調用是串列的,無論線程間是否在訪問同一內存資源,只要一個線程獲得了CPU時間,其他線程就會被阻塞。
除了多進程和多線程,Python還提供了一種特性可以實現程序某種程度上的「並發」,協程。協程最簡單的例子應該是基於yield的一種實現:
def foo():n print "foo start"n r = n while True:n num = yield rn print numn r = OKnnif __name__ == "__main__":n f = foo()n f.next()n print "begin send"n for x in range(5):n r = f.send(x)n print r n f.close()n
回顯輸出:
>>> nfoo startnbegin sendn0nOKn1nOKn2nOKn3nOKn4nOKn>>> n
這個例子可以看出協程和子程序調用的不同,更像是系統中斷,主程序執行的過程中,子程序開始執行,到達yield時代碼執行中斷,返回到主程序,直到主程序send一個數字給子程序,周而復始。
當然,協程這樣子寫是不能支持並發的,但如果用協程來替代子線程,似乎有不小的優勢。首先協程間的切換本身不涉及線程切換,所有的代碼是運行在一個線程里的,也就不存在同步鎖的問題;其次協程間的切換完全由用戶決定,可以避免子線程長時間掛在一個地方阻塞其他子線程執行的問題。把協程和多進程結合起來,實現並發,提高多核CPU使用效率,這大概是個不錯的選擇。
協程的實現除了使用yield,還有一個非常著名的庫,(可不是庫嗎,Python最大的優點就是各種各樣現成的庫了。)greenlet。
greenlet主要實現了控制Python棧切換的功能,這部分內容比較艱深,得慢慢啃,現在先不說了。但應用起來還是比較簡明的,舉個栗子:
Python 2.7.6 (default, Jun 22 2015, 17:58:13) n[GCC 4.8.2] on linux2nType "help", "copyright", "credits" or "license" for more information.n>>> from greenlet import greenletn>>> def foo1():n... print "foo1.1"n... gr2.switch()n... print "foo1.2"n... n>>> def foo2():n... print "foo2.1"n... gr1.switch()n... print "foo2.2"n... n>>> gr1 = greenlet(foo1)n>>> gr2 = greenlet(foo2)n>>> gr1.switch()nfoo1.1nfoo2.1nfoo1.2n>>> n
總結一下的話,協程並不是真正意義上的並發,在我看來是一種手動觸發的代碼段轉移執行,用來規避阻塞操作。這樣,當代碼運行到一個地方IO阻塞了,那就調用另一個代碼段執行別的任務去,等到這邊阻塞結束了,再重新回來執行。這種切換完全在於程序員自己的安排。
說回到greenlet的話,它其實提供了大量的介面,但這裡就先不提了,因為還有一個對它進行了封裝,對Web開發更好用的庫,gevent。
gevent是基於greenlet封裝的一個網路同步庫,裡面有好多好東西,效率又高又好用。下面先舉幾個例子,顯示一下它的基本運行過程:
Python 2.7.6 (default, Jun 22 2015, 17:58:13) n[GCC 4.8.2] on linux2nType "help", "copyright", "credits" or "license" for more information.n>>> n>>> n>>> import geventn>>> def foo():n... print "foo is runing"n... gevent.sleep(0)n... print "foo is ending"n... n>>> def bar():n... print "bar is runing"n... gevent.sleep(0)n... print "bar is ending"n... n>>> gevent.joinall([gevent.spawn(foo), gevent.spawn(bar)])nfoo is runingnbar is runingnfoo is endingnbar is endingn[<Greenlet at 0x7f8100f71c30>, <Greenlet at 0x7f8100f71550>]n
第一次看到這個例子的時候感覺還是很amazing的。當foo進入阻塞狀態的時候,gevent自動調用bar,而foo阻塞結束後又被重新在中斷的位置繼續向下執行,執行結束後bar繼續執行。這樣,在不直接進行switch操作的前提下,gevent通過對事件的監控,自動對各協程的執行進行調度。
再補充一個例子,顯示一下gevent下代碼執行的「非同步性」,例子里會有兩個函數,一個是我們正常情況下實現的同步調用,另一個是使用了gevent的「非同步」調用,看一下它們的回顯有什麼不同。
#!venv/bin/pythonnnimport geventnimport randomnndef task(num):n gevent.sleep(random.randint(0, 2)*0.01)n print "task %d is done!" % numnndef synchronous():n for i in xrange(10):n task(i)nndef asynchronous():n threads = [gevent.spawn(task, i) for i in xrange(10)]n gevent.joinall(threads)nnif __name__ == "__main__":n synchronous()n print "=" * 20n asynchronous()n
執行後的結果是:
task 0 is done!ntask 1 is done!ntask 2 is done!ntask 3 is done!ntask 4 is done!ntask 5 is done!ntask 6 is done!ntask 7 is done!ntask 8 is done!ntask 9 is done!n====================ntask 1 is done!ntask 4 is done!ntask 7 is done!ntask 8 is done!ntask 9 is done!ntask 0 is done!ntask 2 is done!ntask 6 is done!ntask 3 is done!ntask 5 is done!n
雖然知道,內部實現里是把各個協程分別調用,但從實際效果上看已經有了非同步的影子。
這裡還有兩個問題需要指出,一個是當gevent出錯的時候,比如不能正常yield,一個是當協程執行超時的時候。
出錯可以在主程序里對gevent的錯誤信號進行處理:
gevent.signal(signal.SIGQUIT, gevent.shutdown)n
這樣,當錯誤信號發生的時候,就會調用gevent的shutdown方法關閉gevent進程。
超時也是一個大問題,畢竟協程還是在一個線程里執行的,無論怎麼「閃展騰挪」。這裡gevent提供了一個Timeout定時器類來處理。用法如下:
import geventnfrom gevent import Timeoutnnseconds = 10nntimeout = Timeout(seconds)ntimeout.start()nndef wait():n gevent.sleep(10)nntry:n gevent.spawn(wait).join()nexcept Timeout:n print(Could not complete)n
注意一點,這個定時器不會被重置,上面的代碼里如果把wait方法里休眠的時間改為5,並執行兩遍,定時器還是會超時的。
在協程間傳遞消息的話,gevent提供了一個數據結構,event,它可以非同步的在各協程間傳遞,而且可以進行一對多的對話。舉個栗子(今天舉了好多栗子啊 ==!):
Python 2.7.6 (default, Jun 22 2015, 17:58:13) n[GCC 4.8.2] on linux2nType "help", "copyright", "credits" or "license" for more information.n>>> import geventn>>> from gevent.event import Eventn>>> evt = Event()n>>> n>>> def server():n... print "i am preparing data."n... gevent.sleep(3)n... evt.set()n... n>>> def client():n... print "client"n... evt.wait()n... print "get data"n... n>>> gevent.joinall([gevent.spawn(server), gevent.spawn(client), gevent.spawn(client)])ni am preparing data.nclientnclientnget datanget datan[<Greenlet at 0x7f84b46caeb0>, <Greenlet at 0x7f84b46ca370>, <Greenlet at 0x7f84b46ca550>]n>>> n
還可以在event里傳遞參數,
Python 2.7.6 (default, Jun 22 2015, 17:58:13) n[GCC 4.8.2] on linux2nType "help", "copyright", "credits" or "license" for more information.n>>> import geventn>>> from gevent.event import AsyncResultn>>> a = AsyncResult()n>>> n>>> def server():n... print "i am preparing data."n... gevent.sleep(2)n... a.set("Hello,world.")n... n>>> def client():n... print "client"n... print a.get()n... n>>> gevent.joinall([gevent.spawn(server), gevent.spawn(client)])ni am preparing data.nclientnHello,world.n[<Greenlet at 0x7f04d3250c30>, <Greenlet at 0x7f04d3250d70>]n>>> n
除了event,還有一種數據結構可以用來在協程間通信,Queue隊列。協程可以通過Queue的成員方法put把數據壓入隊列,通過get方法取出。注意,這兩種方法是阻塞的,如果通過初始化Queue時指定maxsize的方法,規定了Queue實例最大的容量,那麼當空餘容量為0的時候put操作就會被阻塞,而當隊列空的時候,get操作會返回一個Empty異常。參考下面的例子和回顯:
import gevent
from gevent.queue import Queue, Emptytasks = Queue(maxsize=3)def worker(n):try:
while True: task = tasks.get(timeout=1) # decrements queue size by 1print(Worker %s got task %s % (n, task)) gevent.sleep(0)except Empty:print(Quitting time!)def boss():""" Boss will wait to hand out work until a individual worker isfree since the maxsize of the task queue is 3.
"""for i in xrange(1,10): tasks.put(i)print(Assigned all work in iteration 1)for i in xrange(10,20): tasks.put(i)print(Assigned all work in iteration 2)gevent.joinall([ gevent.spawn(boss),gevent.spawn(worker, steve),
gevent.spawn(worker, john), gevent.spawn(worker, bob),])
Worker steve got task 1
Worker john got task 2Worker bob got task 3Worker steve got task 4Worker bob got task 5Worker john got task 6Assigned all work in iteration 1
Worker steve got task 7Worker john got task 8Worker bob got task 9Worker steve got task 10Worker bob got task 11Worker john got task 12Worker steve got task 13Worker john got task 14Worker bob got task 15Worker steve got task 16
Worker bob got task 17Worker john got task 18Assigned all work in iteration 2Worker steve got task 19Quitting time!Quitting time!
當然,get和put還有不阻塞的方式,get_nowait和put_nowait,這兩個方法在無法成功執行的時候回分別拋出Empty異常和Full異常。
推薦閱讀: