【效率提升】Python中的並行運算

本文主要總結了用python做並行或並發計算的方法,介紹了ipyparallel、threading、multiprocessing這三個常用的package。

1.ipyparallel

1.1 安裝

conda install ipyparallelipcluster nbextension enable

之後,你可以在Jupyter上通過IPython Clusters的tab啟動一個集群,也可以用命令行實現:

ipcluster start -n

1.2 基本概念

from ipyparallel import Client

client可以連接到不同的集群的engine上,這些engine可以在同一個機器上也可以在不同的機器上。

rc = Client()rc.ids#[0, 1, 2, 3]

一個視圖(view)提供了訪問不同engine的方法,任務可以通過視圖提交到這些engine上,直接視圖(direct view)可以允許用戶精確的將不同任務發送給不同的engines,而load balanced view有點像multiprocessing里的pool對象。

Direct View

dv = rc[:]dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))#[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Load Balanced View

lv = rc.load_balanced_view()lv.map_sync(lambda x: sum(x), np.random.random((10, 100000)))#[50261.04884692176, 49966.438133877964, 49825.131766711958, 50131.890397676114, 49939.572135256865, 50162.518589135783, 50065.751713594087, 49922.903432015002, 49983.505820534752, 49942.245237953692]

使用Apply

除了map方法,我們還可以使用apply進行任務的分配:

rc[1:3].apply_sync(lambda x, y: x**2 + y**2, 3, 4)

同步任務和非同步任務

我們之前已經使用了map和apply方法,其中sync代表的是同步任務,我們也可以用map_async和apply_async實現非同步任務:

res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))res#<AsyncMapResult: <lambda>>

不過和同步任務不同的是,這時候res返回的是一個object,我們還要判斷其是否完成,並使用get()函數,才能得到其中具體的數值:

res.done()#Trueres.get()#[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

2. Threading

Threading是Python里一個可以實現多線程的包,下面用一個例子介紹其簡單用法:

import threading #用於線程執行的函數 def counter(n): cnt = 0; for i in xrange(n): for j in xrange(i): cnt += j; print cnt; if __name__ == "__main__": jobs = [] for i in range(5): th = threading.Thread(target=counter, args=(i,)); jobs.append(th) th.start();

其中,counter是我們定義的函數,我們的目的就是同時運行多個counter函數(但是這些函數輸入的參數不同)來實現速度的提升。

這段代碼可以直接copy然後自己進行修改,Thread就是幫助我們實現多線程的函數,而target屬性後面就是我們要並發的函數,而args則關乎輸入的參數。

不過python多線程有個討厭的限制:全局解釋器鎖(global interpreter lock)。這個鎖的意思是任一時間只能有一個線程使用解釋器,所以就變成了單CPU跑多個程序。這實際上叫「並發」,不是「並行」。這個鎖造成的問題就是如果有一個計算密集型的線程占著cpu,其他的線程都得等著。如果你的多個線程中有這麼一個線程,那就悲劇了,有時候多線程竟被生生被搞成串列運算。

3. multiprocessing

相比於Threading,multiprocessing是實現多進程的,其使用方法和threading完全一樣,只不過是將threading.Thread換成multiprocessing.Process。

import multiprocessing#用於進程執行的函數def worker(num): print "Worker:", num returnif __name__ == "__main__": jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()

在multiprocessing還可以引入進程池(pool)的概念:

from multiprocessing import Poolimport os, timedef long_time_task(name): print "Run task %s (%s)..." % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print "Task %s runs %0.2f seconds." % (name, (end - start))if __name__=="__main__": print "Parent process %s." % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print "Waiting for all subprocesses done..." p.close() p.join() print "All subprocesses done."

最後

想要了解關於R、Python、數據科學以及機器學習更多內容。

請關注我的專欄:Data Science with R&Python, 以及關注我的知乎賬號:文兄

推薦閱讀:

Kaggle入門系列(三)Titanic初試身手
營銷轉數據,兩年半到P7,我都做了哪些事兒?
權力的遊戲:網紅CEO之崛起
從用戶梳理到構建體系再到數據分析
這跟特朗普沒關係!中國製造業開始衰退?

TAG:Python | Python入门 | 数据分析 |