python線程通信與生產者消費者模式

本文主要講解生產者消費者模式,它基於線程之間的通信。

生產者消費者模式是指一部分程序用於生產數據,一部分程序用於處理數據,兩部分分別放在兩個線程中來運行。

舉幾個例子

  • 一個程序專門往列表中添加數字,另一個程序專門提取數字進行處理,二者共同維護這樣一個列表
  • 一個程序去抓取待爬取的url,另一個程序專門解析url將數據存儲到文件中,這相當於維護一個url隊列
  • 維護ip池,一個程序在消耗ip進行爬蟲,另一個程序看ip不夠用了就啟動開始抓取

我們可以想像到,這種情況不使用並發機制(如多線程)是難以實現的。

如果程序線性運行,只能做到先把所有url抓取到列表中,再遍歷列表解析數據;或者解析的過程中將新抓到的url加入列表,但是列表的增添和刪減並不是同時發生的。對於更複雜的機制,線程程序更是難以做到,比如維護url列表,當列表長度大於100時停止填入,小於50時再啟動開始填入。

本文結構

本文思路如下

  • 首先,兩個線程維護同一個列表,需要使用鎖保證對資源修改時不會出錯
  • threading模塊提供了Condition對象專門處理生產者消費者問題
  • 但是為了呈現由淺入深的過程,我們先用普通鎖來實現這個過程,通過考慮程序的不足,再使用Condition來解決,讓讀者更清楚Condition的用處
  • 下一步,python中的queue模塊封裝了Condition的特性,為我們提供了一個方便易用的隊列結構。用queue可以讓我們不需要了解鎖是如何設置的細節
  • 線程安全的概念解釋
  • 這個過程其實就是線程之間的通信,除了Condition,再補充一種通信方式Event

本文分為下面幾個部分

  • Lock與Condition的對比
  • 生產者與消費者的相互等待
  • Queue
  • 線程安全
  • Event

Lock與Condition的對比

下面我們實現這樣一個過程

  • 維護一個整數列表integer_list,共有兩個線程
  • Producer類對應一個線程,功能:隨機產生一個整數,加入整數列表之中
  • Consumer類對應一個線程,功能:從整數列表中pop掉一個整數
  • 通過time.sleep來表示兩個線程運行速度,設置成Producer產生的速度沒有Consumer消耗的快

代碼如下

import timeimport threadingimport randomclass Producer(threading.Thread): # 產生隨機數,將其加入整數列表 def __init__(self, lock, integer_list): threading.Thread.__init__(self) self.lock = lock self.integer_list = integer_list def run(self): while True: # 一直嘗試獲得鎖來添加整數 random_integer = random.randint(0, 100) with self.lock: self.integer_list.append(random_integer) print(integer list add integer {}.format(random_integer)) time.sleep(1.2 * random.random()) # sleep隨機時間,通過乘1.2來減慢生產的速度class Consumer(threading.Thread): def __init__(self, lock, integer_list): threading.Thread.__init__(self) self.lock = lock self.integer_list = integer_list def run(self): while True: # 一直嘗試去消耗整數 with self.lock: if self.integer_list: # 只有列表中有元素才pop integer = self.integer_list.pop() print(integer list lose integer {}.format(integer)) time.sleep(random.random()) else: print(there is no integer in the list)def main(): integer_list = [] lock = threading.Lock() th1 = Producer(lock, integer_list) th2 = Consumer(lock, integer_list) th1.start() th2.start()if __name__ == __main__: main()

程序會無休止地運行下去,一個產生,另一個消耗,截取前面一部分運行結果如下

integer list add integer 100integer list lose integer 100there is no integer in the listthere is no integer in the list... 幾百行一樣的 ...there is no integer in the listinteger list add integer 81integer list lose integer 81there is no integer in the listthere is no integer in the listthere is no integer in the list......

我們可以看到,整數每次產生都會被迅速消耗掉,消費者沒有東西可以處理,但是依然不停地詢問是否有東西可以處理(while True),這樣不斷地詢問會比較浪費CPU等資源(特別是詢問之後不只是print而是加入計算等)。

如果可以在第一次查詢到列表為空的時候就開始等待,直到列表不為空(收到通知而不是一遍一遍地查詢),資源開銷就可以節省很多。Condition對象就可以解決這個問題,它與一般鎖的區別在於,除了可以acquire release,還多了兩個方法wait notify,下面我們來看一下上面過程如何用Condition來實現

import timeimport threadingimport randomclass Producer(threading.Thread): def __init__(self, condition, integer_list): threading.Thread.__init__(self) self.condition = condition self.integer_list = integer_list def run(self): while True: random_integer = random.randint(0, 100) with self.condition: self.integer_list.append(random_integer) print(integer list add integer {}.format(random_integer)) self.condition.notify() time.sleep(1.2 * random.random())class Consumer(threading.Thread): def __init__(self, condition, integer_list): threading.Thread.__init__(self) self.condition = condition self.integer_list = integer_list def run(self): while True: with self.condition: if self.integer_list: integer = self.integer_list.pop() print(integer list lose integer {}.format(integer)) time.sleep(random.random()) else: print(there is no integer in the list) self.condition.wait()def main(): integer_list = [] condition = threading.Condition() th1 = Producer(condition, integer_list) th2 = Consumer(condition, integer_list) th1.start() th2.start()if __name__ == __main__: main()

相比於LockCondition只有兩個變化

  • 在生產出整數時notify通知wait的線程可以繼續了
  • 消費者查詢到列表為空時調用wait等待通知(notify

這樣結果就井然有序

integer list add integer 7integer list lose integer 7there is no integer in the listinteger list add integer 98integer list lose integer 98there is no integer in the listinteger list add integer 84integer list lose integer 84.....

生產者與消費者的相互等待

上面是最基本的使用,下面我們多實現一個功能:生產者一次產生三個數,在列表數量大於5的時候停止生產,小於4的時候再開始

import timeimport threadingimport randomclass Producer(threading.Thread): def __init__(self, condition, integer_list): threading.Thread.__init__(self) self.condition = condition self.integer_list = integer_list def run(self): while True: with self.condition: if len(self.integer_list) > 5: print(Producer start waiting) self.condition.wait() else: for _ in range(3): self.integer_list.append(random.randint(0, 100)) print(now {} after add .format(self.integer_list)) self.condition.notify() time.sleep(random.random())class Consumer(threading.Thread): def __init__(self, condition, integer_list): threading.Thread.__init__(self) self.condition = condition self.integer_list = integer_list def run(self): while True: with self.condition: if self.integer_list: integer = self.integer_list.pop() print(all {} lose {}.format(self.integer_list, integer)) time.sleep(random.random()) if len(self.integer_list) < 4: self.condition.notify() print("Producer dont need to wait") else: print(there is no integer in the list) self.condition.wait()def main(): integer_list = [] condition = threading.Condition() th1 = Producer(condition, integer_list) th2 = Consumer(condition, integer_list) th1.start() th2.start()if __name__ == __main__: main()

可以看下面的結果體會消長過程

now [33, 94, 68] after addall [33, 94] lose 68Producer dont need to waitnow [33, 94, 53, 4, 95] after addall [33, 94, 53, 4] lose 95all [33, 94, 53] lose 4Producer dont need to waitnow [33, 94, 53, 27, 36, 42] after addall [33, 94, 53, 27, 36] lose 42all [33, 94, 53, 27] lose 36all [33, 94, 53] lose 27Producer dont need to waitnow [33, 94, 53, 79, 30, 22] after addall [33, 94, 53, 79, 30] lose 22all [33, 94, 53, 79] lose 30now [33, 94, 53, 79, 60, 17, 34] after addall [33, 94, 53, 79, 60, 17] lose 34all [33, 94, 53, 79, 60] lose 17now [33, 94, 53, 79, 60, 70, 76, 21] after addall [33, 94, 53, 79, 60, 70, 76] lose 21Producer start waitingall [33, 94, 53, 79, 60, 70] lose 76all [33, 94, 53, 79, 60] lose 70all [33, 94, 53, 79] lose 60all [33, 94, 53] lose 79Producer dont need to waitall [33, 94] lose 53Producer dont need to waitall [33] lose 94Producer dont need to waitall [] lose 33Producer dont need to waitthere is no integer in the listnow [16, 67, 23] after addall [16, 67] lose 23Producer dont need to waitnow [16, 67, 49, 62, 50] after add

Queue

queue模塊內部實現了Condition,我們可以非常方便地使用生產者消費者模式

import timeimport threadingimport randomfrom queue import Queueclass Producer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: random_integer = random.randint(0, 100) self.queue.put(random_integer) print(add {}.format(random_integer)) time.sleep(random.random())class Consumer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: get_integer = self.queue.get() print(lose {}.format(get_integer)) time.sleep(random.random())def main(): queue = Queue() th1 = Producer(queue) th2 = Consumer(queue) th1.start() th2.start()if __name__ == __main__: main()

Queue

  • get方法會移除並賦值(相當於list中的pop),但是它在隊列為空的時候會被阻塞(wait)
  • put方法是往裡面添加值
  • 如果想設置隊列最大長度,初始化時這樣做queue = Queue(10)指定最大長度,超過這個長度就會被阻塞(wait)

使用Queue,全程不需要顯式地調用鎖,非常簡單易用。不過內置的queue有一個缺點在於不是可迭代對象,不能對它循環也不能查看其中的值,可以通過構造一個新的類來實現,詳見這裡。

下面消防之前Condition方法,用Queue實現生產者一次加3個,消費者一次消耗1個,每次都返回當前隊列內容,改寫代碼如下

import timeimport threadingimport randomfrom queue import Queue# 為了能查看隊列數據,繼承Queue定義一個類class ListQueue(Queue): def _init(self, maxsize): self.maxsize = maxsize self.queue = [] # 將數據存儲方式改為list def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()class Producer(threading.Thread): def __init__(self, myqueue): threading.Thread.__init__(self) self.myqueue = myqueue def run(self): while True: for _ in range(3): # 一個線程加入3個,注意:條件鎖時上在了put上而不是整個循環上 self.myqueue.put(random.randint(0, 100)) print(now {} after add .format(self.myqueue.queue)) time.sleep(random.random())class Consumer(threading.Thread): def __init__(self, myqueue): threading.Thread.__init__(self) self.myqueue = myqueue def run(self): while True: get_integer = self.myqueue.get() print(lose {}.format(get_integer), now total, self.myqueue.queue) time.sleep(random.random())def main(): queue = ListQueue(5) th1 = Producer(queue) th2 = Consumer(queue) th1.start() th2.start()if __name__ == __main__: main()

得到結果如下

now [79, 39, 64] after addlose 64 now total [79, 39]now [79, 39, 9, 42, 14] after addlose 14 now total [79, 39, 9, 42]lose 42 now total [79, 39, 9]lose 27 now total [79, 39, 9, 78]now [79, 39, 9, 78, 30] after addlose 30 now total [79, 39, 9, 78]lose 21 now total [79, 39, 9, 78]lose 100 now total [79, 39, 9, 78]now [79, 39, 9, 78, 90] after addlose 90 now total [79, 39, 9, 78]lose 72 now total [79, 39, 9, 78]lose 5 now total [79, 39, 9, 78]

上面限制隊列最大為5,有以下細節需要注意

  • 首先ListQueue類的構造:因為Queue類的源代碼中,put是調用了_putget調用_get_init也是一樣,所以我們重寫這三個方法就將數據存儲的類型和存取方式改變了。而其他部分鎖的設計都沒有變,也可以正常使用。改變之後我們就可以通過調用self.myqueue.queue來訪問這個列表數據
  • 輸出結果很怪異,並不是我們想要的。這是因為Queue類的源代碼中,如果隊列數量達到了maxsize,則put的操作wait,而put一次插入一個元素,所以經常插入一個等一次,循環無法一次運行完,而print是在插入三個之後才有的,所以很多時候其實加進去值了卻沒有在運行結果中顯示,所以結果看起來比較怪異。所以要想靈活使用還是要自己來定義鎖的位置,不能簡單依靠queue

另外,queue模塊中有其他類,分別實現先進先出、先進後出、優先順序等隊列,還有一些異常等,可以參考這篇文章和官網。

線程安全

講到了Queue就提一提線程安全。線程安全其實就可以理解成線程同步。

官方定義是:指某個函數、函數庫在多線程環境中被調用時,能夠正確地處理多個線程之間的共享變數,使程序功能正確完成。

我們常常提到的說法是,某某某是線程安全的,比如queue.Queue是線程安全的,而list不是。

根本原因在於前者實現了鎖原語,而後者沒有。

原語指由若干個機器指令構成的完成某種特定功能的一段程序,具有不可分割性;即原語的執行必須是連續的,在執行過程中不允許被中斷。

queue.Queue是線程安全的,即指對他進行寫入和提取的操作不會被中斷而導致錯誤,這也是在實現生產者消費者模式時,使用List就要特意去加鎖,而用這個隊列就不用的原因。

Event

EventCondition的區別在於:Condition = Event + Lock,所以Event非常簡單,只是一個沒有帶鎖的Condition,也是滿足一定條件等待或者執行,這裡不想說很多,只舉一個簡單的例子來看一下

import threadingimport timeclass MyThread(threading.Thread): def __init__(self, event): threading.Thread.__init__(self) self.event = event def run(self): print(first) self.event.wait() print(after wait)event = threading.Event()MyThread(event).start()print(before set)time.sleep(1)event.set()

可以看到結果

firstbefore set

先出現,1s之後才出現

after wait

專欄信息

專欄主頁:python編程

專欄目錄:目錄

版本說明:軟體及包版本說明


推薦閱讀:

TAG:Python | 多線程 | Python庫 |