標籤:

Python 多進程及進程間通信

Python 多進程及進程間通信

4 人贊了文章

python 因 GIL 的存在,處理計算密集型的任務時無法高效利用多核 CPU 的計算資源,這時就需要使用多進程來提高對 CPU 的資源利用。Python 多進程主要用 multiprocessing 模塊實現,提供了進程、進程池、隊列、管理者、共享數據、同步原語功能。

單進程版

為了便於演示 multiprocessing 的使用,我們使用素數檢查模擬計算密集型任務。單進程版本的代碼如下:

# encoding:utf8from math import sqrtCHECK_NUMBERS = 1000000def check_prime(n): if n == 2: return True for i in range(2, int(sqrt(n) + 1)): if n % i == 0: return False return Truedef run(): total = 0 for i in range(2, CHECK_NUMBERS + 1): if check_prime(i): total += 1 return totalif __name__ == "__main__": import timeit print(run()) print(timeit.timeit("run()", "from __main__ import run", number=1))

以上單進程的示例在我的計算機上輸出結果為:

78498

4.788863064308802

即 1000000 以內共有 78498 個素數,耗時約 4.8 秒。

進程與隊列

生成一個新的進程並啟動如下:

processes = multiprocessing.Process(target=worker, args=param)processes.start() # 啟動processes.join() # 主進程等待

子進程中要把結果返回可以通過 Queue 隊列返回,Queue 的操作主要是 get 和 put,支持阻塞(可以設置超時),非阻塞,也能監測隊列狀態是否為空。

以下代碼使用了多個進程,將 2 到 1000000 的大區間分為 4 個小區間,再分配給 4 個進程去分別計算小區間內共有多少素數,通過隊列返回。主進程最後把每個進程統計的素數個數相加即是最終結果。

# encoding:utf8import multiprocessingfrom math import sqrtfrom multi.single_thread_check_prime import check_primeCHECK_NUMBERS = 1000000NUM_PROCESSES = multiprocessing.cpu_count()def worker(start, end, result_mq): count prime numbers between start and end(exclusive) total = 0 for n in range(start, end): if check_prime(n): total += 1 result_mq.put(total)def divide_range(lower_end, upper_end, num_range): divide a larger range into smaller ranges step = int((upper_end - lower_end) / num_range) ranges = [] subrange_upper = lower_end while subrange_upper <= upper_end: subrange_lowerend = subrange_upper subrange_upper += step if subrange_upper <= upper_end: ranges.append((subrange_lowerend, subrange_upper)) continue if subrange_lowerend < upper_end: ranges.append((subrange_lowerend, upper_end)) return rangesdef run(): params = divide_range(2, CHECK_NUMBERS + 1, 4) # [(2, 250001), (250001, 500000), (500000, 749999), (749999, 999998), (999998, 1000001)] result_mq = multiprocessing.Queue() processes = [] for i in range(NUM_PROCESSES): process = multiprocessing.Process(target=worker, args=list(params[i]) + [result_mq] ) processes.append(process) process.start() for process in processes: process.join() total = 0 for i in range(NUM_PROCESSES): count = result_mq.get() total += count print(total) return totalif __name__ == "__main__": import timeit print(timeit.timeit("run()", "from __main__ import run", number=1))

使用多進程後的輸出結果為:

78498

1.6613719538839973

最終結果一致,約快了 2.9 倍。我的電腦 CPU 有 4 核,由於創建進程、進程間通信也都需要消耗資源,所以沒法達到理想的 4 倍,但也已經是不錯的提升了。

進程池

前面一個例子通過實例化 Process 的方法生成新的進程,再對每個進程調用 start 和 join,但其實通過進程池可以使代碼更簡潔。上面多進程的例子中,使用進程池後可以去除 Queue 隊列的使用。首先在 worker 中去除消息隊列 result_mq,直接返回結果如下:

def worker(sub_range): count prime numbers between start and end(exclusive) start, end = sub_range total = 0 for n in range(start, end): if check_prime(n): total += 1 return total

這時在 run 函數里就可以用進程池的 map 方法,修改後的 run:

def run(): params = divide_range(2, CHECK_NUMBERS + 1, 4) # [(2, 250001), (250001, 500000), (500000, 749999), (749999, 999998), (999998, 1000001)] pool = multiprocessing.Pool(processes=NUM_PROCESSES) result = pool.map(worker, params) total = sum(result) print(total) return total

由於不用在代碼里逐個生成子進程,同時 map 方法可以直接返回結果,run 函數從原來的 19 行縮短為了 8 行。進程池不僅有 map 方法,還有 map_async, apply, apply_async 等方法,帶 async 後綴的方法能夠實現非阻塞調用,主進程不必等到子進程運行完畢才往下運行。比如上面的 run 函數可以修改成如下:

def run(): params = divide_range(2, CHECK_NUMBERS + 1, 4) # [(2, 250001), (250001, 500000), (500000, 749999), (749999, 999998), (999998, 1000001)] pool = multiprocessing.Pool(processes=NUM_PROCESSES) result = pool.map_async(worker, params) pool.close() # do something else here ... pool.join() total = sum(result.get()) print(total) return total

管理者

管理者 Manager 可以存儲需要在進程間共享的對象,其支持的類型包括 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, Array。相比於共用內存,Manager 可以讓不同機器上的進程通過網路共享對象。Manager 的 register 方法還可以自定義新的類型或者可調用對象,具體使用見文檔:Manager

sharedctypes

multiprocessing.sharedctypes 可以用來在共享內存中創建 c 類型數據,可以作為參數在創建子進程時傳入。其中主要用到的有 Value, RawValue, Array, RawArray 。Value, Array 通過參數可以設置是否需要鎖實現進程安全,如果對進程間同步沒有要求使用 RawValue 和 RawArray 則有更高的運行效率。實例化使用如下:

n = Value(i, 7)x = RawArray(『h』, 7)s = RawArray(『i』, (9, 2, 8))

第一個參數指定類型,類型編碼見array module,或者使用 ctypes 模塊,如 ctypes.c_double,第二個參數為值,對於 Array 和 RawArray,當第二個參數為整型時則為數組的長度,數組元素初始化為0。對於 linux 操作系統,只要把 n, x, s 設置成全局變數,即可在子進程中使用,無需顯式傳參,但 window 操作系統則需要在實例化 Process 時傳入參數。不可使用 Pool.map 把 n, x, s 作為參數傳遞,因為 map 使用的序列化而 n,x,s 不可序列化。具體使用見下一小節。

共享 numpy 數組

需要用到 numpy 時往往是數據量較大的場景,如果直接複製會造成大量內存浪費。共享 numpy 數組則是通過上面一節的 Array 實現,再用 numpy.frombuffer 以及 reshape 對共享的內存封裝成 numpy 數組,代碼如下:

# encoding:utf8import ctypesimport osimport multiprocessingimport numpy as npNUM_PROCESS = multiprocessing.cpu_count()def worker(index): main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double) main_nparray = main_nparray.reshape(NUM_PROCESS, 10) pid = os.getpid() main_nparray[index, :] = pid return pidif __name__ == "__main__": shared_array_base = multiprocessing.Array( ctypes.c_double, NUM_PROCESS * 10, lock=False) pool = multiprocessing.Pool(processes=NUM_PROCESS) result = pool.map(worker, range(NUM_PROCESS)) main_nparray = np.frombuffer(shared_array_base, dtype=ctypes.c_double) main_nparray = main_nparray.reshape(NUM_PROCESS, 10) print main_nparray

mmap

mmap 把文件映射到內存,也可以用於進程間通信,可以像字元串或文件一樣對其進行操作,操作較為簡單。

import mmap# write a simple example filewith open("hello.txt", "wb") as f: f.write("Hello Python!
")with open("hello.txt", "r+b") as f: # memory-map the file, size 0 means whole file mm = mmap.mmap(f.fileno(), 0) # read content via standard file methods print mm.readline() # prints "Hello Python!" # read content via slice notation print mm[:5] # prints "Hello" # update content using slice notation; # note that new content must have same size mm[6:] = " world!
" # ... and read again using standard file methods mm.seek(0) print mm.readline() # prints "Hello world!" # close the map mm.close()

以上例子(轉載自文檔)為映射文件,對於進程間共享內存在 mmap 時第一個參數設置為 -1 實現匿名映射,只創建共享內存不映射到磁碟,見官網mmap。


推薦閱讀:

【我是解決安裝問題系列_1】Mac python basemap安裝
PyQt5系列教程(38):還是QQ模擬(QListWidget的使用)1
Day17
sklearn中的one hot編碼
暴漲1000%,幣盛鏈COL成為又一個橫空出世的暴漲幣

TAG:Python |