python協程簡介

原文是A Curious Course on Coroutines and Concurrency

原作是python cookbook的作者David Beazley,原文和代碼請參考網址

dabeaz.com/coroutines/C

A Curious Course on Coroutines and Concurrency

以下是我對其中第一部分和第二部分的渣翻譯(有一些截取和整理)。

頭腦爆炸目錄:

生成器-協程介紹-一些數據處理-事件處理-混合進程使用協程-作為任務的協程-寫一個多任務操作系統(只翻譯到數據處理為止)

在python2.5裡面,生成器添加了功能來允許協程(PEP-342),主要是一個新的send()方法。「所以你現在可以向生成器發送值來生成斐波那契數列了!(doge臉)」

第一部分 生成器和協程介紹

生成器部分(熟悉的可以跳過這個部分)

生成器是生成一序列結果而不是一個值的函數,經常和for-loop一起使用;

調用生成器函數會創建一個生成器對象,然而這個函數只有在next()上才會執行;

通過yield關鍵詞來生成一個值,但同時會把函數掛起,下一次用next()調用的時候會恢復(從yield下一行代碼開始)。

def countdown(n): print "Counting down from", n while n > 0: yield n n -= 1

>>> x = countdown(10)>>> x<generator object at 0x58490>>>> x.next()Counting down from 1010>>> x.next()9>>> x.next()8......>>> x.next()1>>> x.next()Traceback (most recent call last): File "<stdin>", line 1, in ?StopIteration

下面是一個實際的例子

*python版本的unix tail -f命令(一直讀文件末尾的字元)

import timedef follow(thefile): thefile.seek(0,2) # go to the end of the file while True: line = thefile.readline() if not line: time.sleep(0.1) continue yield line

*使用例子:看WEB伺服器日誌文件

logfile = open("access-log")for line in follow(logfile): print line,

生成器最強力的應用之一就是設置processing pipelines,以下是一個管道實例:

列印所有含python的伺服器日誌記錄

def grep(pattern, lines): for line in lines: if pattern in line: yield line# Set up a processing pipe: tail -f | grep pythonlogfile = open("access-log")loglines = follow(logfile)pylines = grep("python", loglines)# Pull results out of the processing pipelinefor line in pylines: print line,

協程部分

在正式介紹協程以前,還有一件關於yield的小事:python2.5裡面,對yield有一個小改動,現在可以把yield作為表達式使用了。比如把yield放在右邊作為賦值:

def grep(pattern): print "Looking for %s" % pattern while True: line = (yield) if pattern in line: print line,

問題來了:它(line)的值是什麼?

協程:

如果你更廣泛的使用yield,你就得到了一個協程。它們比僅僅生成值做得更多,函數還會消費發送給它的值,然後再通過yield來返回值。

>>> g = grep("python") # 這是沒有輸出,只是返回一個對象>>> g.next() # Prime it (explained shortly) 協程開始運行Looking for python>>> g.send("Yeah, but no, but yeah, but no")>>> g.send("A series of tubes")>>> g.send("python generators rock!")python generators rock!

這裡先講下send()函數:

send(value)

send發送的參數值會成為現在的yield表達式的結果(也就是賦值給yield表達式左邊),send()方法會返回生成器yield的下一個值或者停止迭代(如果不能再yield下一個值而生成器退出的話),

當send()方法第一次被調用來啟動這個生成器,它必須把None作為參數,因為這個時候沒有yield表達式來接受值。(下面會講也可以用另外一種方法)

協程的執行:和生成器一樣,當你調用協程的時候,什麼都沒有發生。它們只有在回應

next()和send()方法的時候運行。

Coroutine Priming(協程啟動):所有的協程都必須先啟動,通過調用.next()或者send(None),

這個會把執行提前到第一個yield表達式的位置(在上面的grep例子中就是跳到line=(yield)這一行,

到了這一步,它就準備好來接受值了。

很容易忘記去調用.next()函數,解決方案是用一個裝飾器來封裝協程:

#可以看到這個裝飾器唯一的作用就是對函數調用一次next()def coroutine(func): def start(*args, **kwargs): cr = func(*args, **kwargs) cr.next() return cr return start@coroutinedef grep(pattern): ...

關閉一個協程,協程可能無限運行下去,可以用.close()來關閉它

>>> g = grep("python")>>> g.next() # Prime itLooking for python>>> g.send("Yeah, but no, but yeah, but no")>>> g.send("python generators rock!")python generators rock!>>> g.close()

注意:垃圾回收也會調用close()

catching close() -- GeneratorExit@coroutinedef grep(pattern): print "Looking for %s" % pattern try: while True: line = (yield) if pattern in line: print line, except GeneratorExit: print "Going away. Goodbye"

你不能忽視這個異常,唯一合法的行為是清理和返回

Throwing an Exception

>>> g = grep("python")>>> g.next() # Prime itLooking for python>>> g.send("python generators rock!")python generators rock!>>> g.throw(RuntimeError,"Youre hosed")Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 4, in grepRuntimeError: Youre hosed>>>

異常會產生於yield表達式,可以用平常的方法去catch/handle

這裡穿插講下生成器和協程兩個概念的不同:

儘管有一些相似之處,生成器和協程基本是兩個完全不同的概念。生成器生成值,而協程更傾向於消費值。

It is easy to get sidetracked because methods meant for coroutines are sometimes described as a way to tweak generators that are in the process of producing an iteration pattern (i.e., resetting its value). This is mostly bogus. (額,這段很不好翻,大意就是容易把協程看成一種扭曲使用生成器的方式,這完全是錯的)

一個明顯錯誤的例子--同時生產和接受值的「生成器」

def countdown(n): print "Counting down from", n while n >= 0: newvalue = (yield n) # If a new value got sent in, reset n with it if newvalue is not None: n = newvalue else: n -= 1

這段代碼會運行,但是非常難以理解而且古怪。

c = countdown(5)for n in c: print n if n == 5: c.send(3)

output(Notice how a value got "lost" in the iteration protocol):

5 2 1 0

這段代碼比較難以理解,所以我加了列印,可以用pycharm來斷點單步執行這段代碼,便於理解

def countdown(n): print "Counting down from", n while n >= 0: newvalue = (yield n) print newvalue:{0} n:{1}.format(newvalue, n) # If a new value got sent in, reset n with it if newvalue is not None: n = newvalue else: n -= 1c = countdown(5)for n in c: print n if n == 5: c.send(3)

以下是輸出和我個人加的理解注釋:

Counting down from 5 #第一次for循環,c.next(),執行到newvalue = (yield n)這行,準備好接受值5 #for循環內部,列印n的值,為5newvalue:3 n:5 #c.send(3),賦值給newvalue為3,繼續執行下一行列印,然後把n賦值為3,yield n返回一個值3,回到for循環newvalue:None n:3 #for循環執行c.next()相當於c.send(None),所以newvalue=None,n還是上次的值3,列印,接著會把n減一變為2,yield n返回值22 #回到for循環內部,列印2newvalue:None n:2 #又一次for循環執行,同上,以下略,自行理解1newvalue:None n:10newvalue:None n:0

保持直接:

生成器產生用來迭代的數據,協程消費數據,把這兩個概念分開!

協程和迭代無關。確實在協程中有用yield來生成值,但不是和迭代綁定的。

第二部分:協程,管道,數據流

Processing Pipelines

協程可以被用來設置管道,你可以把協程串聯起來,用send()方法來把數據推入穿過整個管道。

-send()->coroutine-send->coroutine-send()->corutine->

管道起源,pipeline需要一個最初的源頭也就是生產者:source-send()->coroutine-send()->

源頭驅動了整個pipeline,它通常不是一個協程!

def source(target): while not done: item = produce_an_item() ... target.send(item) target.close()

Pipeline Sinks管道水池

管道必須有一個終點(水池):send()->coroutine-send()->sind

收集所有發來的數據並處理:

@coroutinedef sink(): try: while True: item = (yield) # receive an item ... except GeneratorExit: # handle .close() # Done ...

一個例子

模仿unix tail -f的源頭

import timedef follow(thefile, target): thefile.seek(0,2) # Go to the end of the file while True: line = thefile.readline() if not line: time.sleep(0.1) continue target.send(line)

只是列印這些行的水池

@coroutinedef printer(): while True: line = (yield) print line,

用例:

f = open("access-log")follow(f, printer())

follow()不斷讀取文件末尾行並把它們推向printer()協程

pipeline filters 管道過濾

同時收發的中間階段,典型的進行一些數據轉換、過濾、routing(分發路由?)、等等

@coroutinedef filter(target): while True item = (yield) # receive an item # Transform/filter item ... # send it along to the next stage target.send(item)

一個過濾實例:

@coroutinedef grep(pattern, target): while True: line = (yield) if pattern in line: target.send(line)# follow()->grep()->printer()f = open("access-log")follow(f, grep(python, printer()))

#穿插:協程和生成器在這方面的不同

生成器:input seq -> generator -> generator -> generator -> for x in s:

協程:source -send()-> coroutine -send()-> coroutine()

關鍵不同:生成器隨著迭代拉取數據到pipe。迭代器用send()方法推送數據到管道。

分支化:通過協程,還可以把數據發送到多個目的地,源頭只是簡單的發送數據,更多的

數據分發可以非常複雜

例子:廣播到多個目標,把接受項發送給全部的協程序列(目標)

@coroutinedef broadcast(targets): while True: item = (yield) for target in targets: target.send(item)f = open("access-log")p = printer()follow(f, broadcast([grep(python, p), grep(ply, p), grep(swig, p)]) )

協程VS對象

協程在某種程度上類似於設計簡單handler objects的面對對象設計模式

class GrepHandler(object): def __init__(self,pattern, target): self.pattern = pattern self.target = target def send(self,line): if self.pattern in line: self.target.send(line)

協程版本

@coroutinedef grep(pattern, target): while True: line = (yield) if pattern in line: target.send(line)

-協程概念上更簡單,一個協程只需要一個函數定義

-如果你定義一個handler類,你需要一個類定義,兩個方法定義,可能還有基類和庫導入

-本質上你是把這個概念剝離到了最基本的要素 Essentially youre stripping the idea down to the bare essentials (like a generator vs. iterator)

-協程要更快

一次微型檢測

@coroutinedef null(): while True: item = (yield)line = python is nicep1 = grep(python,null()) # Coroutinep2 = GrepHandler(python,null()) # Object

發送1,000,000行

timeit("p1.send(line)", "from __main__ import line,p1") #0.60 stimeit("p2.send(line)", "from __main__ import line,p2") #0.92 s

性能差別在哪:

用類的時候我們要self.pattern和self.target來查找self(也就是那個實例),而協程不需要self


推薦閱讀:

深入python協程的實現,帶你一層一層揭開協程的神秘面紗!

TAG:Python | 協程 | 編程 |