實例介紹TensorFlow的輸入流水線

碼字不易,歡迎給個贊!

歡迎交流與轉載,文章會同步發布在公眾號:機器學習演算法全棧工程師(Jeemy110)

前言

在訓練模型時,我們首先要處理的就是訓練數據的載入與預處理的問題,這裡稱這個過程為輸入流水線(input pipelines,或輸入管道,參考)。在TensorFlow中,典型的輸入流水線包含三個流程(ETL流程):

  1. 提取(Extract):從存儲介質(如硬碟)中讀取數據,可能是本地讀取,也可能是遠程讀取(比如在分散式存儲系統HDFS)
  2. 預處理(Transform):利用CPU處理器解析和預處理提取的數據,如圖像解壓縮,數據擴增或者變換,然後會做random shuffle,並形成batch。
  3. 載入(load):將預處理後的數據載入到加速設備中(如GPUs)來執行模型的訓練。

輸入流水線對於加速模型訓練還是很重要的,如果你的CPU處理數據能力跟不上GPU的處理速度,此時CPU預處理數據就成為了訓練模型的瓶頸環節。除此之外,上述輸入流水線本身也有很多優化的地方。比如,一個典型的模型訓練過程中,CPU預處理數據時,GPU是閑置的,當GPU訓練模型時,CPU是閑置的,這個過程如下所示:

這樣一個訓練step中所花費的時間是CPU預處理數據和GPU訓練模型時間的總和。顯然這個過程中有資源浪費,一個改進的方法就是交叉CPU數據處理和GPU模型訓練這兩個過程,當GPU處於第個訓練階段,CPU正在準備第步所需的數據,如下圖所示:

明顯上述設計可以充分最大化利用CPU和GPU,從而減少資源的閑置。另外當存在多個CPU核心時,這又會涉及到CPU的並行化技術(多線程)來加速數據預處理過程,因為每個訓練樣本的預處理過程往往是互相獨立的。關於輸入流程線的優化可以參考TensorFlow官網上的Input Pipeline Performance Guide,相信你會受益匪淺。

幸運的是,最新的TensorFlow版本提供了tf.data這一套APIs來幫助我們快速實現高效又靈活的輸入流水線。在TensorFlow中最常見的載入訓練數據的方式是通過Feeding方式,其主要是定義placeholder,然後將通過Session.run()的feed_dict參數送入數據,但是這其實是最低效的載入數據方式。後來,TensorFlow增加了QueueRunner機制,其主要是基於文件隊列以及多線程技術,實現了更高效的輸入流水線,但是其APIs很是讓人難懂,所以就有了現在的tf.data來替代它。

這裡我們通過mnist實例來講解如何使用tf.data建立簡潔而高效的輸入流水線,在介紹之前,我們先介紹如何製作TFRecords文件,這是TensorFlow支持的一種標準文件格式。

製作TFRecords文件

TFRecords文件是TensorFlow中的標準數據格式,它是基於protobuf的二進位文件,每個TFRecord文件的基本元素是tf.train.Example,其對應的是數據集中的一個樣本數據,每個Example包含Features,存儲該樣本的各個feature,每個feature包含一個鍵值對,分別對應feature的特徵名與實際值。下面是一個Example實例(參考):

// An Example for a movie recommendation application: features { feature { key: "age" value { float_list { value: 29.0 }} } feature { key: "movie" value { bytes_list { value: "The Shawshank Redemption" value: "Fight Club" }} } feature { key: "movie_ratings" value { float_list { value: 9.0 value: 9.7 }} } feature { key: "suggestion" value { bytes_list { value: "Inception" }} } feature { key: "suggestion_purchased" value { float_list { value: 1.0 }} } feature { key: "purchase_price" value { float_list { value: 9.99 }} } }

上面是一個電影推薦系統中的一個樣本,可以看到它共含有6個特徵,每個特徵都是key-value類型,key是特徵名,而value是特徵值,值得注意的是value其實存儲的是一個list,根據數據類型共分為三種:bytes_list, float_listint64_list,分別存儲位元組、浮點及整數類型(見這裡)。

作為標準數據格式,TensorFlow當然提供了創建TFRecords文件的python介面,下面我們創建mnist數據集對應的TFRecords文件。對於mnist數據集,每個Example需要存儲兩個feature,一個是圖像的像素值,這裡可以用bytes類型,因為一個像素點正好可以用一個位元組存儲,另外是圖像的標籤值,只能用int64類型存儲了。因此,我們先定義這兩個類型的介面函數:

# int64def _int64_feature(value): return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))# bytesdef _bytes_feature(value): return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

創建TFRecord文件,主要通過TF中的tf.python_io.TFRecordWriter函數來實現,具體代碼如下:

def convert_to_TFRecords(dataset, name): """Convert mnist dataset to TFRecords""" images, labels = dataset.images, dataset.labels n_examples = dataset.num_examples filename = os.path.join(DIR, name + ".tfrecords") print("Writing", filename) with tf.python_io.TFRecordWriter(filename) as writer: for index in range(n_examples): image_bytes = images[index].tostring() label = labels[index] example = tf.train.Example(features=tf.train.Features( feature={"image": _bytes_feature(image_bytes), "label": _int64_feature(label)})) writer.write(example.SerializeToString())

對於mnist數據集,主要分為train、validation和test,利用上面的函數分別創建三個不同的TFRecords文件:

mnist_datasets = mnist.read_data_sets("mnist_data", dtype=tf.uint8, reshape=False)convert_to_TFRecords(mnist_datasets.train, "train")convert_to_TFRecords(mnist_datasets.validation, "validation")convert_to_TFRecords(mnist_datasets.test, "test")

好了,這樣我們就創建3個TFRecords文件了。

讀取TFRecords文件

上面我們創建了TFRecords文件,但是怎麼去讀取它們呢,當然TF提供了讀取TFRecords文件的介面函數,這裡首先介紹如何利用TF中操作TFRecord的python介面來讀取TFRecord文件,主要是tf.python_io.tf_record_iterator函數,它輸入TFRecord文件,但是得到一個迭代器,每個元素是一個Example,但是卻是一個字元串,這裡可以用tf.train.Example來解析它,具體代碼如下:

def read_TFRecords_test(name): filename = os.path.join(DIR, name + ".tfrecords") record_itr = tf.python_io.tf_record_iterator(path=filename) for r in record_itr: example = tf.train.Example() example.ParseFromString(r) label = example.features.feature["label"].int64_list.value[0] print("Label", label) image_bytes = example.features.feature["image"].bytes_list.value[0] img = np.fromstring(image_bytes, dtype=np.uint8).reshape(28, 28) print(img) plt.imshow(img, cmap="gray") plt.show() break # 只讀取一個Example

上面僅是純python的讀取方式,這不是TFRecords文件的正確使用方式。既然是官方標準數據格式,TF也提供了使用TFRecords文件建立輸入流水線的方式。在tf.data出現之前,使用的是QueueRunner方式,即文件隊列機制,其原理如下圖所示:

文件隊列機制主要分為兩個階段:第一個階段將輸入文件打亂,並在文件隊列入列,然後Reader從文件隊列中讀取一個文件,同時文件隊列出列這個文件,Reader同時對文件進行解碼,然後生產數據樣本,並將樣本在樣本隊列中入列,可以定義多個Reader並發地從多個文件同時讀取數據。從樣本隊列中的出列一定量的樣本數據即可以用於一個訓練過程。TF提供了配套的API來完成這個過程,注意的是這個輸入流水線是直接嵌入訓練的Graph中,即是整個圖模型的一部分。根據文件的不同,可以使用不同類型的Reader,對於TFRecord文件,可以使用tf.TFRecordReader,下面是具體的實現代碼:

def read_example(filename_queue): """Read one example from filename_queue""" reader = tf.TFRecordReader() key, value = reader.read(filename_queue) features = tf.parse_single_example(value, features={"image": tf.FixedLenFeature([], tf.string), "label": tf.FixedLenFeature([], tf.int64)}) image = tf.decode_raw(features["image"], tf.uint8) image = tf.reshape(image, [28, 28]) label = tf.cast(features["label"], tf.int32) return image, labelif __name__ == "__main__": queue = tf.train.string_input_producer(["TFRecords/train.tfrecords"], num_epochs=10) image, label = read_example(queue) img_batch, label_batch = tf.train.shuffle_batch([image, label], batch_size=32, capacity=5000, min_after_dequeue=2000, num_threads=4) with tf.Session() as sess: sess.run(tf.local_variables_initializer()) sess.run(tf.global_variables_initializer()) coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) try: while not coord.should_stop(): # Run training steps or whatever images, labels = sess.run([img_batch, label_batch]) print(images.shape, labels.shape) except tf.errors.OutOfRangeError: print(Done training -- epoch limit reached) coord.request_stop() coord.join(threads)

對於隊列機制,估計大家看的雲里霧裡的,代碼確實讓人難懂,但是其實只要按照官方提供的標準代碼,還是很容易在自己的數據集上進行修改的。不過現在有了tf.data,可以更加優雅地實現上面的過程。

tf.data簡介

使用tf.data可以更方便地創建高效的輸入流水線,但是其相比隊列機制API更友好,這主要是因為tf.data提供了高級抽象。第一個抽象是使用tf.data.Dataset來表示一個數據集合,集合裡面的每個元素包含一個或者多個Tensor,一般就是對應一個訓練樣本。第二個抽象是使用tf.data.Iterator來從數據集中提取數據,這是一個迭代器對象,可以通過Iterator.get_next()Dataset中產生一個樣本。利用這兩個抽象,Dataset的使用簡化為三個步驟:

  1. 創建Dataset實例對象;
  2. 創建遍歷DatasetIterator實例對象;
  3. Iterator中不斷地產生樣本,並送入模型中進行訓練。

創建Dataset

TF提供了很多方式創建Dataset,下面是幾種方式:

# 從Numpy的arraydataset1 = tf.data.Dataset.from_tensor_slices(np.random.randn((5, 10))print(dataset1.output_types) # ==> "tf.float32"print(dataset1.output_shapes) # ==> "(10,)"# 從Tensordataset2 = tf.data.Dataset.from_tensor_slices((tf.random_uniform([4]), tf.random_uniform([4, 100], maxval=100, dtype=tf.int32)))print(dataset2.output_types) # ==> "(tf.float32, tf.int32)"print(dataset2.output_shapes) # ==> "((), (100,))"# 從文件filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]dataset3 = tf.data.TFRecordDataset(filenames)

更重要的是Dataset可以進行一系列的變換操作,並且支持鏈式調用,這對於數據預處理很重要:

dataset = tf.data.TFRecordDataset(filenames)dataset = dataset.map(...) # 解析數據或者對數據預處理,如normalize.dataset = dataset.repeat() # 重複數據集,一般設置num_epochsdataset = dataset.batch(32) # 形成batch

創建Iterator

創建了Dataset之後,我們需要創建Iterator來遍曆數據集,返回的是迭代器對象,並從中可以產生數據,以用於模型訓練。TF共支持4中迭代器類型,分別是one-shot, initializable, reinitializable和feedable。下面逐個介紹它們。

One-shot Iterator

這是最簡單的Iterator,它僅僅遍歷整個數據集一次,而且不需要顯示初始化,下面是個實例:

dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))iterator = dataset.make_one_shot_iterator()next_element = iterator.get_next()with tf.Session() as sess: for i in range(10): sess.run(next_element) # 0, 1, ..., 9

Initializable Iterator

相比one-shot Iterator,它需要在使用前顯示初始化,這樣就可以支持參數化,每次初始化時送入不同的參數,就可以支持數據集的簡單參數化,下面是一個實例:

max_value = tf.placeholder(tf.int64, [])dataset = tf.data.Dataset.range(max_value)iterator = dataset.make_initializable_iterator()next_element = iterator.get_next()with tf.Session() as sess: # 需要顯示初始化 sess.run(iterator.initializer, feed_dict={max_value: 10}) for i in range(10): print(sess.run(next_element)) # 0, 1, ..., 9

Reinitializable Iterator

相比initializable Iterator,它可以支持從不同的Dataset進行初始化,有時候你需要訓練集和測試集,但是兩者並不同,此時就可以定義兩個不同的Dataset,並配合reinitializable Iterator來定義一個通用的迭代器,在使用前只需要送入不同的Dataset進行初始化就可以,下面是一個實例:

train_data = np.random.randn(100, 5)test_data = np.random.randn(20, 5)train_dataset = tf.data.Dataset.from_tensor_slices(train_data)test_dataset = tf.data.Dataset.from_tensor_slices(test_data)# 創建一個reinitializable iteratorre_iterator = tf.data.Iterator.from_structure(train_dataset.output_types, train_dataset.output_shapes)next_element = re_iterator.get_next()train_init_op = re_iterator.make_initializer(train_dataset)test_init_op = re_iterator.make_initializer(test_dataset)with tf.Session() as sess: # 訓練 n_epochs = 2 for i in range(n_epochs): sess.run(train_init_op) for j in range(100): print(sess.run(next_element)) # 測試 sess.run(test_init_op) for i in range(20): print(sess.run(next_element))

Feedable Iterator

對於reinitializable iterator,它可以支持送入不同Dataset,從而完成數據集的切換,但是每次切換時必須要重新初始化。對於Feedable Iterator,其可以認為支持送入不同的Iterator,通過切換迭代器的string handle來完成不同數據集的切換,並且在切換時迭代器的狀態還會被保留,這相比reinitializable iterator更加靈活,下面是一個實例:

train_data = np.random.randn(100, 5)val_data = np.random.randn(20, 5)n_epochs = 20train_dataset = tf.data.Dataset.from_tensor_slices(train_data).repeat(n_epochs)val_dataset = tf.data.Dataset.from_tensor_slices(val_data)# 創建一個feedable iteratorhandle = tf.placeholder(tf.string, [])feed_iterator = tf.data.Iterator.from_string_handle(handle, train_dataset.output_types, train_dataset.output_shapes)next_element = feed_iterator.get_next()# 創建不同的iteratortrain_iterator = train_dataset.make_one_shot_iterator()val_iterator = val_dataset.make_initializable_iterator()with tf.Session() as sess: # 生成對應的handle train_handle = sess.run(train_iterator.string_handle()) val_handle = sess.run(val_iterator.string_handle()) # 訓練 for n in range(n_epochs): for i in range(100): print(i, sess.run(next_element, feed_dict={handle: train_handle})) # 驗證 if n % 10 == 0: sess.run(val_iterator.initializer) for i in range(20): print(sess.run(next_element, feed_dict={handle: val_handle}))

關於tf.data的基礎知識就這麼多了,更多內容可以參考官方文檔,另外這裡要說一點就是,對於迭代器對象,當其元素取盡之後,會拋出tf.errors.OutOfRangeError錯誤,當然一般情況下你是知道自己的迭代器對象的元素數,那麼也就可以不用通過捕獲錯誤來實現終止條件。下面,我們將使用tf.data實現mnist的完整訓練過程。

MNIST完整實例

我們採用feedable Iterator來實現mnist數據集的訓練過程,分別創建兩個Dataset,一個為訓練集,一個為驗證集,對於驗證集不需要shuffle操作。首先我們創建Dataset對象的輔助函數,主要是解析TFRecords文件,並對image做歸一化處理:

def decode(serialized_example): """decode the serialized example""" features = tf.parse_single_example(serialized_example, features={"image": tf.FixedLenFeature([], tf.string), "label": tf.FixedLenFeature([], tf.int64)}) image = tf.decode_raw(features["image"], tf.uint8) image = tf.cast(image, tf.float32) image = tf.reshape(image, [784]) label = tf.cast(features["label"], tf.int64) return image, labeldef normalize(image, label): """normalize the image to [-0.5, 0.5]""" image = image / 255.0 - 0.5 return image, label

然後定義創建Dataset的函數,對於訓練集和驗證集,兩者的參數會不同:

def create_dataset(filename, batch_size=64, is_shuffle=False, n_repeats=0): """create dataset for train and validation dataset""" dataset = tf.data.TFRecordDataset(filename) if n_repeats > 0: dataset = dataset.repeat(n_repeats) # for train dataset = dataset.map(decode).map(normalize) # decode and normalize if is_shuffle: dataset = dataset.shuffle(1000 + 3 * batch_size) # shuffle dataset = dataset.batch(batch_size) return dataset

我們使用一個簡單的全連接層網路來實現mnist的分類模型:

def model(inputs, hidden_sizes=(500, 500)): h1, h2 = hidden_sizes net = tf.layers.dense(inputs, h1, activation=tf.nn.relu) net = tf.layers.dense(net, h2, activation=tf.nn.relu) net = tf.layers.dense(net, 10, activation=None) return net

然後是訓練的主體代碼:

n_train_examples = 55000n_val_examples = 5000n_epochs = 50batch_size = 64train_dataset = create_dataset("TFRecords/train.tfrecords", batch_size=batch_size, is_shuffle=True, n_repeats=n_epochs)val_dataset = create_dataset("TFRecords/validation.tfrecords", batch_size=batch_size)# 創建一個feedable iteratorhandle = tf.placeholder(tf.string, [])feed_iterator = tf.data.Iterator.from_string_handle(handle, train_dataset.output_types, train_dataset.output_shapes)images, labels = feed_iterator.get_next()# 創建不同的iteratortrain_iterator = train_dataset.make_one_shot_iterator()val_iterator = val_dataset.make_initializable_iterator()# 創建模型logits = model(images, [500, 500])loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)loss = tf.reduce_mean(loss)train_op = tf.train.AdamOptimizer(learning_rate=1e-04).minimize(loss)predictions = tf.argmax(logits, axis=1)accuracy = tf.reduce_mean(tf.cast(tf.equal(predictions, labels), tf.float32))init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())with tf.Session() as sess: sess.run(init_op) # 生成對應的handle train_handle = sess.run(train_iterator.string_handle()) val_handle = sess.run(val_iterator.string_handle()) # 訓練 for n in range(n_epochs): ls = [] for i in range(n_train_examples // batch_size): _, l = sess.run([train_op, loss], feed_dict={handle: train_handle}) ls.append(l) print("Epoch %d, train loss: %f" % (n, np.mean(ls))) if (n + 1) % 10 == 0: sess.run(val_iterator.initializer) accs = [] for i in range(n_val_examples // batch_size): acc = sess.run(accuracy, feed_dict={handle: val_handle}) accs.append(acc) print(" validation accuracy: %f" % (np.mean(accs)))

大約可以在驗證集上的accuracy達到98%。

小結

看起來最新的tf.data還是比較好用的,如果你是TensorFlow用戶,可以嘗試著使用它,當然上面的例子並不能包含關於tf.data的所有內容,想繼續深入的話可以移步TF的官網。

參考

  1. Programmers guide: import data.
  2. How to use Dataset in TensorFlow.
  3. Reading data.
  4. Performance: datasets performance.
  5. Introduction to Artificial Neural Networks and Deep Learning: A Practical Guide with Applications in Python.

碼字不易,歡迎給個贊!

歡迎交流與轉載,文章會同步發布在公眾號:機器學習演算法全棧工程師(Jeemy110)


推薦閱讀:

識別漢字圖像的數據集
使用TensorFlow分類手寫數字
tensorflow中的tensorboard可視化中的準確率損失率曲線,為什麼有類似毛刺一樣?
不要慫,就是GAN (生成式對抗網路) (三):判別器和生成器 TensorFlow Model
TensorFlow 官方文檔中文版 [W3Cschool]

TAG:深度學習DeepLearning | TensorFlow | 人工智慧 |