學習筆記TF041:分散式並行

TensorFlow分散式並行基於gRPC通信框架,一個master負責創建Session,多個worker負責執行計算圖任務。

先創建TensorFlow Cluster對象,包含一組task(每個task一台獨立機器),分散式執行TensorFlow計算圖。一個Cluster切分多個job,一個job是一類特定任務(parameter server ps,worker),每個job可以包含多個task。每個task創建一個server,連接到Cluster,每個task執行在不同機器。也可以一台機器執行多個task(不同GPU)。tf.train.ClusterSpec初始化Cluster對象,初始化信息是Python dict,tf.train.ClusterSpec({"ps":["192.168.233.201:2222"],"worker":["192.168.233.202:2222","192.168.233.203:2222"]}),代表一個parameter server和兩個worker,分別在三個不同機器上。每個task,定義自己身份,如server=tf.train.Server(cluster,job_name="ps",task_index=0),機器job定義ps第0台機器。程序中with tf.device("/job:worker/task:7"),限定Variable存放在哪個task或機器。

TensorFlow分散式模式:In-graph replication模型並行,模型計算圖不同部分放在不同機器執行;Between-graph replication數據並行,每台機器相同計算圖,計算不同batch數據。非同步並行,每台機器獨立計算梯度,計算完更新到parameter server,不等其他機器。同步並行,等所有機器都完成梯度計算,多個梯度合成統一更新模型參數。同步並行訓練,loss下降速度更快,可達到最大精度更高,同步並行速度取決最慢機器,設備速度一致,效率較高。

TensorFlow實現包含1個paramter server、2個worker分散式並行訓練程序,MNIST手寫數據識別任務示例。寫一個完整Python文件,在不同機器不同task執行。載入依賴庫。

tf.app.flags定義標記,命令行執行TensorFlow設置參數。命令行指定參數被TensorFlow讀取,轉flags。設定數據儲存目錄data_dir默認/tmp/mnist-data,隱藏節點數默認100,訓練最大步數train_steps默認1000000,batch_size默認100,學習速率默認0.01。

設定是否使用同步並行標記sync_replicas默認False,命令行執行時可設True開戶同步並行。設定需要累計梯度個數更新模型值默認None,代表同步並行積累多少個batch梯度再進行參數更新,設None 為worker數量,所有worker完成一個batch訓練後再更新模型參數。

定義ps地址,默認192.168.233.201:2222,根據集群實際情況配置。worker地址設置192.168.233.202:2222和192.168.233.203:2222.設置job_name和task_index FLAG。

flags.FLAGS直接命名FLAGS,簡化使用。設置圖片尺寸IMAGE_PIXELS 28。

編寫程序主函數main,input_data.read_data_sets下載讀取MNIST數據集,設置one_hot編碼格式。檢測命令行輸入參數,確保job_name和task_index兩個必備參數。顯示job_name和task_index,ps、worker所有地址解析成列表ps_spec、worker_spec。

計算總共worker數量,tf.train.ClusterSpec生成TensorFlow Cluster對象,傳入參數ps地址信息和worker地址信息。tf.train.Server創建當前機器server,連接Cluster。如當前節點是parameter server,不進行後續操作,server.join等待worker工作。

判斷當前機器是否主節點,task_index是否0。定義當前機器worker_device,格式"job:worker/task:0/gpu:0"。多台機器,每台機器1塊GPU,總共需要機器數量worker。如一台機器多GPU,一個task管理多個GPU或多個task分別管理。tf.train.replica_device_setter()設置worker資源,worker_device計算資源,ps_device存儲模型參數資源。replica_device_setter將模型參數部署在獨立ps伺服器"/job:ps/cpu:0",訓練操作部署在"/job:worker/task:0/gpu:0",本機GPU。創建記錄全局訓練步數變數global_step。

定義神經網路模型,tf.truncated_normal初始化權重,tf.zeros初始化偏置,創建輸入 placeholder,tf.nn.xw_plus_b輸入矩陣乘法、偏置操作,ReLU激活函數處理,得到第一個隱層輸出hid。tf.nn.xw_plus_b、tf.nn.softmax對第一層輸出hid處理,得到網路最終輸出y。最後計算損失cross_entropy,定義優化器Adam。

判斷是否設置同步訓練模式sync_replicas,如果同步模型,先獲取同步更新模型參數需要副本數replicas_to_aggregate;如果沒有單獨設置,worker數作默認值。tf.train.SyncReplicasOptimizer創建同步訓練優化器,對原有優化器擴展。傳入原有優化器及其他參數(replicas_to_aggregate、total_num_replicas、replica_id),原有優化器改造為同步分散式訓練版本。用普通(非同步)或同步優化器優化損失cross_entropy。

同步訓練模式,主節點,opt.get_chief_queue_runner創建隊列執行器,opt.get_init_tokens_op創建全局參數初始化器。

生成本地參數初始化操作init_op,創建臨時訓練目錄,tf.train_Supervisor創建分散式訓練監督器,傳入參數is_chief、train_dir、init_op。Supervisor管理task參與到分散式訓練。

設置Session參數,allow_soft_placement設True,代表操作在指定device不能執行時轉到其他device執行。

如果主節點,顯示初始化Session,其他節點顯示等待主節點初始化操作。執行sv.prepate_or_wait_for_session()。

如果處於同步模型主節點,sv.start_queue_runners執行隊列化執行器chief_queue_runner,執行全局參數初始化器init_tokens_op。

訓練過程,記錄worker執行訓練啟動時間,初始化本地訓練步數local_step,進入訓練循環。每步訓練,從nnist.train.next_batch讀取一個batch數據,生成feed_dict,調train_step執行訓練。當全局訓練步數達到預設最大值,停止訓練。

訓練結束,展示總訓練時間,在驗證數據上計算預測結果損失cross_entropy,展示。

在主程序執行tf.app.run()啟動main函數,全部代碼保存到distributed.py文件。3台不同機器分別執行distributed.py啟動3個task,每次執行distributed.py,傳入job_name、task_index指定worker身份。

分別在三台機器192.168.233.201?192.168.233.202、192.168.233.203執行python distributed.py。

同步模式,加上--sync_replicas=True。global_step,非同步時,全局步數是所有worker訓練步數和,同步時是多少輪並行訓練。

#from __future__ import absolute_import

#from __future__ import division

#from __future__ import print_function

import math

#import sys

import tempfile

import time

import tensorflow as tf

from tensorflow.examples.tutorials.mnist import input_data

flags = tf.app.flags

flags.DEFINE_string("data_dir", "/tmp/mnist-data",

"Directory for storing mnist data")

#flags.DEFINE_boolean("download_only", False,

# "Only perform downloading of data; Do not proceed to "

# "session preparation, model definition or training")

flags.DEFINE_integer("task_index", None,

"Worker task index, should be >= 0. task_index=0 is "

"the master worker task the performs the variable "

"initialization ")

#flags.DEFINE_integer("num_gpus", 2,

# "Total number of gpus for each machine."

# "If you dont use GPU, please set it to 0")

flags.DEFINE_integer("replicas_to_aggregate", None,

"Number of replicas to aggregate before parameter update"

"is applied (For sync_replicas mode only; default: "

"num_workers)")

flags.DEFINE_integer("hidden_units", 100,

"Number of units in the hidden layer of the NN")

flags.DEFINE_integer("train_steps", 1000000,

"Number of (global) training steps to perform")

flags.DEFINE_integer("batch_size", 100, "Training batch size")

flags.DEFINE_float("learning_rate", 0.01, "Learning rate")

flags.DEFINE_boolean("sync_replicas", False,

"Use the sync_replicas (synchronized replicas) mode, "

"wherein the parameter updates from workers are aggregated "

"before applied to avoid stale gradients")

#flags.DEFINE_boolean(

# "existing_servers", False, "Whether servers already exists. If True, "

# "will use the worker hosts via their GRPC URLs (one client process "

# "per worker host). Otherwise, will create an in-process TensorFlow "

# "server.")

flags.DEFINE_string("ps_hosts","192.168.233.201:2222",

"Comma-separated list of hostname:port pairs")

flags.DEFINE_string("worker_hosts", "192.168.233.202:2223,192.168.233.203:2224",

"Comma-separated list of hostname:port pairs")

flags.DEFINE_string("job_name", None,"job name: worker or ps")

FLAGS = flags.FLAGS

IMAGE_PIXELS = 28

def main(unused_argv):

mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

# if FLAGS.download_only:

# sys.exit(0)

if FLAGS.job_name is None or FLAGS.job_name == "":

raise ValueError("Must specify an explicit `job_name`")

if FLAGS.task_index is None or FLAGS.task_index =="":

raise ValueError("Must specify an explicit `task_index`")

print("job name = %s" % FLAGS.job_name)

print("task index = %d" % FLAGS.task_index)

#Construct the cluster and start the server

ps_spec = FLAGS.ps_hosts.split(",")

worker_spec = FLAGS.worker_hosts.split(",")

# Get the number of workers.

num_workers = len(worker_spec)

cluster = tf.train.ClusterSpec({

"ps": ps_spec,

"worker": worker_spec})

#if not FLAGS.existing_servers:

# Not using existing servers. Create an in-process server.

server = tf.train.Server(

cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

if FLAGS.job_name == "ps":

server.join()

is_chief = (FLAGS.task_index == 0)

# if FLAGS.num_gpus > 0:

# if FLAGS.num_gpus < num_workers:

# raise ValueError("number of gpus is less than number of workers")

# # Avoid gpu allocation conflict: now allocate task_num -> #gpu

# # for each worker in the corresponding machine

# gpu = (FLAGS.task_index % FLAGS.num_gpus)

# worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)

# elif FLAGS.num_gpus == 0:

# # Just allocate the CPU to worker server

# cpu = 0

# worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)

# # The device setter will automatically place Variables ops on separate

# # parameter servers (ps). The non-Variable ops will be placed on the workers.

# # The ps use CPU and workers use corresponding GPU

worker_device = "/job:worker/task:%d/gpu:0" % FLAGS.task_index

with tf.device(

tf.train.replica_device_setter(

worker_device=worker_device,

ps_device="/job:ps/cpu:0",

cluster=cluster)):

global_step = tf.Variable(0, name="global_step", trainable=False)

# Variables of the hidden layer

hid_w = tf.Variable(

tf.truncated_normal(

[IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],

stddev=1.0 / IMAGE_PIXELS),

name="hid_w")

hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

# Variables of the softmax layer

sm_w = tf.Variable(

tf.truncated_normal(

[FLAGS.hidden_units, 10],

stddev=1.0 / math.sqrt(FLAGS.hidden_units)),

name="sm_w")

sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

# Ops: located on the worker specified with FLAGS.task_index

x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])

y_ = tf.placeholder(tf.float32, [None, 10])

hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)

hid = tf.nn.relu(hid_lin)

y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))

cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

opt = tf.train.AdamOptimizer(FLAGS.learning_rate)

if FLAGS.sync_replicas:

if FLAGS.replicas_to_aggregate is None:

replicas_to_aggregate = num_workers

else:

replicas_to_aggregate = FLAGS.replicas_to_aggregate

opt = tf.train.SyncReplicasOptimizer(

opt,

replicas_to_aggregate=replicas_to_aggregate,

total_num_replicas=num_workers,

replica_id=FLAGS.task_index,

name="mnist_sync_replicas")

train_step = opt.minimize(cross_entropy, global_step=global_step)

if FLAGS.sync_replicas and is_chief:

# Initial token and chief queue runners required by the sync_replicas mode

chief_queue_runner = opt.get_chief_queue_runner()

init_tokens_op = opt.get_init_tokens_op()

init_op = tf.global_variables_initializer()

train_dir = tempfile.mkdtemp()

sv = tf.train.Supervisor(

is_chief=is_chief,

logdir=train_dir,

init_op=init_op,

recovery_wait_secs=1,

global_step=global_step)

sess_config = tf.ConfigProto(

allow_soft_placement=True,

log_device_placement=False,

device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

# The chief worker (task_index==0) session will prepare the session,

# while the remaining workers will wait for the preparation to complete.

if is_chief:

print("Worker %d: Initializing session..." % FLAGS.task_index)

else:

print("Worker %d: Waiting for session to be initialized..." %

FLAGS.task_index)

# if FLAGS.existing_servers:

# server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]

# print("Using existing server at: %s" % server_grpc_url)

#

# sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)

# else:

sess = sv.prepare_or_wait_for_session(server.target,

config=sess_config)

print("Worker %d: Session initialization complete." % FLAGS.task_index)

if FLAGS.sync_replicas and is_chief:

# Chief worker will start the chief queue runner and call the init op

print("Starting chief queue runner and running init_tokens_op")

sv.start_queue_runners(sess, [chief_queue_runner])

sess.run(init_tokens_op)

# Perform training

time_begin = time.time()

print("Training begins @ %f" % time_begin)

local_step = 0

while True:

# Training feed

batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)

train_feed = {x: batch_xs, y_: batch_ys}

_, step = sess.run([train_step, global_step], feed_dict=train_feed)

local_step += 1

now = time.time()

print("%f: Worker %d: training step %d done (global step: %d)" %

(now, FLAGS.task_index, local_step, step))

if step >= FLAGS.train_steps:

break

time_end = time.time()

print("Training ends @ %f" % time_end)

training_time = time_end - time_begin

print("Training elapsed time: %f s" % training_time)

# Validation feed

val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}

val_xent = sess.run(cross_entropy, feed_dict=val_feed)

print("After %d training step(s), validation cross entropy = %g" %

(FLAGS.train_steps, val_xent))

if __name__ == "__main__":

tf.app.run()

參考資料:

《TensorFlow實戰》

歡迎付費諮詢(150元每小時),我的微信:qingxingfengzi


推薦閱讀:

關於L1、L2正規化的一些疑問?
致力於分享最新最全面的機器學習資料,歡迎你成為貢獻者!
你用機器學習做過哪些有趣的事兒?

TAG:TensorFlow | 机器学习 | 深度学习DeepLearning |