Study Notes TF041: Distributed Parallelism
TensorFlow's distributed parallelism is built on the gRPC communication framework: one master is responsible for creating the Session, while multiple workers execute the tasks of the computation graph.
First create a TensorFlow Cluster object, which contains a set of tasks (each task typically on its own machine) that jointly execute the distributed computation graph. A Cluster is divided into one or more jobs; a job is one class of tasks with a specific role (e.g. parameter server "ps", or "worker"), and each job can contain multiple tasks. Each task creates a server and connects it to the Cluster; tasks usually run on different machines, but a single machine can also run several tasks (for example, one per GPU). tf.train.ClusterSpec initializes the Cluster object from a Python dict, e.g. tf.train.ClusterSpec({"ps":["192.168.233.201:2222"],"worker":["192.168.233.202:2222","192.168.233.203:2222"]}), which describes one parameter server and two workers on three different machines. Each task then declares its own identity, e.g. server=tf.train.Server(cluster,job_name="ps",task_index=0) says that this machine is task 0 of the ps job. In the program, with tf.device("/job:worker/task:7") constrains which task (machine) a Variable or op is placed on.
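As a minimal sketch of how these pieces fit together (TF 1.x API; the addresses are the ones from the example above, and pretending to be worker 0 is purely for illustration):

import tensorflow as tf

# Describe the cluster: one parameter server job and one worker job with two tasks.
cluster = tf.train.ClusterSpec({
    "ps": ["192.168.233.201:2222"],
    "worker": ["192.168.233.202:2222", "192.168.233.203:2222"]})

# Each process declares which task of which job it is; here, worker 0.
server = tf.train.Server(cluster, job_name="worker", task_index=0)

# Variables and ops can be pinned to a specific task in the cluster.
with tf.device("/job:ps/task:0"):
  w = tf.Variable(tf.zeros([10]), name="shared_weights")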
TensorFlow has two distributed modes. In-graph replication is model parallelism: different parts of a single computation graph are placed on different machines. Between-graph replication is data parallelism: every machine builds the same computation graph but trains on different batches of data. Updates can be asynchronous or synchronous. In asynchronous parallelism, each machine computes its gradients independently and pushes the update to the parameter server as soon as it finishes, without waiting for the others. In synchronous parallelism, all machines must finish computing their gradients, which are then combined into one update of the model parameters. Synchronous training makes the loss drop faster and can reach a higher final accuracy, but its speed is bounded by the slowest machine, so it is most efficient when all devices run at the same speed.
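The program in this note uses between-graph data parallelism. For contrast, here is a minimal sketch of in-graph model parallelism (the layer sizes are hypothetical): different layers of a single graph are pinned to different worker tasks, and activations flow between machines.

import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 784])

# The first layer lives on worker task 0.
with tf.device("/job:worker/task:0"):
  w1 = tf.Variable(tf.truncated_normal([784, 256], stddev=0.1))
  h1 = tf.nn.relu(tf.matmul(x, w1))

# The second layer lives on worker task 1; h1 is shipped across the network.
with tf.device("/job:worker/task:1"):
  w2 = tf.Variable(tf.truncated_normal([256, 10], stddev=0.1))
  logits = tf.matmul(h1, w2)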
The following TensorFlow program implements distributed parallel training with one parameter server and two workers, using the MNIST handwritten-digit recognition task as the example. We write one complete Python file and execute it as a different task on each machine. First, load the required libraries.
tf.app.flags defines flags so that parameters can be set on the command line; the values given on the command line are read by TensorFlow and stored in FLAGS. Define the data directory data_dir (default /tmp/mnist-data), the number of hidden units hidden_units (default 100), the maximum number of training steps train_steps (default 1000000), the batch_size (default 100), and the learning_rate (default 0.01).
Define the flag sync_replicas (default False) that selects synchronous parallelism; pass --sync_replicas=True on the command line to enable it. Define replicas_to_aggregate (default None), the number of batch gradients that must be accumulated before the model parameters are updated in synchronous mode; None means to use the number of workers, i.e. the parameters are updated only after every worker has finished a batch.
Define the ps address (default 192.168.233.201:2222) and the worker addresses (192.168.233.202 and 192.168.233.203 in this example); configure these according to your actual cluster. Also define the job_name and task_index flags.
Alias flags.FLAGS as FLAGS for brevity, and set the image size IMAGE_PIXELS to 28.
In the main function main, input_data.read_data_sets downloads and reads the MNIST dataset with one_hot encoding. Check the command-line arguments to make sure the two required flags, job_name and task_index, were supplied, and print them. Parse the comma-separated ps and worker addresses into the lists ps_spec and worker_spec.
Compute the total number of workers, then build the TensorFlow Cluster object with tf.train.ClusterSpec, passing in the ps and worker address lists. tf.train.Server creates the server for the current machine and connects it to the Cluster. If the current node is a parameter server, it performs no further work: server.join() leaves it waiting to serve the workers.
Determine whether the current machine is the chief node, i.e. whether task_index is 0. Define the current machine's worker_device in the form "/job:worker/task:%d/gpu:0". Here every machine has one GPU, so the number of machines equals the number of workers; if a machine had several GPUs, one task could manage all of them, or each GPU could get its own task. tf.train.replica_device_setter() assigns the resources: worker_device is the compute resource and ps_device is the resource that stores the model parameters. replica_device_setter places the model parameters on the dedicated ps server at "/job:ps/cpu:0" and the training ops on "/job:worker/task:%d/gpu:0", the local GPU. Finally create the variable global_step, which records the global number of training steps.
Define the neural network model: tf.truncated_normal initializes the weights and tf.zeros initializes the biases. Create the input placeholders, apply tf.nn.xw_plus_b (matrix multiplication plus bias) followed by a ReLU activation to obtain the first hidden layer's output hid, then apply tf.nn.xw_plus_b and tf.nn.softmax to hid to obtain the network's final output y. Finally compute the loss cross_entropy and define the Adam optimizer.
Check whether synchronous training (sync_replicas) is enabled. If so, first determine replicas_to_aggregate, the number of replicas whose gradients must be aggregated before the parameters are updated; if it was not set explicitly, default to the number of workers. tf.train.SyncReplicasOptimizer wraps the original optimizer, extending it into a synchronous distributed version; it takes the original optimizer plus parameters such as replicas_to_aggregate, total_num_replicas and replica_id. Use the resulting optimizer, plain (asynchronous) or synchronous, to minimize the loss cross_entropy.
In synchronous mode, on the chief node, opt.get_chief_queue_runner creates the queue runner and opt.get_init_tokens_op creates the op that supplies the initial tokens.
Create the parameter initialization op init_op and a temporary training directory, then create the distributed training supervisor with tf.train.Supervisor, passing in is_chief, the training directory (logdir) and init_op. The Supervisor manages each task's participation in distributed training.
Configure the Session: allow_soft_placement=True means that an op which cannot run on its assigned device is moved to another device instead.
If this is the chief node, print that the session is being initialized; other nodes print that they are waiting for the chief to finish initialization. Then call sv.prepare_or_wait_for_session().
If this is the chief node in synchronous mode, call sv.start_queue_runners to launch chief_queue_runner and run the initial-token op init_tokens_op.
For the training loop, record the time the worker starts training, initialize the local step counter local_step, and enter the loop. In each step, read a batch from mnist.train.next_batch, build the feed_dict, and run train_step. When the global step count reaches the preset maximum, stop training.
After training ends, print the total training time, then compute and print the loss cross_entropy on the validation data.
The main program calls tf.app.run() to launch main. Save all of the code (the full listing follows below) to a file named distributed.py, and run distributed.py on three different machines to start the three tasks; each invocation passes job_name and task_index to give that task its identity.
That is, run python distributed.py on the three machines 192.168.233.201, 192.168.233.202 and 192.168.233.203, as in the example commands sketched below.
For synchronous mode, add --sync_replicas=True. Note what global_step means: in asynchronous mode it is the sum of the training steps of all workers, whereas in synchronous mode it counts the number of rounds of parallel training.
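A sketch of the launch commands, assuming the flag defaults in the listing below and the IP addresses above (which host plays which role simply mirrors the order of ps_hosts and worker_hosts):

# On 192.168.233.201 (parameter server)
python distributed.py --job_name=ps --task_index=0

# On 192.168.233.202 (worker 0, the chief)
python distributed.py --job_name=worker --task_index=0

# On 192.168.233.203 (worker 1)
python distributed.py --job_name=worker --task_index=1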
#from __future__ import absolute_import
#from __future__ import division
#from __future__ import print_function
import math
#import sys
import tempfile
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
flags = tf.app.flags
flags.DEFINE_string("data_dir", "/tmp/mnist-data",
"Directory for storing mnist data")
#flags.DEFINE_boolean("download_only", False,
# "Only perform downloading of data; Do not proceed to "
# "session preparation, model definition or training")
flags.DEFINE_integer("task_index", None,
"Worker task index, should be >= 0. task_index=0 is "
"the master worker task the performs the variable "
"initialization ")
#flags.DEFINE_integer("num_gpus", 2,
# "Total number of gpus for each machine."
# "If you dont use GPU, please set it to 0")
flags.DEFINE_integer("replicas_to_aggregate", None,
"Number of replicas to aggregate before parameter update"
"is applied (For sync_replicas mode only; default: "
"num_workers)")
flags.DEFINE_integer("hidden_units", 100,
"Number of units in the hidden layer of the NN")
flags.DEFINE_integer("train_steps", 1000000,
"Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
flags.DEFINE_boolean("sync_replicas", False,
"Use the sync_replicas (synchronized replicas) mode, "
"wherein the parameter updates from workers are aggregated "
"before applied to avoid stale gradients")
#flags.DEFINE_boolean(
# "existing_servers", False, "Whether servers already exists. If True, "
# "will use the worker hosts via their GRPC URLs (one client process "
# "per worker host). Otherwise, will create an in-process TensorFlow "
# "server.")
flags.DEFINE_string("ps_hosts","192.168.233.201:2222",
"Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "192.168.233.202:2223,192.168.233.203:2224",
"Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None,"job name: worker or ps")
FLAGS = flags.FLAGS
IMAGE_PIXELS = 28
def main(unused_argv):
  mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
  # if FLAGS.download_only:
  #   sys.exit(0)

  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index == "":
    raise ValueError("Must specify an explicit `task_index`")

  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)

  # Construct the cluster and start the server
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")

  # Get the number of workers.
  num_workers = len(worker_spec)

  cluster = tf.train.ClusterSpec({
      "ps": ps_spec,
      "worker": worker_spec})

  # if not FLAGS.existing_servers:
  # Not using existing servers. Create an in-process server.
  server = tf.train.Server(
      cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()

  is_chief = (FLAGS.task_index == 0)
  # if FLAGS.num_gpus > 0:
  #   if FLAGS.num_gpus < num_workers:
  #     raise ValueError("number of gpus is less than number of workers")
  #   # Avoid gpu allocation conflict: now allocate task_num -> #gpu
  #   # for each worker in the corresponding machine
  #   gpu = (FLAGS.task_index % FLAGS.num_gpus)
  #   worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
  # elif FLAGS.num_gpus == 0:
  #   # Just allocate the CPU to worker server
  #   cpu = 0
  #   worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
  # The device setter will automatically place Variables ops on separate
  # parameter servers (ps). The non-Variable ops will be placed on the workers.
  # The ps use CPU and workers use corresponding GPU
  worker_device = "/job:worker/task:%d/gpu:0" % FLAGS.task_index
  with tf.device(
      tf.train.replica_device_setter(
          worker_device=worker_device,
          ps_device="/job:ps/cpu:0",
          cluster=cluster)):
    global_step = tf.Variable(0, name="global_step", trainable=False)

    # Variables of the hidden layer
    hid_w = tf.Variable(
        tf.truncated_normal(
            [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
            stddev=1.0 / IMAGE_PIXELS),
        name="hid_w")
    hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

    # Variables of the softmax layer
    sm_w = tf.Variable(
        tf.truncated_normal(
            [FLAGS.hidden_units, 10],
            stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
        name="sm_w")
    sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

    # Ops: located on the worker specified with FLAGS.task_index
    x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
    y_ = tf.placeholder(tf.float32, [None, 10])

    hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
    hid = tf.nn.relu(hid_lin)

    y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
    cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

    opt = tf.train.AdamOptimizer(FLAGS.learning_rate)

    if FLAGS.sync_replicas:
      if FLAGS.replicas_to_aggregate is None:
        replicas_to_aggregate = num_workers
      else:
        replicas_to_aggregate = FLAGS.replicas_to_aggregate
      opt = tf.train.SyncReplicasOptimizer(
          opt,
          replicas_to_aggregate=replicas_to_aggregate,
          total_num_replicas=num_workers,
          replica_id=FLAGS.task_index,
          name="mnist_sync_replicas")

    train_step = opt.minimize(cross_entropy, global_step=global_step)

    if FLAGS.sync_replicas and is_chief:
      # Initial token and chief queue runners required by the sync_replicas mode
      chief_queue_runner = opt.get_chief_queue_runner()
      init_tokens_op = opt.get_init_tokens_op()

    init_op = tf.global_variables_initializer()
    train_dir = tempfile.mkdtemp()

    sv = tf.train.Supervisor(
        is_chief=is_chief,
        logdir=train_dir,
        init_op=init_op,
        recovery_wait_secs=1,
        global_step=global_step)
    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=False,
        device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

    # The chief worker (task_index==0) session will prepare the session,
    # while the remaining workers will wait for the preparation to complete.
    if is_chief:
      print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
      print("Worker %d: Waiting for session to be initialized..." %
            FLAGS.task_index)

    # if FLAGS.existing_servers:
    #   server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
    #   print("Using existing server at: %s" % server_grpc_url)
    #
    #   sess = sv.prepare_or_wait_for_session(server_grpc_url,
    #                                         config=sess_config)
    # else:
    sess = sv.prepare_or_wait_for_session(server.target,
                                          config=sess_config)

    print("Worker %d: Session initialization complete." % FLAGS.task_index)

    if FLAGS.sync_replicas and is_chief:
      # Chief worker will start the chief queue runner and call the init op
      print("Starting chief queue runner and running init_tokens_op")
      sv.start_queue_runners(sess, [chief_queue_runner])
      sess.run(init_tokens_op)

    # Perform training
    time_begin = time.time()
    print("Training begins @ %f" % time_begin)

    local_step = 0
    while True:
      # Training feed
      batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
      train_feed = {x: batch_xs, y_: batch_ys}

      _, step = sess.run([train_step, global_step], feed_dict=train_feed)
      local_step += 1

      now = time.time()
      print("%f: Worker %d: training step %d done (global step: %d)" %
            (now, FLAGS.task_index, local_step, step))

      if step >= FLAGS.train_steps:
        break

    time_end = time.time()
    print("Training ends @ %f" % time_end)
    training_time = time_end - time_begin
    print("Training elapsed time: %f s" % training_time)

    # Validation feed
    val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
    val_xent = sess.run(cross_entropy, feed_dict=val_feed)
    print("After %d training step(s), validation cross entropy = %g" %
          (FLAGS.train_steps, val_xent))


if __name__ == "__main__":
  tf.app.run()
Reference: 《TensorFlow實戰》