草根學Python(十三)線程和進程

前言

拖了好久,不過還是得堅持。喜歡本文的話可以加下公眾號【於你供讀】。

目錄

線程與進程

線程與進程是操作系統裡面的術語,簡單來講,每一個應用程序都有一個自己的進程。

操作系統會為這些進程分配一些執行資源,例如內存空間等。在進程中,又可以創建一些線程,他們共享這些內存空間,並由操作系統調用,以便並行計算。

我們都知道現代操作系統比如 Mac OS X,UNIX,Linux,Windows 等可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽敲代碼,一邊用 Markdown 寫博客,這就是多任務,至少同時有 3 個任務正在運行。當然還有很多任務悄悄地在後台同時運行著,只是桌面上沒有顯示而已。對於操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開 PyCharm 就是一個啟動了一個 PtCharm 進程,打開 Markdown 就是啟動了一個 Md 的進程。

雖然現在多核 CPU 已經非常普及了。可是由於 CPU 執行代碼都是順序執行的,這時候我們就會有疑問,單核 CPU 是怎麼執行多任務的呢?

其實就是操作系統輪流讓各個任務交替執行,任務 1 執行 0.01 秒,切換到任務 2 ,任務 2 執行 0.01 秒,再切換到任務 3 ,執行 0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於 CPU的執行速度實在是太快了,我們肉眼和感覺上沒法識別出來,就像所有任務都在同時執行一樣。

真正的並行執行多任務只能在多核 CPU 上實現,但是,由於任務數量遠遠多於 CPU 的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。

有些進程不僅僅只是干一件事的啊,比如瀏覽器,我們可以播放時視頻,播放音頻,看文章,編輯文章等等,其實這些都是在瀏覽器進程中的子任務。在一個進程內部,要同時干多件事,就需要同時運行多個「子任務」,我們把進程內的這些「子任務」稱為線程(Thread)。

由於每個進程至少要干一件事,所以,一個進程至少有一個線程。當然,一個進程也可以有多個線程,多個線程可以同時執行,多線程的執行方式和多進程是一樣的,也是由操作系統在多個線程之間快速切換,讓每個線程都短暫地交替運行,看起來就像同時執行一樣。

那麼在 Python 中我們要同時執行多個任務怎麼辦?

有兩種解決方案:

一種是啟動多個進程,每個進程雖然只有一個線程,但多個進程可以一塊執行多個任務。

還有一種方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個線程也可以一塊執行多個任務。

當然還有第三種方法,就是啟動多個進程,每個進程再啟動多個線程,這樣同時執行的任務就更多了,當然這種模型更複雜,實際很少採用。

總結一下就是,多任務的實現有3種方式:

  • 多進程模式;
  • 多線程模式;
  • 多進程+多線程模式。

同時執行多個任務通常各個任務之間並不是沒有關聯的,而是需要相互通信和協調,有時,任務 1 必須暫停等待任務 2 完成後才能繼續執行,有時,任務 3 和任務 4 又不能同時執行,所以,多進程和多線程的程序的複雜度要遠遠高於我們前面寫的單進程單線程的程序。

因為複雜度高,調試困難,所以,不是迫不得已,我們也不想編寫多任務。但是,有很多時候,沒有多任務還真不行。想想在電腦上看電影,就必須由一個線程播放視頻,另一個線程播放音頻,否則,單線程實現的話就只能先把視頻播放完再播放音頻,或者先把音頻播放完再播放視頻,這顯然是不行的。

多線程編程

其實創建線程之後,線程並不是始終保持一個狀態的,其狀態大概如下:

  • New 創建
  • Runnable 就緒。等待調度
  • Running 運行
  • Blocked 阻塞。阻塞可能在 Wait Locked Sleeping
  • Dead 消亡

線程有著不同的狀態,也有不同的類型。大致可分為:

  • 主線程
  • 子線程
  • 守護線程(後台線程)
  • 前台線程

簡單了解完這些之後,我們開始看看具體的代碼使用了。

1、線程的創建

Python 提供兩個模塊進行多線程的操作,分別是 thread 和 threading

前者是比較低級的模塊,用於更底層的操作,一般應用級別的開發不常用。

#!/usr/bin/env python3n# -*- coding: UTF-8 -*-nnimport timenimport threadingnnnclass MyThread(threading.Thread):n def run(self):n for i in range(5):n print(thread {}, @number: {}.format(self.name, i))n time.sleep(1)nnndef main():n print("Start main threading")nn # 創建三個線程n threads = [MyThread() for i in range(3)]n # 啟動三個線程n for t in threads:n t.start()nn print("End Main threading")nnnif __name__ == __main__:n main()n

運行結果:

Start main threadingnthread Thread-1, @number: 0nthread Thread-2, @number: 0nthread Thread-3, @number: 0nEnd Main threadingnthread Thread-2, @number: 1nthread Thread-1, @number: 1nthread Thread-3, @number: 1nthread Thread-1, @number: 2nthread Thread-3, @number: 2nthread Thread-2, @number: 2nthread Thread-2, @number: 3nthread Thread-3, @number: 3nthread Thread-1, @number: 3nthread Thread-3, @number: 4nthread Thread-2, @number: 4nthread Thread-1, @number: 4n

注意喔,這裡不同的環境輸出的結果肯定是不一樣的。

2、線程合併(join方法)

上面的示例列印出來的結果來看,主線程結束後,子線程還在運行。那麼我們需要主線程要等待子線程運行完後,再退出,要怎麼辦呢?

這時候,就需要用到 join 方法了。

在上面的例子,新增一段代碼,具體如下:

#!/usr/bin/env python3n# -*- coding: UTF-8 -*-nnimport timenimport threadingnnnclass MyThread(threading.Thread):n def run(self):n for i in range(5):n print(thread {}, @number: {}.format(self.name, i))n time.sleep(1)nnndef main():n print("Start main threading")nn # 創建三個線程n threads = [MyThread() for i in range(3)]n # 啟動三個線程n for t in threads:n t.start()nn # 一次讓新創建的線程執行 joinn for t in threads:n t.join()nn print("End Main threading")nnnif __name__ == __main__:n main()n

從列印的結果,可以清楚看到,相比上面示例列印出來的結果,主線程是在等待子線程運行結束後才結束的。

Start main threadingnthread Thread-1, @number: 0nthread Thread-2, @number: 0nthread Thread-3, @number: 0nthread Thread-1, @number: 1nthread Thread-3, @number: 1nthread Thread-2, @number: 1nthread Thread-2, @number: 2nthread Thread-1, @number: 2nthread Thread-3, @number: 2nthread Thread-2, @number: 3nthread Thread-1, @number: 3nthread Thread-3, @number: 3nthread Thread-3, @number: 4nthread Thread-2, @number: 4nthread Thread-1, @number: 4nEnd Main threadingn

3、線程同步與互斥鎖

使用線程載入獲取數據,通常都會造成數據不同步的情況。當然,這時候我們可以給資源進行加鎖,也就是訪問資源的線程需要獲得鎖才能訪問。

其中 threading 模塊給我們提供了一個 Lock 功能。

lock = threading.Lock()n

在線程中獲取鎖

lock.acquire()n

使用完成後,我們肯定需要釋放鎖

lock.release()n

當然為了支持在同一線程中多次請求同一資源,Python 提供了可重入鎖(RLock)。RLock 內部維護著一個 Lock 和一個 counter 變數,counter 記錄了 acquire 的次數,從而使得資源可以被多次 require。直到一個線程所有的 acquire 都被 release,其他的線程才能獲得資源。

那麼怎麼創建重入鎖呢?也是一句代碼的事情:

r_lock = threading.RLock()n

4、Condition 條件變數

實用鎖可以達到線程同步,但是在更複雜的環境,需要針對鎖進行一些條件判斷。Python 提供了 Condition 對象。使用 Condition 對象可以在某些事件觸發或者達到特定的條件後才處理數據,Condition 除了具有 Lock 對象的 acquire 方法和 release 方法外,還提供了 wait 和 notify 方法。線程首先 acquire 一個條件變數鎖。如果條件不足,則該線程 wait,如果滿足就執行線程,甚至可以 notify 其他線程。其他處於 wait 狀態的線程接到通知後會重新判斷條件。

其中條件變數可以看成不同的線程先後 acquire 獲得鎖,如果不滿足條件,可以理解為被扔到一個( Lock 或 RLock )的 waiting 池。直達其他線程 notify 之後再重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

該模式常用於生產者消費者模式,具體看看下面在線購物買家和賣家的示例:

#!/usr/bin/env python3n# -*- coding: UTF-8 -*-nnimport threading, timennnclass Consumer(threading.Thread):n def __init__(self, cond, name):n # 初始化n super(Consumer, self).__init__()n self.cond = condn self.name = namenn def run(self):n # 確保先運行Seeker中的方法n time.sleep(1)n self.cond.acquire()n print(self.name + : 我這兩件商品一起買,可以便宜點嗎)n self.cond.notify()n self.cond.wait()n print(self.name + : 我已經提交訂單了,你修改下價格)n self.cond.notify()n self.cond.wait()n print(self.name + : 收到,我支付成功了)n self.cond.notify()n self.cond.release()n print(self.name + : 等待收貨)nnnclass Producer(threading.Thread):n def __init__(self, cond, name):n super(Producer, self).__init__()n self.cond = condn self.name = namenn def run(self):n self.cond.acquire()n # 釋放對瑣的佔用,同時線程掛起在這裡,直到被 notify 並重新佔有瑣。n self.cond.wait()n print(self.name + : 可以的,你提交訂單吧)n self.cond.notify()n self.cond.wait()n print(self.name + : 好了,已經修改了)n self.cond.notify()n self.cond.wait()n print(self.name + : 嗯,收款成功,馬上給你發貨)n self.cond.release()n print(self.name + : 發貨商品)nnncond = threading.Condition()nconsumer = Consumer(cond, 買家(兩點水))nproducer = Producer(cond, 賣家(三點水))nconsumer.start()nproducer.start()n

輸出的結果如下:

買家(兩點水): 我這兩件商品一起買,可以便宜點嗎n賣家(三點水): 可以的,你提交訂單吧n買家(兩點水): 我已經提交訂單了,你修改下價格n賣家(三點水): 好了,已經修改了n買家(兩點水): 收到,我支付成功了n買家(兩點水): 等待收貨n賣家(三點水): 嗯,收款成功,馬上給你發貨n賣家(三點水): 發貨商品n

5、線程間通信

如果程序中有多個線程,這些線程避免不了需要相互通信的。那麼我們怎樣在這些線程之間安全地交換信息或數據呢?

從一個線程向另一個線程發送數據最安全的方式可能就是使用 queue 庫中的隊列了。創建一個被多個線程共享的 Queue 對象,這些線程通過使用 put() 和 get() 操作來向隊列中添加或者刪除元素。

# -*- coding: UTF-8 -*-nfrom queue import Queuenfrom threading import ThreadnnisRead = Truennndef write(q):n # 寫數據進程n for value in [兩點水, 三點水, 四點水]:n print(寫進 Queue 的值為:{0}.format(value))n q.put(value)nnndef read(q):n # 讀取數據進程n while isRead:n value = q.get(True)n print(從 Queue 讀取的值為:{0}.format(value))nnnif __name__ == __main__:n q = Queue()n t1 = Thread(target=write, args=(q,))n t2 = Thread(target=read, args=(q,))n t1.start()n t2.start()n

輸出的結果如下:

寫進 Queue 的值為:兩點水n寫進 Queue 的值為:三點水n從 Queue 讀取的值為:兩點水n寫進 Queue 的值為:四點水n從 Queue 讀取的值為:三點水n從 Queue 讀取的值為:四點水n

Python 還提供了 Event 對象用於線程間通信,它是由線程設置的信號標誌,如果信號標誌位真,則其他線程等待直到信號接觸。

Event 對象實現了簡單的線程通信機制,它提供了設置信號,清楚信號,等待等用於實現線程間的通信。

  • 設置信號

使用 Event 的 set() 方法可以設置 Event 對象內部的信號標誌為真。Event 對象提供了 isSe() 方法來判斷其內部信號標誌的狀態。當使用 event 對象的 set() 方法後,isSet()方法返回真

  • 清除信號

使用 Event 對象的 clear() 方法可以清除 Event 對象內部的信號標誌,即將其設為假,當使用 Event 的 clear 方法後,isSet() 方法返回假

  • 等待

Event 對象 wait 的方法只有在內部信號為真的時候才會很快的執行並完成返回。當 Event 對象的內部信號標誌位假時,則 wait 方法一直等待到其為真時才返回。

示例:

# -*- coding: UTF-8 -*-nnimport threadingnnnclass mThread(threading.Thread):n def __init__(self, threadname):n threading.Thread.__init__(self, name=threadname)nn def run(self):n # 使用全局Event對象n global eventn # 判斷Event對象內部信號標誌n if event.isSet():n event.clear()n event.wait()n print(self.getName())n else:n print(self.getName())n # 設置Event對象內部信號標誌n event.set()nn# 生成Event對象nevent = threading.Event()n# 設置Event對象內部信號標誌nevent.set()nt1 = []nfor i in range(10):n t = mThread(str(i))n # 生成線程列表n t1.append(t)nnfor i in t1:n # 運行線程n i.start()n

輸出的結果如下:

1n0n3n2n5n4n7n6n9n8n

6、後台線程

默認情況下,主線程退出之後,即使子線程沒有 join。那麼主線程結束後,子線程也依然會繼續執行。如果希望主線程退出後,其子線程也退出而不再執行,則需要設置子線程為後台線程。Python 提供了 setDeamon 方法。

進程

Python 中的多線程其實並不是真正的多線程,如果想要充分地使用多核 CPU 的資源,在 Python 中大部分情況需要使用多進程。Python 提供了非常好用的多進程包 multiprocessing,只需要定義一個函數,Python 會完成其他所有事情。藉助這個包,可以輕鬆完成從單進程到並發執行的轉換。multiprocessing 支持子進程、通信和共享數據、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。

1、類 Process

創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]])

  • target 表示調用對象
  • args 表示調用對象的位置參數元組
  • kwargs表示調用對象的字典
  • name為別名
  • group實質上不使用

下面看一個創建函數並將其作為多個進程的例子:

#!/usr/bin/env python3n# -*- coding: UTF-8 -*-nnimport multiprocessingnimport timennndef worker(interval, name):n print(name + 【start】)n time.sleep(interval)n print(name + 【end】)nnnif __name__ == "__main__":n p1 = multiprocessing.Process(target=worker, args=(2, 兩點水1))n p2 = multiprocessing.Process(target=worker, args=(3, 兩點水2))n p3 = multiprocessing.Process(target=worker, args=(4, 兩點水3))nn p1.start()n p2.start()n p3.start()nn print("The number of CPU is:" + str(multiprocessing.cpu_count()))n for p in multiprocessing.active_children():n print("child p.name:" + p.name + "tp.id" + str(p.pid))n print("END!!!!!!!!!!!!!!!!!")n

輸出的結果:

2、把進程創建成類

當然我們也可以把進程創建成一個類,如下面的例子,當進程 p 調用 start() 時,自動調用 run() 方法。

# -*- coding: UTF-8 -*-nnimport multiprocessingnimport timennnclass ClockProcess(multiprocessing.Process):n def __init__(self, interval):n multiprocessing.Process.__init__(self)n self.interval = intervalnn def run(self):n n = 5n while n > 0:n print("當前時間: {0}".format(time.ctime()))n time.sleep(self.interval)n n -= 1nnnif __name__ == __main__:n p = ClockProcess(3)n p.start()n

輸出結果如下:

3、daemon 屬性

想知道 daemon 屬性有什麼用,看下下面兩個例子吧,一個加了 daemon 屬性,一個沒有加,對比輸出的結果:

沒有加 deamon 屬性的例子:

# -*- coding: UTF-8 -*-nimport multiprocessingnimport timennndef worker(interval):n print(工作開始時間:{0}.format(time.ctime()))n time.sleep(interval)n print(工作結果時間:{0}.format(time.ctime()))nnnif __name__ == __main__:n p = multiprocessing.Process(target=worker, args=(3,))n p.start()n print(【EMD】)n

輸出結果:

【EMD】n工作開始時間:Mon Oct 9 17:47:06 2017n工作結果時間:Mon Oct 9 17:47:09 2017n

在上面示例中,進程 p 添加 daemon 屬性:

# -*- coding: UTF-8 -*-nnimport multiprocessingnimport timennndef worker(interval):n print(工作開始時間:{0}.format(time.ctime()))n time.sleep(interval)n print(工作結果時間:{0}.format(time.ctime()))nnnif __name__ == __main__:n p = multiprocessing.Process(target=worker, args=(3,))n p.daemon = Truen p.start()n print(【EMD】)n

輸出結果:

【EMD】n

根據輸出結果可見,如果在子進程中添加了 daemon 屬性,那麼當主進程結束的時候,子進程也會跟著結束。所以沒有列印子進程的信息。

4、join 方法

結合上面的例子繼續,如果我們想要讓子線程執行完該怎麼做呢?

那麼我們可以用到 join 方法,join 方法的主要作用是:阻塞當前進程,直到調用 join 方法的那個進程執行完,再繼續執行當前進程。

因此看下加了 join 方法的例子:

import multiprocessingnimport timennndef worker(interval):n print(工作開始時間:{0}.format(time.ctime()))n time.sleep(interval)n print(工作結果時間:{0}.format(time.ctime()))nnnif __name__ == __main__:n p = multiprocessing.Process(target=worker, args=(3,))n p.daemon = Truen p.start()n p.join()n print(【EMD】)n

輸出的結果:

工作開始時間:Tue Oct 10 11:30:08 2017n工作結果時間:Tue Oct 10 11:30:11 2017n【EMD】n

5、Pool

如果需要很多的子進程,難道我們需要一個一個的去創建嗎?

當然不用,我們可以使用進程池的方法批量創建子進程。

例子如下:

# -*- coding: UTF-8 -*-nnfrom multiprocessing import Poolnimport os, time, randomnnndef long_time_task(name):n print(進程的名稱:{0} ;進程的PID: {1} .format(name, os.getpid()))n start = time.time()n time.sleep(random.random() * 3)n end = time.time()n print(進程 {0} 運行了 {1}.format(name, (end - start)))nnnif __name__ == __main__:n print(主進程的 PID:{0}.format(os.getpid()))n p = Pool(4)n for i in range(6):n p.apply_async(long_time_task, args=(i,))n p.close()n # 等待所有子進程結束後在關閉主進程n p.join()n print(【End】)n

輸出的結果如下:

主進程的 PID:7256n進程的名稱:0 ;進程的PID: 1492 n進程的名稱:1 ;進程的PID: 12232 n進程的名稱:2 ;進程的PID: 4332 n進程的名稱:3 ;進程的PID: 11604 n進程 2 運行了 0.6500370502471924 秒n進程的名稱:4 ;進程的PID: 4332 n進程 1 運行了 1.0830621719360352 秒n進程的名稱:5 ;進程的PID: 12232 n進程 5 運行了 0.029001712799072266 秒n進程 4 運行了 0.9720554351806641 秒n進程 0 運行了 2.3181326389312744 秒n進程 3 運行了 2.5331451892852783 秒n【End】n

這裡有一點需要注意: Pool 對象調用 join() 方法會等待所有子進程執行完畢,調用 join() 之前必須先調用 close() ,調用close() 之後就不能繼續添加新的 Process 了。

請注意輸出的結果,子進程 0,1,2,3是立刻執行的,而子進程 4 要等待前面某個子進程完成後才執行,這是因為 Pool 的默認大小在我的電腦上是 4,因此,最多同時執行 4 個進程。這是 Pool 有意設計的限制,並不是操作系統的限制。如果改成:

p = Pool(5)n

就可以同時跑 5 個進程。

6、進程間通信

Process 之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python 的 multiprocessing 模塊包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換數據。

以 Queue 為例,在父進程中創建兩個子進程,一個往 Queue 里寫數據,一個從 Queue 里讀數據:

#!/usr/bin/env python3n# -*- coding: UTF-8 -*-nnfrom multiprocessing import Process, Queuenimport os, time, randomnnndef write(q):n # 寫數據進程n print(寫進程的PID:{0}.format(os.getpid()))n for value in [兩點水, 三點水, 四點水]:n print(寫進 Queue 的值為:{0}.format(value))n q.put(value)n time.sleep(random.random())nnndef read(q):n # 讀取數據進程n print(讀進程的PID:{0}.format(os.getpid()))n while True:n value = q.get(True)n print(從 Queue 讀取的值為:{0}.format(value))nnnif __name__ == __main__:n # 父進程創建 Queue,並傳給各個子進程n q = Queue()n pw = Process(target=write, args=(q,))n pr = Process(target=read, args=(q,))n # 啟動子進程 pwn pw.start()n # 啟動子進程prn pr.start()n # 等待pw結束:n pw.join()n # pr 進程里是死循環,無法等待其結束,只能強行終止n pr.terminate()n

輸出的結果為:

讀進程的PID:13208n寫進程的PID:10864n寫進 Queue 的值為:兩點水n從 Queue 讀取的值為:兩點水n寫進 Queue 的值為:三點水n從 Queue 讀取的值為:三點水n寫進 Queue 的值為:四點水n從 Queue 讀取的值為:四點水n

推薦閱讀:

樹莓派Raspberry區域網視頻小車教程
python 3.5 中 PEP0484 新加入的 Type Hints 的使用方法是什麼?
神奇的yield
Python:Pythonic Code Styles
day1 漢諾塔

TAG:Python | Python入门 | Python教程 |