第二代NumPy?阿里開源超大規模矩陣計算框架Mars

Mars 是由阿里雲高級軟體工程師秦續業等人開發的一個基於張量的大規模數據計算的統一框架,目前它已在 GitHub 上開源。該工具能用於多個工作站,而且即使在單塊 CPU 的情況下,它的矩陣運算速度也比 NumPy(MKL)快。

如下圖所示,開發者給出了簡單的性能對比。對 36 億的數據的每個元素加一乘以二,測試隨工作站數量增加的計算時間變化。紅色的叉代表單機 NumPy。Mars 在單機上就能利用多核來加速,多機接近理想值。目前,在張量/矩陣這塊,開發者尚未給出標準的 benchmark。

  • 項目地址:github.com/mars-project
  • 文檔地址:mars-project.readthedocs.io

Mars 張量

Mars 張量提供了類似於 NumPy 的介面。

以下是 Mars 支持的 NumPy 介面的子集概覽。

  • 演算法和數學: +, -, *, /, exp, log 等等。
  • 沿軸縮減(sum, max, argmax 等等)。
  • 大多數數組創建常式(empty, ones_like, diag 等等)。此外,Mars 並不支持在 GPU 上創建數組/張量,但仍然支持創建稀疏張量。
  • 大多數數組操作常式(reshape, rollaxis, concatenate 等等)。
  • 基本索引(通過 ints, slices, newaxes 和 Ellipsis 執行索引)。
  • 沿列表或 numpy 數組的單個軸的 fancy index,例如 x[[1, 4, 8], :5]。
  • 元素運算的通用函數。
  • 線性代數函數,包括乘積(dot, matmul 等等)和分解(cholesky, svd 等等)。

然而,Mars 沒有實現整個 Numpy 介面,時間限制還是主要障礙。下面列出了未實現的主要功能:

  • 未知形狀的張量不支持所有運算。
  • 只實現了 np.linalg 的少量子集。
  • 類似 sort 的難以並行執行的運算沒有實現。
  • Mars 張量並沒有實現類似 tolist 和 nditer 等的介面,因為其在大型張量上的迭代或循環是非常低效的。

架構

Mars 為張量的分散式執行提供了一個庫。分散式應用程序使用 mars.actors 提供的 actor 模型構建,由三部分組成:調度器、工作站和 Web 服務。

用戶使用張量構建的圖形提交任務。Web 服務接收張量圖並將它們發送到調度器,其中圖形被編譯成操作數(operand)圖,在提交給工作站之前進行分析和分區。然後,調度器創建並分散操作數 actor,這些操作數 actor 在給定一致哈希的情況下控制其他調度器上的工作站任務執行,然後激活操作數並以拓撲順序執行。當執行與終止張量相關的所有操作數時,圖形將被標記為已完成,客戶端可以從調度器代理的工作站中提取結果。整個過程如下圖所示。

圖準備

當張量圖提交到 Mars 調度器時,給定在數據源中傳遞的塊(chunks)參數時將生成由操作數和塊構成的圖。

圖構成

在將張量圖平鋪到塊圖之後,我們將組合相鄰節點以減小圖形尺寸以及利用加速庫(例如 numexpr)。目前,Mars 僅合併形成沒有分支的單個鏈的操作數。例如,執行代碼時:

import mars.tensor as mt

a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()

Mars 將把操作數 ADD 和 SUM 組合成一個 FUSE 節點。RAND 操作數被排除,因為它們不與 ADD 和 SUM 形成一條線。

調度策略

當正在執行操作數圖時,正確選擇執行順序將減少存儲在集群中的數據總量,從而降低塊被溢出到磁碟中的可能性。正確選擇工作站還可以減少執行中轉移所需的數據量。

操作數選擇

正確的執行順序可以顯著減少集群中存儲的對象數量。我們在下圖中顯示樹縮減的示例,其中橢圓表示操作數,矩形表示塊。紅色表示正在執行操作數,藍色表示操作數已準備好執行。綠色表示存儲塊,而灰色表示釋放塊或操作數。假設我們有 2 個工作站,所有操作數的工作負載是相同的。兩個圖都顯示了在 5 個時間單位後執行的一個操作數選擇策略。左圖顯示了以層次結構順序執行節點的場景,而右圖顯示了以深度優先順序執行圖表的場景。左圖的策略將 6 個塊存儲在集群中,而右圖僅存儲 2 個。

鑒於我們的目標是在執行期間減少存儲在集群中的數據量,我們在準備執行時為操作數設置優先順序:

1. 深度較大的操作數應較早執行;

2. 較深操作數所需的操作數應較早執行;

3. 首先執行輸出尺寸較小的操作數。

操作數狀態

Mars 中的每個操作數都由 OperandActor 獨立調度。執行被設計為狀態轉換過程。我們為每個狀態分配一個狀態處理函數來控制執行過程。最初初始化 actor 時,每個操作數都處於 UNSCHEDULED 狀態。當滿足某些條件時,操作數切換到另一個狀態並執行相應的操作。如果從 KV 存儲中恢復操作數,則將載入調度器崩潰時的狀態並恢復狀態。狀態轉換圖如下所示:

工作站中的執行

當一個操作數在工作站中執行時,它將首先分配內存。然後載入來自其他工作站或已經溢出到磁碟的文件的數據。之後,所需的所有數據都在內存中,並且可以開始計算。計算完成後,工作站會將結果放入共享內存緩存中。這四種狀態可以在下圖中看到。

易於向內擴展和向外擴展

Mars 可以向內擴展到單機,也可以向外擴展到有數千台計算機的集群。本地和分散式版本都共享相同的代碼,因此隨著數據增加從單機遷移到集群是很簡單的。

在單機上運行包括基於線程的調度,以及捆綁整個分散式組件的本地集群調度。Mars 也很容易通過啟動集群中不同機器上的 mars 分散式運行時的不同組件來擴展到一個集群。

線程化

execute 方法將默認在單機的基於線程的調度器上運行。

import mars.tensor as mt

a = mt.ones((10, 10))
a.execute()

用戶可以明確地創建一個 session。

from mars.session import new_session

session = new_session()
session.run(a + 1)
(a * 2).execute(session=session)
# session will be released when out of with statementwith new_session() as session2:
session2.run(a / 3)

本地集群

用戶可以從單機上的分散式運行時啟動本地集群,本地集群模式需要 mars 分散式版本。

from mars.deploy.local import new_cluster
# cluster will create a session and set it as default
cluster = new_cluster()
# run on the local cluster
(a + 1).execute()
# create a session explicitly by specifying the clusters endpoint
session = new_session(cluster.endpoint)
session.run(a * 3)

分散式

在集群中每個節點都安裝了分散式版本之後,一個節點可以被選為調度器,另一個節點作為 web 服務,剩下其它節點作為工作站。調度器可以通過以下命令啟動:

mars-scheduler -a <scheduler_ip> -p <scheduler_port>

web 服務可以通過以下命令啟動:

mars-web -a <web_ip> -s <scheduler_ip> --ui-port <ui_port_exposed_to_user>

工作站可以通過以下命令啟動:

mars-worker -a <worker_ip> -p <worker_port> -s <scheduler_ip>

在所有 mars 進程啟動後,用戶可以運行:

sess = new_session(http://<web_ip>:<ui_port>)
a = mt.ones((2000, 2000), chunks=200)
b = mt.inner(a, a)
sess.run(b)

推薦閱讀:

TAG:科技 | 人工智慧 | 阿里巴巴集團 |