Learning Notes TF040: Multi-GPU Parallelism

TensorFlow supports two kinds of parallelism: model parallelism and data parallelism. Model parallelism designs a parallel scheme around a particular model, placing different compute nodes of the model on different hardware devices. Data parallelism is the more general and easier way to scale up: multiple hardware devices compute the gradients of different batches of data at the same time, and the gradients are aggregated to update the global parameters.

With data parallelism, multiple GPUs train on multiple batches of data simultaneously. The model running on each GPU is based on the same neural network, with an identical structure and shared model parameters.

Synchronous data parallelism waits until every GPU has computed the gradients for its batch, combines the gradients, and then updates the shared model parameters once, which is similar to training with a larger batch size. It is most efficient when all GPUs have the same model and speed.
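
A minimal sketch of one synchronous update (plain NumPy, with a hypothetical loss_grad helper standing in for the per-GPU gradient computation):

import numpy as np

def sync_step(params, batches, loss_grad, lr=0.1):
    # One synchronous data-parallel step: each "GPU" computes the gradient on
    # its own batch, the gradients are averaged (the synchronization point),
    # and the shared parameters are updated once.
    grads = [loss_grad(params, b) for b in batches]  # one gradient per device
    avg_grad = np.mean(grads, axis=0)                # combine, like a larger batch
    return params - lr * avg_grad                    # single update to shared params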

Asynchronous data parallelism does not wait for all GPUs to finish a training step: whichever GPU finishes first immediately applies its gradients to the shared model parameters.

Synchronous data parallelism usually converges faster and reaches higher model accuracy than asynchronous data parallelism.

This example uses synchronous data parallelism on the CIFAR-10 dataset. Load the dependencies, including the cifar10 module from TensorFlow Models, which downloads and preprocesses the CIFAR-10 data.

Set the batch size to 128, the maximum number of steps to 1,000,000 (training can be stopped at any time, since the model is saved periodically), and the number of GPUs to 4.

Define the loss function tower_loss. cifar10.distorted_inputs produces data-augmented images and labels. cifar10.inference builds the convolutional network; each GPU builds its own copy of the network, with an identical structure and shared model parameters. Given the network output and the labels, cifar10.loss computes the loss (the losses are stored in a collection). tf.get_collection('losses', scope) retrieves the losses of the current GPU (scope restricts the search to the current tower), and tf.add_n sums them into total_loss, which is returned as the function result.
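
A small illustration (TF 1.x API, with a constant standing in for the real per-tower loss) of how the 'losses' collection and the scope filter interact:

import tensorflow as tf

with tf.name_scope('tower_0') as scope:
    # Stand-in for the loss that cifar10.loss() adds to the 'losses' collection.
    cross_entropy = tf.constant(1.5, name='cross_entropy')
    tf.add_to_collection('losses', cross_entropy)
    # The scope argument filters the collection, so only ops created inside
    # this tower's name scope are returned.
    tower_losses = tf.get_collection('losses', scope)
    total_loss = tf.add_n(tower_losses, name='total_loss')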

Define the function average_gradients, which merges the gradients computed on the different GPUs. Its input tower_grads is a two-level list of gradients: the outer list ranges over GPUs, the inner list over the gradients of the different Variables computed on one GPU. The innermost elements are (gradient, variable) tuples, so tower_grads has the form [[(grad0_gpu0, var0_gpu0), (grad1_gpu0, var1_gpu0), ...], [(grad0_gpu1, var0_gpu1), (grad1_gpu1, var1_gpu1), ...], ...]. Create the list average_grads to hold the gradients averaged across GPUs. zip(*tower_grads) transposes the nested list into the form [[(grad0_gpu0, var0_gpu0), (grad0_gpu1, var0_gpu1), ...], [(grad1_gpu0, var1_gpu0), (grad1_gpu1, var1_gpu1), ...], ...], which is then iterated over (see the toy example below). Each element grad_and_vars holds the per-GPU copies of the gradient of one Variable, and their mean is computed. A gradient is an N-dimensional tensor and is averaged element-wise: tf.expand_dims adds a redundant dimension 0 to each gradient, which is appended to the list grads; tf.concat concatenates them along dimension 0; and tf.reduce_mean averages over dimension 0, i.e. over all GPU copies. The averaged gradient is combined with its Variable back into the (gradient, variable) tuple format and appended to average_grads. After all gradients have been averaged, average_grads is returned.
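
The zip(*tower_grads) transposition can be checked with a toy list (plain Python, with strings standing in for tensors and Variables):

tower_grads = [
    [('grad0_gpu0', 'var0'), ('grad1_gpu0', 'var1')],  # gradients from GPU 0
    [('grad0_gpu1', 'var0'), ('grad1_gpu1', 'var1')],  # gradients from GPU 1
]
for grad_and_vars in zip(*tower_grads):
    print(grad_and_vars)
# (('grad0_gpu0', 'var0'), ('grad0_gpu1', 'var0'))  -> same Variable across all GPUs
# (('grad1_gpu0', 'var1'), ('grad1_gpu1', 'var1'))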

Define the training function. Set the default compute device to the CPU. global_step records the global number of training steps. Compute the number of batches per epoch and decay_steps, the number of steps between learning-rate decays. tf.train.exponential_decay creates a learning rate that decays with the training step: the first argument is the initial learning rate, the second the global training step, the third the number of steps per decay, and the fourth the decay factor; with staircase set to True the decay is step-wise. Use GradientDescent as the optimizer, passing in the step-decayed learning rate.
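
With staircase=True, the decayed learning rate is equivalent to the following expression (a restatement of the documented exponential_decay behaviour, using the cifar10 module's constant names, with step as the current global step as a plain integer):

# Conceptual (non-graph) form of the staircase-decayed learning rate.
decayed_lr = (cifar10.INITIAL_LEARNING_RATE *
              cifar10.LEARNING_RATE_DECAY_FACTOR ** (step // decay_steps))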

Define the list tower_grads to store the results computed on each GPU. Create a loop that iterates over the number of GPUs. Inside the loop, tf.device restricts which GPU is used, and tf.name_scope sets the namespace.

Each GPU obtains its loss with tower_loss. tf.get_variable_scope().reuse_variables() reuses the parameters, so all GPUs share one model with exactly the same parameters. opt.compute_gradients(loss) computes the gradients on a single GPU, which are appended to the gradient list tower_grads. average_gradients computes the averaged gradients, and opt.apply_gradients updates the model parameters.
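
Parameter sharing between towers relies on TF 1.x variable scopes; a minimal sketch of what reuse_variables() does (the variable names are illustrative):

import tensorflow as tf

with tf.variable_scope('model'):
    w0 = tf.get_variable('w', shape=[3, 3])    # created when building the first tower
    tf.get_variable_scope().reuse_variables()  # from now on, get_variable reuses
    w1 = tf.get_variable('w', shape=[3, 3])    # same Variable object, not a new one
print(w0 is w1)  # True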

Create the model saver, and set the Session's allow_soft_placement parameter to True: some operations can only run on the CPU, and without soft placement they could not be placed. Initialize all parameters, and call tf.train.start_queue_runners() to start producing the data-augmented training samples, so that training is not blocked waiting for samples to be generated.

The training loop runs for at most max_steps iterations. Each step runs the gradient-update operation apply_gradient_op (one training step) and the loss operation, with time.time() recording the elapsed time. Every 10 steps, print the current batch loss, the number of samples trained per second, and the time spent per batch. Every 1000 steps, use the Saver to save the whole model to a checkpoint file.

cifar10.maybe_download_and_extract() downloads the full CIFAR-10 dataset, then train() starts training.

The loss drops from roughly 4 at the start to 0.07 by around step 700,000. Each batch takes about 0.021 s on average, roughly 6000 training samples per second, about 4 times the single-GPU throughput.

import os.path
import re
import time

import numpy as np
import tensorflow as tf

import cifar10

batch_size = 128
# train_dir = '/tmp/cifar10_train'
max_steps = 1000000
num_gpus = 4
# log_device_placement = False

def tower_loss(scope):
    """Calculate the total loss on a single tower running the CIFAR model.

    Args:
      scope: unique prefix string identifying the CIFAR tower, e.g. 'tower_0'

    Returns:
      Tensor of shape [] containing the total loss for a batch of data
    """
    # Get images and labels for CIFAR-10.
    images, labels = cifar10.distorted_inputs()
    # Build inference Graph.
    logits = cifar10.inference(images)
    # Build the portion of the Graph calculating the losses. Note that we will
    # assemble the total_loss using a custom function below.
    _ = cifar10.loss(logits, labels)
    # Assemble all of the losses for the current tower only.
    losses = tf.get_collection('losses', scope)
    # Calculate the total loss for the current tower.
    total_loss = tf.add_n(losses, name='total_loss')
    # Compute the moving average of all individual losses and the total loss.
    # loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg')
    # loss_averages_op = loss_averages.apply(losses + [total_loss])
    # Attach a scalar summary to all individual losses and the total loss; do the
    # same for the averaged version of the losses.
    # for l in losses + [total_loss]:
    #     # Remove 'tower_[0-9]/' from the name in case this is a multi-GPU training
    #     # session. This helps the clarity of presentation on tensorboard.
    #     loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name)
    #     # Name each loss as '(raw)' and name the moving average version of the loss
    #     # as the original loss name.
    #     tf.scalar_summary(loss_name + ' (raw)', l)
    #     tf.scalar_summary(loss_name, loss_averages.average(l))
    # with tf.control_dependencies([loss_averages_op]):
    #     total_loss = tf.identity(total_loss)
    return total_loss

def average_gradients(tower_grads):
    """Calculate the average gradient for each shared variable across all towers.

    Note that this function provides a synchronization point across all towers.

    Args:
      tower_grads: List of lists of (gradient, variable) tuples. The outer list
        is over individual gradients. The inner list is over the gradient
        calculation for each tower.

    Returns:
      List of pairs of (gradient, variable) where the gradient has been averaged
      across all towers.
    """
    average_grads = []
    for grad_and_vars in zip(*tower_grads):
        # Note that each grad_and_vars looks like the following:
        # ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
        grads = []
        for g, _ in grad_and_vars:
            # Add 0 dimension to the gradients to represent the tower.
            expanded_g = tf.expand_dims(g, 0)
            # Append on a 'tower' dimension which we will average over below.
            grads.append(expanded_g)
        # Average over the 'tower' dimension.
        grad = tf.concat(grads, 0)
        grad = tf.reduce_mean(grad, 0)
        # Keep in mind that the Variables are redundant because they are shared
        # across towers. So we will just return the first tower's pointer to
        # the Variable.
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads

def train():
    """Train CIFAR-10 for a number of steps."""
    with tf.Graph().as_default(), tf.device('/cpu:0'):
        # Create a variable to count the number of train() calls. This equals the
        # number of batches processed * FLAGS.num_gpus.
        global_step = tf.get_variable(
            'global_step', [],
            initializer=tf.constant_initializer(0), trainable=False)
        # Calculate the learning rate schedule.
        num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
                                 batch_size)
        decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)
        # Decay the learning rate exponentially based on the number of steps.
        lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
                                        global_step,
                                        decay_steps,
                                        cifar10.LEARNING_RATE_DECAY_FACTOR,
                                        staircase=True)
        # Create an optimizer that performs gradient descent.
        opt = tf.train.GradientDescentOptimizer(lr)
        # Calculate the gradients for each model tower.
        tower_grads = []
        for i in range(num_gpus):
            with tf.device('/gpu:%d' % i):
                with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
                    # Calculate the loss for one tower of the CIFAR model. This function
                    # constructs the entire CIFAR model but shares the variables across
                    # all towers.
                    loss = tower_loss(scope)
                    # Reuse variables for the next tower.
                    tf.get_variable_scope().reuse_variables()
                    # Retain the summaries from the final tower.
                    # summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)
                    # Calculate the gradients for the batch of data on this CIFAR tower.
                    grads = opt.compute_gradients(loss)
                    # Keep track of the gradients across all towers.
                    tower_grads.append(grads)
        # We must calculate the mean of each gradient. Note that this is the
        # synchronization point across all towers.
        grads = average_gradients(tower_grads)
        # Add a summary to track the learning rate.
        # summaries.append(tf.scalar_summary('learning_rate', lr))
        # Add histograms for gradients.
        # for grad, var in grads:
        #     if grad is not None:
        #         summaries.append(
        #             tf.histogram_summary(var.op.name + '/gradients', grad))
        # Apply the gradients to adjust the shared variables.
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
        # Add histograms for trainable variables.
        # for var in tf.trainable_variables():
        #     summaries.append(tf.histogram_summary(var.op.name, var))
        # Track the moving averages of all trainable variables.
        # variable_averages = tf.train.ExponentialMovingAverage(
        #     cifar10.MOVING_AVERAGE_DECAY, global_step)
        # variables_averages_op = variable_averages.apply(tf.trainable_variables())
        # Group all updates into a single train op.
        # train_op = tf.group(apply_gradient_op, variables_averages_op)
        # Create a saver.
        saver = tf.train.Saver(tf.all_variables())
        # Build the summary operation from the last tower summaries.
        # summary_op = tf.merge_summary(summaries)
        # Build an initialization operation to run below.
        init = tf.global_variables_initializer()
        # Start running operations on the Graph. allow_soft_placement must be set to
        # True to build towers on GPU, as some of the ops do not have GPU
        # implementations.
        sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))
        sess.run(init)
        # Start the queue runners.
        tf.train.start_queue_runners(sess=sess)
        # summary_writer = tf.train.SummaryWriter(train_dir, sess.graph)
        for step in range(max_steps):
            start_time = time.time()
            _, loss_value = sess.run([apply_gradient_op, loss])
            duration = time.time() - start_time
            assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
            if step % 10 == 0:
                num_examples_per_step = batch_size * num_gpus
                examples_per_sec = num_examples_per_step / duration
                sec_per_batch = duration / num_gpus
                format_str = ('step %d, loss = %.2f (%.1f examples/sec; %.3f '
                              'sec/batch)')
                print(format_str % (step, loss_value,
                                    examples_per_sec, sec_per_batch))
            # if step % 100 == 0:
            #     summary_str = sess.run(summary_op)
            #     summary_writer.add_summary(summary_str, step)
            # Save the model checkpoint periodically.
            if step % 1000 == 0 or (step + 1) == max_steps:
                # checkpoint_path = os.path.join(train_dir, 'model.ckpt')
                saver.save(sess, '/tmp/cifar10_train/model.ckpt', global_step=step)

cifar10.maybe_download_and_extract()
# if tf.gfile.Exists(train_dir):
#     tf.gfile.DeleteRecursively(train_dir)
# tf.gfile.MakeDirs(train_dir)
train()

References:

《TensorFlow實戰》


