【效率提升】Python中的並行運算
本文主要總結了用python做並行或並發計算的方法,介紹了ipyparallel、threading、multiprocessing這三個常用的package。
1.ipyparallel
1.1 安裝
conda install ipyparallelipcluster nbextension enable
之後,你可以在Jupyter上通過IPython Clusters的tab啟動一個集群,也可以用命令行實現:
ipcluster start -n
1.2 基本概念
from ipyparallel import Client
client可以連接到不同的集群的engine上,這些engine可以在同一個機器上也可以在不同的機器上。
rc = Client()rc.ids#[0, 1, 2, 3]
一個視圖(view)提供了訪問不同engine的方法,任務可以通過視圖提交到這些engine上,直接視圖(direct view)可以允許用戶精確的將不同任務發送給不同的engines,而load balanced view有點像multiprocessing里的pool對象。
Direct View
dv = rc[:]dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))#[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Load Balanced View
lv = rc.load_balanced_view()lv.map_sync(lambda x: sum(x), np.random.random((10, 100000)))#[50261.04884692176, 49966.438133877964, 49825.131766711958, 50131.890397676114, 49939.572135256865, 50162.518589135783, 50065.751713594087, 49922.903432015002, 49983.505820534752, 49942.245237953692]
使用Apply
除了map方法,我們還可以使用apply進行任務的分配:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, 3, 4)
同步任務和非同步任務
我們之前已經使用了map和apply方法,其中sync代表的是同步任務,我們也可以用map_async和apply_async實現非同步任務:
res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))res#<AsyncMapResult: <lambda>>
不過和同步任務不同的是,這時候res返回的是一個object,我們還要判斷其是否完成,並使用get()函數,才能得到其中具體的數值:
res.done()#Trueres.get()#[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
2. Threading
Threading是Python里一個可以實現多線程的包,下面用一個例子介紹其簡單用法:
import threading #用於線程執行的函數 def counter(n): cnt = 0; for i in xrange(n): for j in xrange(i): cnt += j; print cnt; if __name__ == "__main__": jobs = [] for i in range(5): th = threading.Thread(target=counter, args=(i,)); jobs.append(th) th.start();
其中,counter是我們定義的函數,我們的目的就是同時運行多個counter函數(但是這些函數輸入的參數不同)來實現速度的提升。
這段代碼可以直接copy然後自己進行修改,Thread就是幫助我們實現多線程的函數,而target屬性後面就是我們要並發的函數,而args則關乎輸入的參數。
不過python多線程有個討厭的限制:全局解釋器鎖(global interpreter lock)。這個鎖的意思是任一時間只能有一個線程使用解釋器,所以就變成了單CPU跑多個程序。這實際上叫「並發」,不是「並行」。這個鎖造成的問題就是如果有一個計算密集型的線程占著cpu,其他的線程都得等著。如果你的多個線程中有這麼一個線程,那就悲劇了,有時候多線程竟被生生被搞成串列運算。
3. multiprocessing
相比於Threading,multiprocessing是實現多進程的,其使用方法和threading完全一樣,只不過是將threading.Thread換成multiprocessing.Process。
import multiprocessing#用於進程執行的函數def worker(num): print "Worker:", num returnif __name__ == "__main__": jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()
在multiprocessing還可以引入進程池(pool)的概念:
from multiprocessing import Poolimport os, timedef long_time_task(name): print "Run task %s (%s)..." % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print "Task %s runs %0.2f seconds." % (name, (end - start))if __name__=="__main__": print "Parent process %s." % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print "Waiting for all subprocesses done..." p.close() p.join() print "All subprocesses done."
最後
想要了解關於R、Python、數據科學以及機器學習更多內容。
請關注我的專欄:Data Science with R&Python, 以及關注我的知乎賬號:文兄
推薦閱讀:
※Kaggle入門系列(三)Titanic初試身手
※營銷轉數據,兩年半到P7,我都做了哪些事兒?
※權力的遊戲:網紅CEO之崛起
※從用戶梳理到構建體系再到數據分析
※這跟特朗普沒關係!中國製造業開始衰退?