python中多進程+協程的使用

前面講了為什麼Python里推薦用多進程而不是多線程,但是多進程也有其自己的限制:相比線程更加笨重、切換耗時更長,並且在python的多進程下,進程數量不推薦超過CPU核心數(一個進程只有一個GIL,所以一個進程只能跑滿一個CPU),因為一個進程佔用一個CPU時能充分利用機器的性能,但是進程多了就會出現頻繁的進程切換,反而得不償失。

不過特殊情況(特指IO密集型任務)下,多線程是比多進程好用的。

舉個例子:給你200W條url,需要你把每個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果肯定是很差的。為什麼呢?

例如每次請求的等待時間是2秒,那麼如下(忽略cpu計算時間):

1、單進程+單線程:需要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的

2、單進程+多線程:例如我們在這個進程中開了10個多線程,比1中能夠提升10倍速度,也就是大約4.63天能夠完成200W條抓取,請注意,這裡的實際執行是:線程1遇見了阻塞,CPU切換到線程2去執行,遇見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,所以速度上提升大約能到10倍(這裡忽略了線程切換帶來的開銷,實際上的提升應該是不能達到10倍的),但是需要考慮的是線程的切換也是有開銷的,所以不能無限的啟動多線程(開200W個線程肯定是不靠譜的)

3、多進程+多線程:這裡就厲害了,一般來說也有很多人用這個方法,多進程下,每個進程都能佔一個cpu,而多線程從一定程度上繞過了阻塞的等待,所以比單進程下的多線程又更好使了,例如我們開10個進程,每個進程里開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(為什麼是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗肯定比切換20W個進程大得多,考慮到這部分開銷,所以是10倍以上)。

還有更好的方法嗎?答案是肯定的,它就是:

4、協程,使用它之前我們先講講what/why/how(它是什麼/為什麼用它/怎麼使用它)

what:

協程是一種用戶級的輕量級線程。協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

在並發編程中,協程與線程類似,每個協程表示一個執行單元,有自己的本地數據,與其它協程共享全局數據和其它資源。

why:

目前主流語言基本上都選擇了多線程作為並發設施,與線程相關的概念是搶佔式多任務(Preemptive multitasking),而與協程相關的是協作式多任務。

不管是進程還是線程,每次阻塞、切換都需要陷入系統調用(system call),先讓CPU跑操作系統的調度程序,然後再由調度程序決定該跑哪一個進程(線程)。

而且由於搶佔式調度執行順序無法確定的特點,使用線程時需要非常小心地處理同步問題,而協程完全不存在這個問題(事件驅動和非同步程序也有同樣的優點)。

因為協程是用戶自己來編寫調度邏輯的,對CPU來說,協程其實是單線程,所以CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,所以協程在一定程度上又好於多線程。

how:

python裡面怎麼使用協程?答案是使用gevent,使用方法:看這裡

使用協程,可以不受線程開銷的限制,我嘗試過一次把20W條url放在單進程的協程里執行,完全沒問題。

所以最推薦的方法,是多進程+協程(可以看作是每個進程里都是單線程,而這個單線程是協程化的)

多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提升是巨大的。

小例子:

#-*- coding=utf-8 -*-import requestsfrom multiprocessing import Processimport geventfrom gevent import monkey; monkey.patch_all()import sysreload(sys)sys.setdefaultencoding(utf8)def fetch(url): try: s = requests.Session() r = s.get(url,timeout=1)#在這裡抓取頁面 r.close()#用完之後記得釋放資源,不然會出現很多殭屍進程 except Exception,e: print e return def process_start(tasks): gevent.joinall(tasks)#使用協程來執行def task_start(filepath,flag = 100000):#每10W條url啟動一個進程 with open(filepath,r) as reader:#從給定的文件中讀取url url = reader.readline().strip()#通過readline每次讀取一行,避免一次讀取全部url導致內存不夠用。 task_list = []#這個list用於存放協程任務 i = 0 #計數器,記錄添加了多少個url到協程隊列 while url!=: i += 1 task_list.append(gevent.spawn(fetch,url,queue))#每次讀取出url,將任務添加到協程隊列 if i == flag:#一定數量的url就啟動一個進程並執行 p = Process(target=process_start,args=(task_list,)) p.start() task_list = [] #重置協程隊列 i = 0 #重置計數器 url = reader.readline().strip() if task_list not []:#若退出循環後任務隊列里還有url剩餘 p = Process(target=process_start,args=(task_list,))#把剩餘的url全都放到最後這個進程來執行 p.start() if __name__ == __main__: task_start(./testData.txt)#讀取指定文件

細心的同學會發現:上面的例子中隱藏了一個問題:進程的數量會隨著url數量的增加而不斷增加,我們在這裡不使用進程池multiprocessing.Pool來控制進程數量的原因是multiprocessing.Pool和gevent有衝突不能同時使用,有興趣的同學可以研究一下為什麼會衝突。並且在已經知道url有多少條的情況下,我們完全可以通過控制每個進程處理的url數量來控制進程數。

另外還有一個問題:每個進程處理的url是累積的而不是獨立的,例如第一個進程會處理10W個,第二個進程會變成20W個,以此類推。最後定位到問題是gevent.joinall()導致的問題,有興趣的同學可以研究一下為什麼會這樣。不過這個問題的處理方案是:主進程只負責讀取url然後寫入到list中,在創建子進程的時候直接把list傳給子進程,由子進程自己去構建協程。這樣就不會出現累加的問題


推薦閱讀:

永不返回的exit函數是如何返回到main函數的調用者的?
協程和纖程的區別?
Gevent的協程,能夠非同步是為什麼呢?
from yield to await——Python協程演進過程(一)

TAG:协程 | Python | 高并发 |