





同步數據並行,數據集CIFAR-10。載入依賴庫,TensorFlow Models cifar10類,下載CIFAR-10數據預處理。

設置batch大小 128,最大步數100萬步(中間隨時停止,模型定期保存),GPU數量4。

定義計算損失的函數tower_loss。先用cifar10.distorted_inputs產生經過數據增強的images和labels,再調用cifar10.inference生成卷積網路;每個GPU各自生成一個網路,結構一致並共享模型參數。然後根據卷積網路的輸出和labels,調用cifar10.loss計算損失函數(loss會儲存到collection中),用tf.get_collection('losses', scope)獲取當前GPU上的loss(以scope限定範圍),再用tf.add_n將所有損失疊加得到total_loss,最後返回total_loss作為函數結果。

定義函數average_gradients,負責將不同GPU計算出的梯度合成。輸入參數tower_grads是梯度的雙層列表:外層列表對應不同GPU計算得到的梯度,內層列表對應同一GPU上不同Variable的梯度。最內層元素為(grads,variable),即tower_grads的基本元素是二元組(梯度、變數),具體形式為[[(grad0_gpu0,var0_gpu0),(grad1_gpu0,var1_gpu0)……],[(grad0_gpu1,var0_gpu1),(grad1_gpu1,var1_gpu1)……]……]。先創建平均梯度列表average_grads,用來存放在不同GPU間平均後的梯度。zip(*tower_grads)將雙層列表轉置,變成[[(grad0_gpu0,var0_gpu0),(grad0_gpu1,var0_gpu1)……],[(grad1_gpu0,var1_gpu0),(grad1_gpu1,var1_gpu1)……]……]的形式,再循環遍歷其元素。循環中取得的元素grad_and_vars,是同一個Variable的梯度在不同GPU上的計算結果。對同一個Variable在不同GPU上計算出的梯度副本,需要計算其均值。梯度是N維向量,要在每個維度上分別平均。先用tf.expand_dims給梯度添加一個冗餘的維度0,把梯度放進列表grads;再用tf.concat在維度0上合併;最後用tf.reduce_mean在維度0上求平均,即對各GPU的梯度逐元素取均值。將平均後的梯度和Variable重新組合成原有的二元組(梯度、變數)格式,添加到列表average_grads。所有梯度求均值後,返回average_grads。




創建模型保存器saver,並將Session的allow_soft_placement參數設為True。因為有些操作只能在CPU上進行,不使用soft_placement可能導致運行出錯。初始化全部參數,調用tf.train.start_queue_runners()準備大量經過數據增強的訓練樣本,防止訓練被阻塞在生成樣本上。

訓練循環,最大迭代次數max_steps。每步執行一次更新梯度操作apply_gradient_op(一次訓練操作),計算損失操作loss。time.time()記錄耗時。每隔10步,展示當前batch loss。每秒鐘可訓練樣本數和每個batch訓練花費時間。每隔1000步,Saver保存整個模型文件。


loss從最開始的4點幾,到第70萬步時降到0.07。平均每個batch耗時0.021s,平均每秒訓練約6000個樣本,約為單GPU的4倍。

import os.path

import re

import time

import numpy as np

import tensorflow as tf

import cifar10






def tower_loss(scope):
    """Calculate the total loss on a single tower running the CIFAR model.

    Args:
      scope: unique prefix string identifying the CIFAR tower, e.g. 'tower_0'.
        It is used to filter the 'losses' collection so that only the
        losses created under this tower's name scope are summed.

    Returns:
      Tensor of shape [] containing the total loss for a batch of data.
    """
    # Get (augmented) images and labels for CIFAR-10.
    images, labels = cifar10.distorted_inputs()

    # Build the inference graph for this tower.
    logits = cifar10.inference(images)

    # Build the portion of the graph calculating the losses. The return value
    # is deliberately discarded: the individual loss terms are picked up from
    # the 'losses' collection below instead.
    _ = cifar10.loss(logits, labels)

    # Assemble all of the losses for the current tower only.
    losses = tf.get_collection('losses', scope)

    # Calculate the total loss for the current tower.
    total_loss = tf.add_n(losses, name='total_loss')

    # NOTE: the moving-average / scalar-summary bookkeeping for the losses was
    # already commented out in the original and is intentionally left out.
    return total_loss

def average_gradients(tower_grads):
    """Calculate the average gradient for each shared variable across all towers.

    Note that this function provides a synchronization point across all towers.

    Args:
      tower_grads: List of lists of (gradient, variable) tuples. The outer
        list is over towers (one entry per GPU); each inner list holds that
        tower's (gradient, variable) pairs, in the same variable order for
        every tower, e.g.
        [[(grad0_gpu0, var0_gpu0), (grad1_gpu0, var1_gpu0), ...],
         [(grad0_gpu1, var0_gpu1), (grad1_gpu1, var1_gpu1), ...], ...].

    Returns:
      List of pairs of (gradient, variable) where the gradient has been
      averaged across all towers.
    """
    average_grads = []
    # zip(*tower_grads) transposes the nested list so that each item groups
    # the gradients of ONE variable across all towers:
    #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
    for grad_and_vars in zip(*tower_grads):
        # Add a leading dimension 0 to every gradient to represent the tower,
        # so the per-tower gradients can be concatenated and averaged along it.
        grads = [tf.expand_dims(g, 0) for g, _ in grad_and_vars]

        # Average over the tower dimension.
        grad = tf.concat(grads, 0)
        grad = tf.reduce_mean(grad, 0)

        # The Variables are redundant because they are shared across towers,
        # so just return the first tower's pointer to the Variable.
        v = grad_and_vars[0][1]
        average_grads.append((grad, v))

    return average_grads

def train(max_steps=1000000, batch_size=128, num_gpus=4):
    """Train CIFAR-10 for a number of steps using synchronous data parallelism.

    One model replica ("tower") is built per GPU with shared variables. Each
    step computes per-tower gradients, averages them, and applies the averaged
    gradients once.

    Args:
      max_steps: Maximum number of training steps to run.
      batch_size: Per-tower batch size. It is used here for the learning-rate
        decay schedule and for throughput reporting only. tower_loss() calls
        cifar10.distorted_inputs() without arguments, so the batch size
        actually fed to the model is whatever the cifar10 module is configured
        with -- keep the two in sync.
      num_gpus: Number of GPUs (towers) to build.
    """
    checkpoint_path = '/tmp/cifar10_train/model.ckpt'

    with tf.Graph().as_default(), tf.device('/cpu:0'):
        # Step counter; it is passed to apply_gradients() below, so it advances
        # once per applied (averaged) gradient update.
        global_step = tf.get_variable(
            'global_step', [],
            initializer=tf.constant_initializer(0), trainable=False)

        # Calculate the learning rate schedule.
        num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
                                 batch_size)
        decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)

        # Decay the learning rate exponentially based on the number of steps.
        # NOTE(review): the arguments after INITIAL_LEARNING_RATE were lost in
        # the original text and are reconstructed here (decay factor taken
        # from the cifar10 module, staircase decay) -- confirm against the
        # cifar10 module in use.
        lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
                                        global_step,
                                        decay_steps,
                                        cifar10.LEARNING_RATE_DECAY_FACTOR,
                                        staircase=True)

        # Create an optimizer that performs gradient descent.
        opt = tf.train.GradientDescentOptimizer(lr)

        # Calculate the gradients for each model tower.
        tower_grads = []
        for i in range(num_gpus):
            with tf.device('/gpu:%d' % i):
                with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
                    # Calculate the loss for one tower of the CIFAR model.
                    # This constructs the entire CIFAR model but shares the
                    # variables across all towers.
                    loss = tower_loss(scope)

                    # Reuse variables for the next tower.
                    tf.get_variable_scope().reuse_variables()

                    # Calculate the gradients for the batch of data on this
                    # CIFAR tower.
                    grads = opt.compute_gradients(loss)

                    # Keep track of the gradients across all towers.
                    tower_grads.append(grads)

        # We must calculate the mean of each gradient. Note that this is the
        # synchronization point across all towers.
        grads = average_gradients(tower_grads)

        # Apply the gradients to adjust the shared variables.
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)

        # Create a saver covering all variables in the graph.
        saver = tf.train.Saver(tf.global_variables())

        # Build an initialization operation to run below.
        init = tf.global_variables_initializer()

        # Start running operations on the Graph. allow_soft_placement must be
        # set to True to build towers on GPU, as some of the ops do not have
        # GPU implementations.
        sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))
        sess.run(init)

        # Start the queue runners so input batches are produced in the
        # background and training does not block on sample generation.
        tf.train.start_queue_runners(sess=sess)

        # Make sure the checkpoint directory exists before the first save.
        tf.gfile.MakeDirs(os.path.dirname(checkpoint_path))

        for step in range(max_steps):
            start_time = time.time()
            # `loss` is the loss tensor of the last tower built above.
            _, loss_value = sess.run([apply_gradient_op, loss])
            duration = time.time() - start_time

            assert not np.isnan(loss_value), 'Model diverged with loss = NaN'

            if step % 10 == 0:
                num_examples_per_step = batch_size * num_gpus
                examples_per_sec = num_examples_per_step / duration
                sec_per_batch = duration / num_gpus

                format_str = ('step %d, loss = %.2f (%.1f examples/sec; %.3f '
                              'sec/batch)')
                print(format_str % (step, loss_value,
                                    examples_per_sec, sec_per_batch))

            # Save the model checkpoint periodically and at the final step.
            if step % 1000 == 0 or (step + 1) == max_steps:
                saver.save(sess, checkpoint_path, global_step=step)


#if tf.gfile.Exists(train_dir):

# tf.gfile.DeleteRecursively(train_dir)








