如何看UCBerkeley RISELab即將問世的Ray,replacement of Spark?

從新聞和博客介紹看,Ray不在採取MapReduce/BSP風格,而是採用data flow的風格協調異構任務之間數據交互,從技術上講借鑒了MPI的一些思想;Ray主要面向一些自動駕駛、輔助醫療等基於ML/AI技術且需要Just-In-Time decision的應用場景。


背景:本人在 UCBerkeley RISELab 交流期間使用過 ray 做過 project (當時 ray 還在大量重構的階段),並給出了大量 issue 及一部分fix,自然也翻過其源代碼。

我並不認同 Ray 和Spark是對標的,相反地Ray的定位更像是一個高性能 分散式 Python RPC (Remote Procedure Call)框架。

總所周知,Python 這幾年成為了機器學習領域的絕對明星,但是卻缺乏通用且專門針對Python的高性能分散式RPC框架用以實現強化學習和非同步學習等必須的 Parameter Server 等模型。Ray的目的就是填補這個空缺。

Ray 的核心想法很簡單:

  1. 利用 Redis實現非同步分散式。Redis 是一個使用ANSI C編寫的開源、支持網路、基於內存、可選持久性的鍵值對存儲資料庫(Key-value database)。非同步處理通過 Redis 消息循環實現。
  2. 將 Python 的函數、數值等等序列化作為value,並獲得其hash作為key存入到redis;需要使用時根據key提取它。

看上去非常簡單,但是實現一些就會發現其中的坑可以說深不見底。最麻煩的一點在於 Python 的內存對象到純 memory 表示的 gap 太大。同時一個分散式框架並不是寫出來能跑就行,其需要合理地處理異常,保留一切可用的信息等等。

首先造成困難一點就是 Python2.x 到 3.x 各個版本間的不兼容性。

比如說加上修飾器之後,想要函數繼承文檔以方便調試,你就得兼容py2和py3:

if sys.version_info &>= (3, 0):
func_invoker.__doc__ = func.__doc__
else:
func_invoker.func_doc = func.func_doc

這些倒是小問題。稍複雜一點的在於RPC的實現部分:

我們怎麼樣為函數生成key,以便存放在redis?

比如說,我們把下面的函數標誌成需要遠程執行的:

@ray.remote
def foobar(a, b):
return a * b

// execute it
object_id = foobar.remote(3, 4)
result = ray.get(object_id)

首先,我們為函數生成一個「名字」,這通過唯一路徑獲得:

func_name = "{}.{}".format(foobar.__module__, foobar.__name__)

需要注意到,更好的策略是使用 foobar.__qualname__ 而不是 foobar.__name__。因為前者會帶上函數的類層次信息(函數在哪個類裡面),而後者不會,這會導致名稱的碰撞(雖然這種情形非常罕見)。但是呢 __qualname__ 關鍵字只有 python3.x 才有,是PEP 3155的內容,而2.x又缺乏完整的獲取任意函數類層次信息的元編程手段,所以最終還是只用了 __name__。

但是問題還沒有結束,在python解釋器交互界面和jupyter notebook裡面,以及用戶搞了一個lambda表達式(沒辦法用戶就是有這種奇怪的需求)會遇到幾個問題:

  1. __module__ 是無效的。
  2. 函數可能隨時「熱更新」。
  3. 函數沒有「名字」。

於是這個時候只好選擇最後一種策略:直接把函數源代碼的sha1用作func_name(因為這些情形下是可以獲取到源代碼的)。這點可以通過 inspect.getsource(func) 實現。

非常想吐槽的一點是,python各個版本間沒有統一的判斷是不是lambda表達式的方式,現在的一個workaround是:

def islambda(value):
"""
Whether a variable is a lambda
Parameters
----------
value : Any
A variable
Returns
-------
bool
If True, the variable is a lambda
"""

_lambda = lambda: None # noqa:E731
return isinstance(value, type(_lambda)) and value.__name__ == _lambda.__name__

當然還有更加極端的情況,比如某個喪心病狂的用戶通過ray把一個lambda中的生成器生成的一個lambda和它的閉包環境作為一個數據直接傳過來,然後你又想把這個遠程的lambda作為一個函數獲得它的key以便實現RPC。這個時候只好hack python的位元組碼了。目前這類非常極端的情況ray的支持還不是很好,甚至將來也很難預期有框架完全做到(比如說某函數返回了一個生成器的閉包並返回了一個lambda 表達式並且這個表達式的閉包中有一個打開的一個本地文件描述符和關閉地socket描述符加上一個線程句柄,並遞歸調用了父函數,這種情形純粹在為難RPC框架,雖然用戶甚至也提出了希望RPC框架能通過redis建立一個通道然後遠程打開文件的意願)。

通過上面一些步驟完成了func_name的計算,然後離生成函數的key還差一步:我們需要定位函數在調度器中的「域」。ray有局部調度器和全局調度器,如果我們只想在局部執行代碼,那麼這個函數不應該被全局訪問,所以ray還加上了調度器的ID,和func_name一起組合成最終的key。

然後我們依然避免不了hack python函數的位元組碼,因為我們需要把函數的執行環境作為value存儲到redis上,這樣才能遠程調用函數。ray利用了第三方庫cloudpickle完成了這些,有興趣的可以看看這個庫,看懂了可以說你就完全了解如何做python元編程以及hack位元組碼。

Ray 的 actor 更進一步:它把python的整個類都給RPC了,其中又有若干挑戰,比如說python函數會動態生成類對象,如何在整個分散式框架同步這種東西?其中的問題又是一大堆。

此外 ray 還實現了各種消息介面,調度器,以及任務監測介面,這些都不多說。

不過到此為止也只僅僅是實現了「可用」,離「高性能」還很遠。

Python 的內存對象到純 memory 表示的 gap 太大又體現在性能瓶頸上。現有的把python對象變成序列化位元組的是pickle庫,且python3.x之後其由C編寫,效率很高。

但是pickle依然成為瓶頸,原因在於:

  1. pickle還是無法避免複雜的python對象分析過程。
  2. pickle過程中發生了多次內存拷貝,然後ray還要提交到redis上,這又多copy了幾次。
  3. redis不能直接支持本地內存共享,讓多個python進程共享內存對象。
  4. pickle無法直接支持自定義硬體,比如GPU和TPU的內存拷貝,需要大量copy。
  5. 不同數據對象和框架之間需要複雜的格式轉換,比如pandas的dataframe和spark的rdd。

為了解決這些問題,ray團隊搞了一個子模塊叫plasma。plasma直接在底層hack了python的內存對象,以及和redis的內存共享,可以實現numpy, pandas等等的數據格式序列化時做到zero-copy。此後plasma還支持「捕捉」藏在類內部的特定對象(因為用戶可能希望序列化一個numpy的list也很快)

plasma 解析層次化python內部內存對象示意圖

後來ray團隊發現plasma搞的太大了,此外它即使獨立出來也是一個非常有意義的項目,於是plasma後來改名為apache arrow,作為apache資金會的直接項目,這導致了ray的一次巨大重構。參見 Apache Arrow Homepage。ray有了apache arrow 後序列化大型的numpy array的性能得到了巨大提升,而numpy array等正是很多機器學習項目的主角。

ray有了apache arrow 後序列化大型的numpy array的性能得到了巨大提升

可以說 Ray 是 高性能 分散式 Python RPC 框架上的一次非常有意義的嘗試,其開發本身也帶動了諸如 apache arrow 等項目的成立,我看好其未來的發展。


本人背景:分散式系統博士生,研究改進過MapReduce類系統,同時利用actor開發了工業界應用的新一代實時計算引擎(尚未開源)。因此對於Spark和Ray的設計初衷和可能遇到的技術債有一定了解。

先說個人的看法:Ray和Spark是立志於解決兩個領域問題的計算機系統,Ray取代Spark是危言聳聽了。

背景

回首分散式計算系統進化的10年,我們可以更容易認識到Spark和Ray的相對位置。

2004年,Google提出MapReduce作為一個集群編程框架,並且配合Google File System等技術作為底層存儲的支持。之後10餘年,MapReduce大行其道。其成功的原因在於其給廣大程序員和數據科學家提供了一個非常好理解,表達力豐富,容錯性極高,且很容易基於商業硬體(commodity devices)來實現的分散式系統架構。之後在2010年,隨著Stanford提出的memory cloud的概念,研究人員意識到原本看似非常昂貴的內存正在變得廉價,許多高度依賴於磁碟的容錯操作其實可以利用在內存中實現。在這個背景下,Spark應運而生,催生了RDD和一系列基於內存的優化技術,在中小型規模計算上取代了原本的Hadoop Hive等基於磁碟的框架。可是至今,Hive並沒有因此而被完全取代。在超大規模計算(PB級別)場景下,其依賴於SSD以及超強的魯棒性,依然是很多公司的首選。因此,計算框架出現的初衷往往不是相互取代的關係,而是支持新計算需求和場景,對於新的硬體條件進行利用,形成優勢互補,相互合作的關係。ray和spark也是如此。

為什麼我們需要新的計算框架?

2015年前後,大量AI計算任務崛起,其中增強學習(Reinforcement Learning)和自動駕駛AI訓練等等作為一個重大的計算需求,一直很難在MapReduce得到很好的表達。MapReduce本質上是一個大規模的數據聚合(data aggregation)的模型。另一方面,許多AI的任務的核心訴求是在大規模模擬(Simulation)的環境下優化AI的行為。這種訴求和Spark當時的設計初衷完全不同。

首先,模擬的規模能夠輕易輕易到達Billions的級別。這種級別難以在Spark集群得到良好支持。Spark的Task本質上對於基於CPU的操作系統線程抽象。因此Spark並沒有非常自然的方式能夠將數十億個模擬實體(AlphaGo的模擬級別)能夠合理的同時調度到幾千個CPU上面。

另外,MapReduce範式難以表達複雜的計算狀態和同步。上億個模擬實體不僅僅運行的時間不同(有些遊戲只要幾秒就結束,有些遊戲可能十幾分鐘那個還在運行)。讓他們在統一的Bulk Synchronization Processing(BSP)模型頻繁的同步,不僅僅系統開銷很大,而且很難實現。另外,模擬實體往往需要實現複雜的計算行為,伴隨計算中間狀態的抽象,而這在Spark下難以實現。

最後,資源調度層的壓力。Spark的設計初衷是解決大數據處理問題。那麼數據已經靜態的存儲在文件系統中,Spark只需要依照資源可用性,啟動一定的大小的計算圖做靜態計算即可。可是在大規模模擬中,這上億個計算實體可能在計算中間大量產生和消失(例如,分散式優化演算法的剪枝行為)。這些龐大的對於計算圖在線修改的行為在Spark中很難有效支持。

Ray解決了什麼問題?

因此,Ray針對這類超大規模模擬的計算任務提供了一種全新的計算框架。其底層利用actor而不是類似於Spark的基於系統線程實現的資源框架。

首先在抽象層次,Ray專門給RL的幾大類經典模擬問題提供了專用演算法框架。RL用戶現在難以得到滿足的計算需求得到迅速滿足。另外犧牲普適性後,Ray可以為幾大類的RL演算法專門設計同步演算法,從而高效推進演算法的執行和資源利用率。

然後在計算效率上,Ray的底層使用了actor框架來實現。這麼做有兩大優勢。第一,actor可以認為是用戶級的線程。其可以輕易在16核的伺服器上,同時調度上百萬的actor。這使得我們可以輕易實現Billion級別的大規模模擬。第二,actor之間本身是松耦合的,在運行時大量創建和刪除actor都可以在server本地完成(毫秒級別的調度延遲),並不會嚴重影響整個集群的運行效率(當然,這麼做的假設是大量的分散式AI演算法可以忍受eventual consistency,因此比BSP模型更局限)。

另一方面,actor相對於Spark系統線程實現存在三大挑戰。第一,缺少了系統隔離能力,一個有害的actor實現可以輕易獨佔當前的cpu資源從而影響他人的使用(依賴於cooperative scheduling)。第二,由於頻繁的需要在cpu上切換不同的actor,其調度以及context switch的開銷理論上更大。第三,允許python來實現,以及進行大規模模擬的巨大中間計算狀態的更新和存儲可能成為Ray一個可能的技術債。首先,python在支持function的序列化(函數閉包,有名類,依賴注入)以及遠端部署的方面不及Java和C#這類語言成熟。另一方面,目前利用master集群來存儲大量的計算中間狀態(往往是key-value pairs),而所有的模擬actor是無狀態的。這樣雖然可以讓系統的實現變簡單,但是master節點卻容易成為系統的瓶頸(同步通訊,大狀態存儲,喪失data locality以及增加額外的memory copy)。

結論

因此我們可以看到,Ray其實本質上Berkeley對於AI時代大量崛起的大規模模擬計算需求的一個方案。其在犧牲了Spark等批處理框架易用性的同時,著眼於AI領域特定演算法,針對AI程序員的核心訴求提供了的靈活和高性能的框架支持。

一個可以預見的場景是,人們利用spark來做大規模數據準備,利用ray來訓練AI,最終將訓練的AI回饋到spark的計算圖裡面去。

最後一句話回答本題:Spark更懂數據科學(大數據里找總結),Ray更懂AI訓練(大模擬中求智慧)。


一直是開源的,誰都可以用。內部早就在用了,還時不時的搞個Ray培訓什麼的。Ray確實比Spark好用。Spark只能同步,Ray可以非同步;Spark的RDD不能記錄狀態,Ray的Actor可以。用來實現演算法的話Ray更方便。好像沒聽說過要用Ray取代Spark;只是兩種不同的系統而已吧……


作為用系統輪子的我大概半年前就開始嘗試用Ray了。雖然目前我個人主要是用在embarrassingly parallel的場景,例如隨機種子調參(大誤),不過Ray的API的確很容易理解和使用,親測單機效率也不錯。

一個半年前的問題,現在還是不太清楚:Ray目前支持高效的和MPI類似的reduce API嗎?文檔里好像有一個遞歸的方法定義,但是用起來總感覺略糾結


用起來,歡迎加入

tigerneil/ray-zh-docgithub.com圖標


ray的論文都出來好幾篇了,就不能先讀讀文章再問嗎。吃飯等菜中隨便說兩句,ray並不是要取代spark,而主要是解決了一個scheduling latency的問題。spark費勁心力在streaming上做到了百毫秒級別的latency,我覺得這基本就是這種batch processing系統的極限了,當任務力度越來越小,耗時越來越短時他就不行了。ray做到了毫秒級的(本地百微秒)latency。ray解決的另一個問題是dynamic tasks,說白了就是當你在計算A的時候需要根據A的中間結果算B,再根據B的結果繼續算A,B就是一個dynamic task。這個要在spark這種dag based的系統里做就要把A depend on B的計算單獨分離成一個task,比較麻煩。而ray可以暫時把A掛起先算B。最後就是一個global state (reddis)


我是riselab的。

我覺得ray不是代替spark的啊。


說replacement有些標題黨了,看了英文新聞原文:

「You need something much more like a just-in time, data-flow type architecture, where a task goes and all the tasks it depends on are ready and finished.

「So building a system that supports that, and retains all the desirable features of Hadoop and Spark, is the goal of our project called Ray,」 he said ( ~ Michael I. Jordan).

應該是站在巨人(Hadoop Spark)肩膀上而不是replacement,Spark目前在處理要進行大量迭代計算的supervised learning仍有不可替代的優越性,Ray則是針對reinforcement learning的。

看了下Paper - Ray: A Distributed Framework for Emerging AI Applications.

覺得其key point在於4.2.2 Bottom-Up Distributed Scheduler,所以相較於Spark實現了task的非同步。還有用actor記錄state啥的。

看樣子現在還在developing中,剛剛star了一下其github repo。。暗中觀察ing


感覺Ray還是面對ML為主, 特別適合目前做RL research的workflow.

正在看tune和rllib的部分, 學到很多good practice. 收到tune的啟發打算寫一個針對我們lab裡面可以拿來調參的輪子. 源碼再多看一些過後來細答.


推薦閱讀:

Scaling Memcache in Facebook 筆記(三)
幾個有意思的開源庫/工具
SparkSQL的3種Join實現
下一波計算浪潮
分散式計算和並行計算的區別與聯繫在哪裡?

TAG:分散式計算 | 大數據 | 深度學習DeepLearning | 大規模機器學習 |