TensorFlow遇上Spark

TensorFlowOnSpark 項目是由Yahoo開源的一個軟體包,實現TensorFlow集群服務部署在Spark平台之上。

大家好,這次我將分享TensorFlow On Spark的解決方案,將TensorFlow集群部署在Spark平台之上,實現了TensorFlow與Spark的無縫連接,更好地解決了兩者數據傳遞的問題。

tfos.part1.1_1.pdf.jpg

tfos.part2.2_2.pdf.jpg

這次分享的主要內容包括TensorFlowOnSpark架構設計,探討其工作原理,通過理解其設計,更好地理解TensorFlow集群在Spark平台上的運行機制。

tfos.part3.3_3.pdf.jpg

首先,探討TensorFlowOnSpark的架構與設計。主要包括如下兩個基本內容:

  • 架構分析
  • 生命周期

tfos.part4.4_4.pdf.jpg

在開始之前,先探討一下TensorFlowOnSpark的背景,及其它需要解決的問題。為了實現Spark利用TensorFlow深度學習,及其GPU加速的能力,最常見的解決方案如上圖所示。

搭建TensorFlow集群,並通過利用既有的Spark集群的數據完成模型的訓練,最種再將訓練好的模型部署在Spark集群上,實現數據的預測。

該方案雖然實現了Spark集群的深度學習,及其GPU加速的能力,但需要Spark集群與TensorFlow集群之間的數據傳遞,造成冗餘的系統複雜度。

tfos.part5.5_5.pdf.jpg

很容易想到,可以將TensorFlow集群部署在Spark之上,用於解決集群間數據傳遞的問題。

依次類同,該方案可實現Caffe部署在Spark集群之上,實現Spark集群對多種深度學習框架的支持能力,併兼容既有Spark組件的完整性,包括Spark MLLib, Spark Streaming, Spark SQL等。

tfos.part6.6_6.pdf.jpg

TensorFlowOnSpark的架構較為簡單,Spark Driver程序並不會參與TensorFlow內部相關的計算和處理。其設計思路像是將一個TensorFlow集群運行在了Spark上,其在每個Spark Executor中啟動TensorFlow應用程序,然後通過gRPC或RDMA方式進行數據傳遞與交互。

tfos.part7.7_7.pdf.jpg

TensorFlowOnSpark的Spark應用程序包括4個基本過程。

  • Reserve:組建TensorFlow集群,並在每個Executor進程上預留監聽埠,啟動「數據/控制」消息的監聽程序。
  • Start:在每個Executor進程上啟動TensorFlow應用程序;
  • Train/Inference:在TensorFlow集群上完成模型的訓練或推理
  • Shutdown:關閉Executor進程上的TensorFlow應用程序,釋放相應的系統資源(消息隊列)。

tfos.part8.8_8.pdf.jpg

用戶直接通過spark-submit的方式提交Spark應用程序(mnist_spark.py)。其中通過--py_files選項附帶TensorFlowOnSpark框架(tfspark.zip),及其TensorFlow應用程序(mnist_dist.py),從而實現TensorFlow集群在Spark平台上的部署。

tfos.part9.9_9.pdf.jpg

首先看看TensorFlow集群的建立過程。首先根據spark-submit傳遞的num_executor參數,通過調用cluster = sc.parallelize(num_executor)建立一個ParllelCollectionRDD,其中分區數為num_executor。也就是說,此時分區數等於Executor數。

然後再調用cluster.mapPartitions(TFParkNode.reserve)將ParllelCollectionRDD變換(transformation)為MapPartitionsRDD,在每個分區上回調TRSparkNode.reserve。

TRSparkNode.reserve將會在該節點上預留一個埠,並駐留一個Manager服務。Manager持有一個隊列,用於完成進程間的同步,實現該節點的「數據/控制」消息的服務。

數據消息啟動了兩個隊列:Input與Output,分別用於RDD與Executor進程之間的數據交換。

控制消息啟動了一個隊列:Control,用於Driver進程式控制制PS任務的生命周期,當模型訓練完成之後,通過Driver發送Stop的控制消息結束PS任務。

tfos.part10.10_10.pdf.jpg

這是從分區的角度看待TensorFlow集群建立的過程,橫軸表示RDD。這裡存在兩個RDD,第一個為ParllelCollectionRDD,然後變換為MapPartitionsRDD。

縱軸表示同一個分區(Partition),並在每個分區上啟動一個Executor進程 。在Spark中,分區數等於最終在TaskScheduler上調度的Task數目。

此處,sc.parallelize(num_executor)生成一個分區數為num_executor的ParllelCollectionRDD。也就是說,此時分區數等於num_executor數目。

在本例中,num_executor為3,包括1個PS任務,2個Worker任務。

tfos.part11.11_11.pdf.jpg

TensorFlow集群建立後,將生成上圖所示的領域模型。其中,一個TFCluster將持有num_executor個TFSparkNode節點;在每個TFSparkNode上駐留一個Manager服務,並預留一個監聽埠,用於監聽「數據/控制」消息。

實際上,TFSparkNode節點承載於Spark Executor進程之上。

tfos.part12.12_12.pdf.jpg

TensorFlow集群建立後,通過調用cluster.start啟動集群服務。其結果將在每個Executor進程上啟動TensorFlow應用程序。

此處,需要對原生的TensorFlow應用程序進行適配修改,包括2個部分:

  • Feeding與Fetching: 數據輸入/輸出機制修改
  • ClusterSpec: TF集群的構造描述

其餘代碼都將保留,最小化TensorFlow應用程序的修改。

tfos.part13.13_13.pdf.jpg

在cluster上調用foreachPartition(TFSparkNode.start(map_func)),將在每個分區(Executor進程)上回調TFSparkNode.start(map_func)。其中,map_func是對應TF應用程序的包裝。

通過上述過程,將在Spark上拉起了一個TF的集群服務。從而使得Spark集群擁有了深度學習和GPU加速的能力。

tfos.part14.14_14.pdf.jpg

當Spark平台上已經拉起了TF集群服務之後,便可以啟動模型的訓練或推理過程了。在訓練或推理過程中,最重要的是解決數據的Feeding和Fetching問題。

TFoS上提供了兩種方案:

  • TensorFlow QueueRunner:利用TensorFlow提供的FileReader和QueueRunner機制。Spark未參與任何工作,請查閱TensorFlow官方相關文檔。
  • Spark Feeding:首先從RDD讀取分區數據(通過HadoopRDD.compute),然後將其放在Input隊列中,Executor進程再從該隊列中取出,並進一步通過feed_dict,調用session.run將分區數據供給給TensorFlow Graph中。

tfos.part15.15_15.pdf.jpg

Feeding過程,就是通過Input Queue同步實現的。當RDD讀取分區數據後,阻塞式地將分區數據put到Input隊列中;TFGraph在session.run獲取Next Batch時,也是阻塞式地等待數據的到來。

tfos.part16.16_16.pdf.jpg

同樣的道理,Fetching過程與Feeding過程類同,只是使用Output Queue,並且數據流方向相反。

session.run返回的數據,通過put阻塞式地放入Output Queue,RDD也是阻塞式地等待數據到來。

tfos.part17.17_17.pdf.jpg

以模型訓練過程為例,講解RDD的變換過程。此處以Mnist手寫識別為例,左邊表示X,右邊表示Y。分別通過HadoopRDD讀取分區數據,然後通過MapPartititionRDD變換分區的數據格式;然後通過zip運算元,實現兩個RDD的摺疊,生成ZipPartitionsRDD。

然後,根據Epochs超級參數的配置,將該RDD重複執行Epochs次,最終將結果匯總,生成UnionRDD。

在此之前,都是Transformation的過程,最終調用foreachPartition(train)啟動Action,觸發Spark Job的提交和任務的運行。

tfos.part18.18_18.pdf.jpg

當模型訓練或推理完成之後,分別在Input/Control隊列中投擲Stop(以傳遞None實現)消息,當Manager收到Stop消息後,停止隊列的運行。

最終,Spark應用程序退出,Executor進程退出,整個工作流執行結束。

tfos.part19.19_19.pdf.jpg

tfos.part20.20_20.pdf.jpg

推薦資料,強烈推薦直接地源代碼閱讀。

tfos.part21.21_21.pdf.jpg

原文鏈接:jianshu.com/p/62b4ebb5a

推薦閱讀:

深入淺出Spark(三)什麼是Standalone
Spark比Hadoop的優勢有這麼大嗎?
MapReduce過程中,如果Map之後每個Key對應Value的數量不平衡會不會影響效率?
Scala 究竟好在那裡?
如何解釋spark mllib中ALS演算法的原理?

TAG:深度学习DeepLearning | TensorFlow | Spark |