標籤:

Tensorflow on Spark爬坑指南

作者: biggeng

原文鏈接:jianshu.com/p/72cb5816a

查看更多的專業文章,請移步至「人工智慧LeadAI」公眾號,查看更多的課程信息及產品信息,請移步至全新打造的官網:www.leadai.org.

正文共10718個字 3張圖,預計閱讀時間27分鐘。

由於機器學習和深度學習不斷被炒熱,Tensorflow作為Google家(Jeff Dean大神)推出的開源深度學習框架,也獲得了很多關注。Tensorflow的靈活性很強,允許用戶使用多台機器的多個設備(如不同的CPU和GPU)。但是由於Tensorflow 分散式的方式需要用戶在客戶端顯示指定集群信息,另外需要手動拉起ps, worker等task. 對資源管理和使用上有諸多不便。因此,Yahoo開源了基於Spark的Tensorflow,使用executor執行worker和ps task. 項目地址為:github.com/yahoo/Tensor

寫在前面,前方高能,請注意!

雖然yahoo提供了如何在Spark集群中運行Tensorflow的步驟,但是由於這個guideline過於簡單,一般情況下,根據這個guideline是跑不起來的。

Tensorflow on spark介紹

TensorflowOnSpark 支持使用Spark/Hadoop集群分散式的運行Tensorflow,號稱支持所有的Tensorflow操作。需要注意的是用戶需要對原有的TF程序進行簡單的改造,就能夠運行在Spark集群之上。

如何跑起來Tensorflow on spark

雖然Yahoo在github上說明了安裝部署TFS (github.com/yahoo/Tensor), 但是根據實際實踐,根據這個文檔如果能跑起來,那真的要謝天謝地。因為在實際過程中,會因為環境問題遇到一些unexpected error。以下就是我將自己在實踐過程中遇到的一些問題總結列舉。

1、編譯python和pip

yahoo提供的編譯步驟為:

# download and extract Python 2.7nexport PYTHON_ROOT=~/Pythonncurl -O https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgzntar -xvf Python-2.7.12.ntgzrm Python-2.7.12.tgzn# compile into local PYTHON_ROOTnpushd Python-2.7.12n./configure --prefix="${PYTHON_ROOT}" --enable-unicode=ucs4nmakenmake installpopdrm -rf Python-2.7.12 n# install pipnpushd "${PYTHON_ROOT}"ncurl -O https://bootstrap.pypa.io/get-pip.pynbin/python get-pip.pynrm get-pip.pyn# install tensorflow (and any custom dependencies)n${PYTHON_ROOT}/bin/pip install pydoopn# Note: add any extra dependencies herenpopdn

在實際編譯過程中,採用的Centos7.2操作系統,可能出現以下問題:

安裝pip報錯

bin/python get-pip.pynERROR:root:code for hash sha224 was not found.nTraceback (most recent call last):n

報這個錯一般是因為python中缺少_ssl.so 和 _hashlib.so庫造成,可以從系統python庫中找對應版本的拷貝到相應的python文件夾下(例如:lib/python2.7/lib-dynload)。

缺少zlib

bin/python get-pip.pynTraceback (most recent call last):n File "get-pip.py", line 20061, in <module>n main()n File "get-pip.py", line 194, in mainn bootstrap(tmpdir=tmpdir)n File "get-pip.py", line 82, in bootstrapn import pipnzipimport.ZipImportError: cant decompress data; zlib not availablen

解決這個問題的方法是使用yum安裝zlib*後,重新編譯python後,即可解決。

ssl 報錯

bin/python get-pip.pynpip is configured with locations that require TLS/SSL, however the ssl module in Python is not available.nCollecting pipn Could not fetch URL https://pypi.python.org/simple/pip/: There was a problem confirming the ssl certificate: Cant connect to HTTPS URL because the SSL module is not available. - skippingn Could not find a version that satisfies the requirement pip (from versions: )nNo matching distribution found for pipn

解決方法: 在Python安裝目錄下打開文件lib/python2.7/ssl.py,注釋掉 , HAS_ALPN

from _ssl import HAS_SNI, HAS_ECDH, HAS_NPN#, HAS_ALPN

pip install pydoop報錯

gcc: error trying to exec cc1plus: execvp:

解決辦法:需要在機器上安裝g++編譯器

2、安裝編譯 tensorflow w/RDMA support

git clone git@github.com:yahoo/tensorflow.git# follow build instructions to install into ${PYTHON_ROOT}n

注意編譯過程需要google的bazel和protoc, 這兩個工具需要提前裝好。

3、接下來的步驟按照

github.com/yahoo/Tensor 指導的步驟完成。

4、在HDP2.5部署的spark on Yarn環境上運行tensorflow

在yarn-env.sh中設置環境變數,增加 * export HADOOP_HDFS_HOME=/usr/hdp/2.5.0.0-1245/hadoop-hdfs/*

因為這個環境變數需要在執行tensorflow任務時被用到,如果沒有export,會報錯。

重啟YARN,使上述改動生效。

按照Yahoo github上的步驟,執行訓練mnist任務時,按下面命令提交作業:

export PYTHON_ROOT=/data2/Python/export LD_LIBRARY_PATH=${PATH}export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/pythonexport SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"export PATH=${PYTHON_ROOT}/bin/:$PATHexport QUEUE=defaultnnn spark-submit n--master yarn n--deploy-mode cluster n--queue ${QUEUE} n--num-executors 4 n--executor-memory 1G n--py-files /data2/tesorflowonSpark/TensorFlowOnSpark/tfspark.zip,/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py n--conf spark.dynamicAllocation.enabled=false n--conf spark.yarn.maxAppAttempts=1 n--archives hdfs:///user/${USER}/Python.zip#Python --conf spark.executorEnv.LD_LIBRARY_PATH="/usr/jdk64/jdk1.8.0_77/jre/lib/amd64/server/" n/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py n--images mnist/csv/test/images n--labels mnist/csv/test/labels n--mode inference n--model mnist_model n--output predictionsn

此時,通過Spark界面可以觀察到worker0處於阻塞狀態。

17/03/21 18:17:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.4 KB, free 542.6 KB)n17/03/21 18:17:18 INFO TorrentBroadcast: Reading broadcast variable 1 took 17 msn17/03/21 18:17:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 440.6 KB, free 983.3 KB)n2017-03-21 18:17:18,404 INFO (MainThread-14872) Connected to TFSparkNode.mgr on ochadoop03, ppid=14685, state=runningn2017-03-21 18:17:18,411 INFO (MainThread-14872) mgr.state=runningn2017-03-21 18:17:18,411 INFO (MainThread-14872) Feeding partition <generator object load_stream at 0x7f447f120960> into input queue <multiprocessing.queues.JoinableQueue object at 0x7f447f129890>n17/03/21 18:17:20 INFO PythonRunner: Times: total = 2288, boot = -5387, init = 5510, finish = 2165n17/03/21 18:17:20 INFO PythonRunner: Times: total = 101, boot = 3, init = 21, finish = 77n2017-03-21 18:17:20.587060: I tensorflow/core/distributed_runtime/master_session.cc:1011] Start master session b5d9a21a16799e0b with config:n

通過分析原因發現,在mnist例子中,logdir設置的是hdfs的路徑,可能是由於tf對hdfs的支持有限或者存在bug(慚愧,並沒有深究 :))。將logdir改為本地目錄,就可以正常運行。但是由此又帶來了另一個問題,因為Spark每次啟動時worker0的位置並不確定,有可能每次啟動的機器都不同,這就導致在inference的時候沒有辦法獲得訓練的模型。

一個解決辦法是:在worker 0訓練完模型後,將模型同步到hdfs中,在inference的之前,再將hdfs的checkpoints文件夾拉取到本地執行。以下為我對yahoo提供的mnist example做的類似的修改.

def writeFileToHDFS():nrootdir = /tmp/mnist_model nclient = HdfsClient(hosts=localhost:50070)nclient.mkdirs(/user/root/mnist_model) nfor parent,dirnames,filenames in os.walk(rootdir): nfor dirname in dirnames: nprint("parent is:{0}".format(parent)) nfor filename in filenames: nclient.copy_from_local(os.path.join(parent,filename), os.path.join(/user/root/mnist_model,filename), overwrite=True) n#logdir = TFNode.hdfs_path(ctx, args.model) nlogdir = "/tmp/" + args.model nwhile not sv.should_stop() and step < args.steps: n# Run a training step asynchronously. n# See `tf.train.SyncReplicasOptimizer` for additional details on how to n# perform *synchronous* training. n# using feed_dict nbatch_xs, batch_ys = feed_dict() nfeed = {x: batch_xs, y_: batch_ys} nif len(batch_xs) != batch_size: nprint("done feeding") nbreak nelse: nif args.mode == "train": n_, step = sess.run([train_op, global_step], feed_dict=feed) n# print accuracy and save model checkpoint to HDFS every 100 steps nif (step % 100 == 0): nprint("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy,{x: batch_xs, y_: batch_ys}))) nelse: n# args.mode == "inference" nlabels, preds, acc = sess.run([label, prediction, accuracy], feed_dict=feed) nresults = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l,p in zip(labels,preds)] nTFNode.batch_results(ctx.mgr, results) nprint("acc: {0}".format(acc)) nif task_index == 0: nwriteFileToHDFS()n

當然這段代碼只是為了進行說明,並不是很嚴謹,在上傳hdfs的時候,是需要對文件夾是否存在等要做一系列的判斷。

5、train& inference

向Spark集群提交訓練任務.

spark-submit n--master yarn n--deploy-mode cluster n--queue ${QUEUE} n--num-executors 3 n--executor-memory 7G n--py-files /data2/tesorflowonSpark/TensorFlowOnSpark/tfspark.zip,/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py n--conf spark.dynamicAllocation.enabled=false n--conf spark.yarn.maxAppAttempts=1 n--archives hdfs:///user/${USER}/Python.zip#Python --conf spark.executorEnv.LD_LIBRARY_PATH="/usr/jdk64/jdk1.8.0_77/jre/lib/amd64/server/" n/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py n--images mnist/csv/train/images n--labels mnist/csv/train/labels n--mode train n--model mnist_modeln

執行起來後,查看Spark UI,可以看到當前訓練過程中的作業執行情況。

6.46.43.png

執行完後,檢查hdsf,checkpoint目錄, 可以看到模型的checkpoints已經上傳到hdfs中。

hadoop fs -ls /user/root/mnist_modelnFound 8 itemsn-rwxr-xr-x 3 root hdfs 179 2017-03-21 18:53 /user/root/mnist_model/checkpointn-rwxr-xr-x 3 root hdfs 117453 2017-03-21 18:53 /user/root/mnist_model/graph.pbtxtn-rwxr-xr-x 3 root hdfs 814164 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-0.data-00000-of-00001-rwxr-xr-x 3 root hdfs 372 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-0.indexn-rwxr-xr-x 3 root hdfs 45557 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-0.metan-rwxr-xr-x 3 root hdfs 814164 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-338.data-00000-of-00001-rwxr-xr-x 3 root hdfs 372 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-338.indexn-rwxr-xr-x 3 root hdfs 45557 2017-03-21 18:53 /user/root/mnist_model/model.ckpt-338.metan

根據訓練的結果,執行模型inference

spark-submit n--master yarn n--deploy-mode cluster n--queue ${QUEUE} n--num-executors 4 n--executor-memory 1G n--py-files /data2/tesorflowonSpark/TensorFlowOnSpark/tfspark.zip,/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py n--conf spark.dynamicAllocation.enabled=false n--conf spark.yarn.maxAppAttempts=1 n--archives hdfs:///user/${USER}/Python.zip#Python --conf spark.executorEnv.LD_LIBRARY_PATH="/usr/jdk64/jdk1.8.0_77/jre/lib/amd64/server/" n/data2/tesorflowonSpark/TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py n--images mnist/csv/test/images n--labels mnist/csv/test/labels n--mode inference n--model mnist_model n--output predictionsn

等任務執行完成後,會發現,模型判斷的結果已經輸出到hdfs相關目錄下了。

hadoop fs -ls /user/root/predictionsnFound 11 itemsn-rw-r--r-- 3 root hdfs 0 2017-03-21 19:16 /user/root/predictions/_SUCCESSn-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00000n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00001n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00002n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00003n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00004n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00005n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00006n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00007n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00008n-rw-r--r-- 3 root hdfs 51000 2017-03-21 19:16 /user/root/predictions/part-00009n

查看其中的某一個文件,可看到裡面保存的是測試集的標籤和根據模型預測的結果。

# hadoop fs -cat /user/root/predictions/part-000002017-03-21T19:16:40.795694 Label: 7, Prediction: 7n2017-03-21T19:16:40.795729 Label: 2, Prediction: 2n2017-03-21T19:16:40.795741 Label: 1, Prediction: 1n2017-03-21T19:16:40.795750 Label: 0, Prediction: 0n2017-03-21T19:16:40.795759 Label: 4, Prediction: 4n2017-03-21T19:16:40.795769 Label: 1, Prediction: 1n2017-03-21T19:16:40.795778 Label: 4, Prediction: 4n2017-03-21T19:16:40.795787 Label: 9, Prediction: 9n2017-03-21T19:16:40.795796 Label: 5, Prediction: 6n2017-03-21T19:16:40.795805 Label: 9, Prediction: 9n2017-03-21T19:16:40.795814 Label: 0, Prediction: 0n2017-03-21T19:16:40.795822 Label: 6, Prediction: 6n2017-03-21T19:16:40.795831 Label: 9, Prediction: 9n2017-03-21T19:16:40.795840 Label: 0, Prediction: 0n2017-03-21T19:16:40.795848 Label: 1, Prediction: 1n2017-03-21T19:16:40.795857 Label: 5, Prediction: 5n2017-03-21T19:16:40.795866 Label: 9, Prediction: 9n2017-03-21T19:16:40.795875 Label: 7, Prediction: 7n2017-03-21T19:16:40.795883 Label: 3, Prediction: 3n2017-03-21T19:16:40.795892 Label: 4, Prediction: 4n2017-03-21T19:16:40.795901 Label: 9, Prediction: 9n2017-03-21T19:16:40.795909 Label: 6, Prediction: 6n2017-03-21T19:16:40.795918 Label: 6, Prediction: 6n

Spark集群和tensorflow job task的對應關係,如下圖,spark集群起了4個executor,其中一個作為PS, 另外3個作為worker,而誰做ps誰做worker是由Yarn和spark調度的。

7.22.23.png

Cluster spec: {ps: [ochadoop02:50060], worker: [ochadoop04:52150, ochadoop03:52733, ochad


推薦閱讀:

怎麼理解tensorflow中tf.train.shuffle_batch()函數?
My solutions for `Google TensorFlow Speech Recognition Challenge`
tensorflow如何訓練自己的圖像數據?
2.2 RNN入門

TAG:TensorFlow |