python協程簡介
原文是A Curious Course on Coroutines and Concurrency
原作是python cookbook的作者David Beazley,原文和代碼請參考網址
http://www.dabeaz.com/coroutines/Coroutines.pdf
A Curious Course on Coroutines and Concurrency
以下是我對其中第一部分和第二部分的渣翻譯(有一些截取和整理)。
頭腦爆炸目錄:
生成器-協程介紹-一些數據處理-事件處理-混合進程使用協程-作為任務的協程-寫一個多任務操作系統(只翻譯到數據處理為止)
在python2.5裡面,生成器添加了功能來允許協程(PEP-342),主要是一個新的send()方法。「所以你現在可以向生成器發送值來生成斐波那契數列了!(doge臉)」
第一部分 生成器和協程介紹
生成器部分(熟悉的可以跳過這個部分)
生成器是生成一序列結果而不是一個值的函數,經常和for-loop一起使用;
調用生成器函數會創建一個生成器對象,然而這個函數只有在next()上才會執行;
通過yield關鍵詞來生成一個值,但同時會把函數掛起,下一次用next()調用的時候會恢復(從yield下一行代碼開始)。
def countdown(n): print "Counting down from", n while n > 0: yield n n -= 1
>>> x = countdown(10)>>> x<generator object at 0x58490>>>> x.next()Counting down from 1010>>> x.next()9>>> x.next()8......>>> x.next()1>>> x.next()Traceback (most recent call last): File "<stdin>", line 1, in ?StopIteration
下面是一個實際的例子
*python版本的unix tail -f命令(一直讀文件末尾的字元)
import timedef follow(thefile): thefile.seek(0,2) # go to the end of the file while True: line = thefile.readline() if not line: time.sleep(0.1) continue yield line
*使用例子:看WEB伺服器日誌文件
logfile = open("access-log")for line in follow(logfile): print line,
生成器最強力的應用之一就是設置processing pipelines,以下是一個管道實例:
列印所有含python的伺服器日誌記錄
def grep(pattern, lines): for line in lines: if pattern in line: yield line# Set up a processing pipe: tail -f | grep pythonlogfile = open("access-log")loglines = follow(logfile)pylines = grep("python", loglines)# Pull results out of the processing pipelinefor line in pylines: print line,
協程部分
在正式介紹協程以前,還有一件關於yield的小事:python2.5裡面,對yield有一個小改動,現在可以把yield作為表達式使用了。比如把yield放在右邊作為賦值:
def grep(pattern): print "Looking for %s" % pattern while True: line = (yield) if pattern in line: print line,
問題來了:它(line)的值是什麼?
協程:
如果你更廣泛的使用yield,你就得到了一個協程。它們比僅僅生成值做得更多,函數還會消費發送給它的值,然後再通過yield來返回值。
>>> g = grep("python") # 這是沒有輸出,只是返回一個對象>>> g.next() # Prime it (explained shortly) 協程開始運行Looking for python>>> g.send("Yeah, but no, but yeah, but no")>>> g.send("A series of tubes")>>> g.send("python generators rock!")python generators rock!
這裡先講下send()函數:
send(value)
send發送的參數值會成為現在的yield表達式的結果(也就是賦值給yield表達式左邊),send()方法會返回生成器yield的下一個值或者停止迭代(如果不能再yield下一個值而生成器退出的話),
當send()方法第一次被調用來啟動這個生成器,它必須把None作為參數,因為這個時候沒有yield表達式來接受值。(下面會講也可以用另外一種方法)
協程的執行:和生成器一樣,當你調用協程的時候,什麼都沒有發生。它們只有在回應
next()和send()方法的時候運行。
Coroutine Priming(協程啟動):所有的協程都必須先啟動,通過調用.next()或者send(None),
這個會把執行提前到第一個yield表達式的位置(在上面的grep例子中就是跳到line=(yield)這一行,
到了這一步,它就準備好來接受值了。
很容易忘記去調用.next()函數,解決方案是用一個裝飾器來封裝協程:
#可以看到這個裝飾器唯一的作用就是對函數調用一次next()def coroutine(func): def start(*args, **kwargs): cr = func(*args, **kwargs) cr.next() return cr return start@coroutinedef grep(pattern): ...
關閉一個協程,協程可能無限運行下去,可以用.close()來關閉它
>>> g = grep("python")>>> g.next() # Prime itLooking for python>>> g.send("Yeah, but no, but yeah, but no")>>> g.send("python generators rock!")python generators rock!>>> g.close()
注意:垃圾回收也會調用close()
catching close() -- GeneratorExit@coroutinedef grep(pattern): print "Looking for %s" % pattern try: while True: line = (yield) if pattern in line: print line, except GeneratorExit: print "Going away. Goodbye"
你不能忽視這個異常,唯一合法的行為是清理和返回
Throwing an Exception
>>> g = grep("python")>>> g.next() # Prime itLooking for python>>> g.send("python generators rock!")python generators rock!>>> g.throw(RuntimeError,"Youre hosed")Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 4, in grepRuntimeError: Youre hosed>>>
異常會產生於yield表達式,可以用平常的方法去catch/handle
這裡穿插講下生成器和協程兩個概念的不同:
儘管有一些相似之處,生成器和協程基本是兩個完全不同的概念。生成器生成值,而協程更傾向於消費值。
It is easy to get sidetracked because methods meant for coroutines are sometimes described as a way to tweak generators that are in the process of producing an iteration pattern (i.e., resetting its value). This is mostly bogus. (額,這段很不好翻,大意就是容易把協程看成一種扭曲使用生成器的方式,這完全是錯的)
一個明顯錯誤的例子--同時生產和接受值的「生成器」
def countdown(n): print "Counting down from", n while n >= 0: newvalue = (yield n) # If a new value got sent in, reset n with it if newvalue is not None: n = newvalue else: n -= 1
這段代碼會運行,但是非常難以理解而且古怪。
c = countdown(5)for n in c: print n if n == 5: c.send(3)
output(Notice how a value got "lost" in the iteration protocol):
5 2 1 0
這段代碼比較難以理解,所以我加了列印,可以用pycharm來斷點單步執行這段代碼,便於理解
def countdown(n): print "Counting down from", n while n >= 0: newvalue = (yield n) print newvalue:{0} n:{1}.format(newvalue, n) # If a new value got sent in, reset n with it if newvalue is not None: n = newvalue else: n -= 1c = countdown(5)for n in c: print n if n == 5: c.send(3)
以下是輸出和我個人加的理解注釋:
Counting down from 5 #第一次for循環,c.next(),執行到newvalue = (yield n)這行,準備好接受值5 #for循環內部,列印n的值,為5newvalue:3 n:5 #c.send(3),賦值給newvalue為3,繼續執行下一行列印,然後把n賦值為3,yield n返回一個值3,回到for循環newvalue:None n:3 #for循環執行c.next()相當於c.send(None),所以newvalue=None,n還是上次的值3,列印,接著會把n減一變為2,yield n返回值22 #回到for循環內部,列印2newvalue:None n:2 #又一次for循環執行,同上,以下略,自行理解1newvalue:None n:10newvalue:None n:0
保持直接:
生成器產生用來迭代的數據,協程消費數據,把這兩個概念分開!
協程和迭代無關。確實在協程中有用yield來生成值,但不是和迭代綁定的。
第二部分:協程,管道,數據流
Processing Pipelines
協程可以被用來設置管道,你可以把協程串聯起來,用send()方法來把數據推入穿過整個管道。
-send()->coroutine-send->coroutine-send()->corutine->
管道起源,pipeline需要一個最初的源頭也就是生產者:source-send()->coroutine-send()->
源頭驅動了整個pipeline,它通常不是一個協程!
def source(target): while not done: item = produce_an_item() ... target.send(item) target.close()
Pipeline Sinks管道水池
管道必須有一個終點(水池):send()->coroutine-send()->sind
收集所有發來的數據並處理:
@coroutinedef sink(): try: while True: item = (yield) # receive an item ... except GeneratorExit: # handle .close() # Done ...
一個例子
模仿unix tail -f的源頭
import timedef follow(thefile, target): thefile.seek(0,2) # Go to the end of the file while True: line = thefile.readline() if not line: time.sleep(0.1) continue target.send(line)
只是列印這些行的水池
@coroutinedef printer(): while True: line = (yield) print line,
用例:
f = open("access-log")follow(f, printer())
follow()不斷讀取文件末尾行並把它們推向printer()協程
pipeline filters 管道過濾
同時收發的中間階段,典型的進行一些數據轉換、過濾、routing(分發路由?)、等等
@coroutinedef filter(target): while True item = (yield) # receive an item # Transform/filter item ... # send it along to the next stage target.send(item)
一個過濾實例:
@coroutinedef grep(pattern, target): while True: line = (yield) if pattern in line: target.send(line)# follow()->grep()->printer()f = open("access-log")follow(f, grep(python, printer()))
#穿插:協程和生成器在這方面的不同
生成器:input seq -> generator -> generator -> generator -> for x in s:
協程:source -send()-> coroutine -send()-> coroutine()
關鍵不同:生成器隨著迭代拉取數據到pipe。迭代器用send()方法推送數據到管道。
分支化:通過協程,還可以把數據發送到多個目的地,源頭只是簡單的發送數據,更多的
數據分發可以非常複雜
例子:廣播到多個目標,把接受項發送給全部的協程序列(目標)
@coroutinedef broadcast(targets): while True: item = (yield) for target in targets: target.send(item)f = open("access-log")p = printer()follow(f, broadcast([grep(python, p), grep(ply, p), grep(swig, p)]) )
協程VS對象
協程在某種程度上類似於設計簡單handler objects的面對對象設計模式
class GrepHandler(object): def __init__(self,pattern, target): self.pattern = pattern self.target = target def send(self,line): if self.pattern in line: self.target.send(line)
協程版本
@coroutinedef grep(pattern, target): while True: line = (yield) if pattern in line: target.send(line)
-協程概念上更簡單,一個協程只需要一個函數定義
-如果你定義一個handler類,你需要一個類定義,兩個方法定義,可能還有基類和庫導入
-本質上你是把這個概念剝離到了最基本的要素 Essentially youre stripping the idea down to the bare essentials (like a generator vs. iterator)
-協程要更快
一次微型檢測
@coroutinedef null(): while True: item = (yield)line = python is nicep1 = grep(python,null()) # Coroutinep2 = GrepHandler(python,null()) # Object
發送1,000,000行
timeit("p1.send(line)", "from __main__ import line,p1") #0.60 stimeit("p2.send(line)", "from __main__ import line,p2") #0.92 s
性能差別在哪:
用類的時候我們要self.pattern和self.target來查找self(也就是那個實例),而協程不需要self
推薦閱讀: