
Queue and QueueRunner in TensorFlow

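In one sentence: Queue -> (graph construction phase) creates the queue; QueueRunner -> (graph construction phase) sets up the threads that will perform the enqueue operations; tf.train.start_queue_runners() -> (graph execution phase) starts those threads so they fill the queue; tf.train.Coordinator() -> shuts the threads down when one of them runs into an error.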

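【1】Starting with pipelines

The first way I read data in TensorFlow was to load the entire dataset into memory. Some datasets, however, are too large to fit in memory at once. We can instead read the data batch by batch, one batch at a time. But if the work is purely serial, i.e. "read data -> compute -> read data", efficiency is poor: the computation sits idle while data is being read, and the reading sits idle while the computation runs.

A practical improvement is a pipeline. A concise explanation of pipelining (from young cc's answer at zhihu.com/question/3502):

Pipelining splits combinational logic into segments so that tasks can be processed in a quasi-parallel way, raising the system clock frequency and the throughput and keeping every module as busy as possible.

For example, suppose doing laundry consists of four steps, each on its own piece of equipment: washing in the washer, drying in the dryer, folding the clothes on a rack, and putting them into the closet. Each step takes 30 minutes, so the whole process takes two hours. Now four people want to do their laundry. Processed sequentially, where one person must finish every step before the next person starts, each person takes two hours and the four of them take eight hours in total, as shown in the figure below: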

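With pipelined processing, however, as soon as someone finishes a step, that piece of equipment becomes free and the next person starts using it. The four people then finish in only 3.5 hours, as shown in the figure below: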
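The 8-hour and 3.5-hour figures can be checked with a small calculation. A minimal sketch, assuming every stage takes the same time and a load can enter a stage as soon as the previous load leaves it (the function names are made up for illustration):

```python
def sequential_time(num_tasks, num_stages, stage_time):
    # Each task goes through all stages before the next task starts.
    return num_tasks * num_stages * stage_time


def pipelined_time(num_tasks, num_stages, stage_time):
    # The first task needs num_stages steps to come out; after that,
    # one more task finishes at every step.
    return (num_stages + num_tasks - 1) * stage_time


# Four people, four stages of 0.5 hours each.
print(sequential_time(4, 4, 0.5))  # 8.0
print(pipelined_time(4, 4, 0.5))   # 3.5
```

For a single person both formulas give 2.0 hours: the pipeline shortens the total, not any individual load.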

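Note that pipelining only raises system throughput; it does not reduce the latency of an individual task. In a real circuit, a pipeline is built by inserting registers into the combinational logic to split it, and reading and writing those registers also takes time, so a single task actually takes longer than before. In addition, when splitting the combinational logic, the segments should take as nearly the same time as possible, because the system clock is set by the slowest segment. See the figure below: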
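Both hardware effects in this paragraph (register overhead, and the slowest stage setting the clock) can be put into numbers. A rough model, assuming the clock period equals the slowest stage delay plus a fixed register overhead; the delays below are made-up values:

```python
def pipeline_stats(stage_delays_ns, register_overhead_ns):
    """Return (clock period, single-task latency) of a clocked pipeline."""
    # The clock must fit the slowest stage plus the register delay.
    clock = max(stage_delays_ns) + register_overhead_ns
    # A single task spends one clock period in every stage.
    latency = len(stage_delays_ns) * clock
    return clock, latency


unsplit_delay = 100  # ns: the whole logic block without pipelining
clock, latency = pipeline_stats([20, 30, 25, 25], register_overhead_ns=2)
print(clock, latency)          # 32 128
print(unsplit_delay / clock)   # 3.125 (throughput gain)
```

Latency grows from 100 ns to 128 ns, yet a result comes out every 32 ns instead of every 100 ns. Balancing the four stages at 25 ns each would bring the clock down to 27 ns.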

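In TensorFlow, the data-reading threads move data from disk into an in-memory queue, and the computing thread takes data from that queue to compute on. In other words, the reading threads only put data into memory and the computing thread only takes data out of memory, so neither has to sit idle waiting for the other (as long as the queue is neither empty nor full).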
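The same division of labour can be sketched in plain Python with the standard library's thread-safe queue.Queue. This is only an analogy for the TensorFlow mechanism, not TensorFlow code: load_batch is a hypothetical stand-in for reading from disk, and a None sentinel marks the end of the data.

```python
import queue
import threading


def load_batch(i):
    # Hypothetical stand-in for reading batch i from disk.
    return [i] * 4


def reader(q, num_batches):
    # The reading thread only puts data into the in-memory queue.
    for i in range(num_batches):
        q.put(load_batch(i))   # blocks while the queue is full
    q.put(None)                # sentinel: no more data


def train(num_batches=5, capacity=2):
    q = queue.Queue(maxsize=capacity)
    t = threading.Thread(target=reader, args=(q, num_batches))
    t.start()
    results = []
    # The computing thread only takes data out of the queue.
    while True:
        batch = q.get()        # blocks while the queue is empty
        if batch is None:
            break
        results.append(sum(batch))  # stand-in for a training step
    t.join()
    return results


print(train())  # [0, 4, 8, 12, 16]
```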

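【2】Queue

A Queue is itself a node in the graph. The enqueue and dequeue operations are also graph nodes, and they modify the contents of the Queue node. Like a Variable, a Queue holds data (state) that persists across sess.run() calls.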

If the Queue is full, an enqueue operation blocks; if the Queue is empty, a dequeue operation blocks. Used incorrectly, this can make a program hang, which has happened to me.
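The blocking behaviour is easy to reproduce with Python's queue.Queue, used here as a stand-in for a TensorFlow queue. Passing a timeout turns the would-be hang into an exception, which is a convenient way to see where a program would get stuck. This is a plain-Python sketch; TensorFlow's own enqueue and dequeue ops wait indefinitely by default (note timeout_ms: -1 in the graph definition further down).

```python
import queue


def demo_blocking(timeout=0.1):
    events = []
    q = queue.Queue(maxsize=2)       # capacity 2
    q.put(1)
    q.put(2)
    try:
        q.put(3, timeout=timeout)    # full: without a timeout this waits forever
    except queue.Full:
        events.append("enqueue blocked: queue full")
    events.append((q.get(), q.get()))  # first in, first out
    try:
        q.get(timeout=timeout)       # empty: without a timeout this waits forever
    except queue.Empty:
        events.append("dequeue blocked: queue empty")
    return events


print(demo_blocking())
# ['enqueue blocked: queue full', (1, 2), 'dequeue blocked: queue empty']
```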

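In the typical setup, several enqueue threads put data into the Queue concurrently, and one dequeue operation takes data out of it. Generally, the enqueue threads are the ones preparing the data, and the dequeue thread is the one training on it.

I ran the following code (from the CSDN blog post "tensorflow中關於隊列使用的實驗", i.e. "Experiments on using queues in TensorFlow") to understand what actually happens inside the Queue: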

# -*- coding:utf-8 -*-
import tensorflow as tf

# The graph: a FIFO queue, plus ops to initialize it, dequeue, add 1, and enqueue
q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(([0.1, 0.2, 0.3],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])

# Open a session. A session keeps state: the state of the tensors
# (here, the contents of the queue) is kept between run() calls.
with tf.Session() as sess:
    sess.run(init)

    for i in range(2):
        sess.run(q_inc)

    quelen = sess.run(q.size())
    for i in range(quelen):
        print(sess.run(q.dequeue()))


The graph definition at this point (after q and init have been created) shows the queue and the enqueue op as ordinary graph nodes:

node {
  name: "fifo_queue"
  op: "FIFOQueueV2"
  attr {
    key: "capacity"
    value {
      i: 3
    }
  }
  attr {
    key: "component_types"
    value {
      list {
        type: DT_FLOAT
      }
    }
  }
  attr {
    key: "container"
    value {
      s: ""
    }
  }
  attr {
    key: "shapes"
    value {
      list {
      }
    }
  }
  attr {
    key: "shared_name"
    value {
      s: ""
    }
  }
}
node {
  name: "fifo_queue_EnqueueMany/component_0"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_FLOAT
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_FLOAT
        tensor_shape {
          dim {
            size: 3
          }
        }
        tensor_content: "\315\314\314=\315\314L>\232\231\231>"
      }
    }
  }
}
node {
  name: "fifo_queue_EnqueueMany"
  op: "QueueEnqueueManyV2"
  input: "fifo_queue"
  input: "fifo_queue_EnqueueMany/component_0"
  attr {
    key: "Tcomponents"
    value {
      list {
        type: DT_FLOAT
      }
    }
  }
  attr {
    key: "timeout_ms"
    value {
      i: -1
    }
  }
}
versions {
  producer: 24
}


Output:

0.3
1.1
1.2
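To see why the output is 0.3, 1.1, 1.2, the same sequence of operations can be replayed on a plain Python deque. This is a hand-written simulation of what the graph does, not TensorFlow code; in the real graph, one sess.run(q_inc) performs both the dequeue and the enqueue, because q_inc depends on y, which depends on x = q.dequeue().

```python
from collections import deque


class ToyFIFOQueue:
    """Hand-written stand-in for tf.FIFOQueue: fixed capacity, first in first out."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()

    def enqueue(self, value):
        if len(self.items) >= self.capacity:
            raise RuntimeError("full: the TensorFlow enqueue would block here")
        self.items.append(value)

    def enqueue_many(self, values):
        for v in values:
            self.enqueue(v)

    def dequeue(self):
        if not self.items:
            raise RuntimeError("empty: the TensorFlow dequeue would block here")
        return self.items.popleft()


def replay():
    q = ToyFIFOQueue(3)
    q.enqueue_many([0.1, 0.2, 0.3])  # sess.run(init)   -> [0.1, 0.2, 0.3]
    for _ in range(2):               # sess.run(q_inc), twice
        x = q.dequeue()              #   x = q.dequeue()
        q.enqueue(x + 1)             #   q.enqueue([x + 1])
    # after the 1st q_inc: [0.2, 0.3, 1.1]; after the 2nd: [0.3, 1.1, 1.2]
    return [q.dequeue() for _ in range(len(q.items))]


print(replay())  # [0.3, 1.1, 1.2]
```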

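【3】QueueRunner

The CSDN blog post "tensorflow中關於隊列使用的實驗" explains QueueRunner particularly well:

Enqueue operations read input from disk and place it in memory, which is slow.

A QueueRunner can create a set of new threads to perform the enqueue operations, so that the main thread can keep consuming data.

In the neural-network training scenario, this means training and data reading happen asynchronously: the main thread trains the network while other threads read data from disk into memory.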
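A rough plain-Python analogue of what a QueueRunner does: it holds an enqueue function and, when started, runs it repeatedly in several background threads until asked to stop. ToyQueueRunner and read_example are made-up names that only loosely mirror tf.train.QueueRunner; they are not its API.

```python
import itertools
import queue
import threading


class ToyQueueRunner:
    """Runs enqueue_fn repeatedly in num_threads background threads."""

    def __init__(self, q, enqueue_fn, num_threads):
        self.q = q
        self.enqueue_fn = enqueue_fn
        self.num_threads = num_threads

    def _run(self, stop_event):
        while not stop_event.is_set():
            item = self.enqueue_fn()
            # Retry with a short timeout so a full queue cannot block shutdown.
            while not stop_event.is_set():
                try:
                    self.q.put(item, timeout=0.05)
                    break
                except queue.Full:
                    pass

    def start(self, stop_event):
        threads = [threading.Thread(target=self._run, args=(stop_event,), daemon=True)
                   for _ in range(self.num_threads)]
        for t in threads:
            t.start()
        return threads


_counter = itertools.count()
_lock = threading.Lock()


def read_example():
    # Hypothetical stand-in for a slow read from disk; the lock guards the counter.
    with _lock:
        return next(_counter)


examples = queue.Queue(maxsize=8)
stop = threading.Event()
threads = ToyQueueRunner(examples, read_example, num_threads=3).start(stop)

batch = [examples.get() for _ in range(5)]  # main thread consumes ("trains")
stop.set()                                  # ask the runner threads to stop
for t in threads:
    t.join()
print(len(set(batch)))  # 5 distinct examples
```

Several enqueue threads feeding one consumer is exactly the typical setup described in 【2】.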

The official documentation's introduction to queues, with the corresponding code:

A typical queue-based pipeline for reading records from files has the following stages:

(1) The list of filenames

self.image_list, self.label_list = read_labeled_image_list(self.data_dir, self.data_list)
self.images = tf.convert_to_tensor(self.image_list, dtype=tf.string)
self.labels = tf.convert_to_tensor(self.label_list, dtype=tf.string)
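read_labeled_image_list is not defined in this post. Assuming a segmentation-style list file in which each line holds an image path and a label-mask path separated by whitespace, a hypothetical version might look like this (the path format is an assumption):

```python
import os


def parse_image_list(lines, data_dir):
    """Hypothetical parser for lines of the form '<image path> <label path>'."""
    images, labels = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        image, label = line.split()
        # lstrip("/") so os.path.join keeps data_dir for paths like "/img/a.png"
        images.append(os.path.join(data_dir, image.lstrip("/")))
        labels.append(os.path.join(data_dir, label.lstrip("/")))
    return images, labels


def read_labeled_image_list(data_dir, data_list):
    # Hypothetical: data_list is the path of the list file.
    with open(data_list) as f:
        return parse_image_list(f, data_dir)


imgs, lbls = parse_image_list(["/img/a.png /lbl/a.png\n", "\n", "img/b.png lbl/b.png"], "data")
print(imgs, lbls)
```

The two returned lists are parallel: element i of each refers to the same example, which is what slice_input_producer relies on in the next step.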

(2) Optional filename shuffling, optional epoch limit, and the filename queue

self.queue = tf.train.slice_input_producer([self.images, self.labels],
                                           shuffle=input_size is not None)
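slice_input_producer keeps the image list and the label list aligned by shuffling indices rather than the lists themselves, then taking the i-th element of every list (internally it builds an index queue with tf.train.range_input_producer and gathers the slices). A plain-Python sketch of that idea; slice_producer is a made-up name, and unlike the real function it requires a finite num_epochs:

```python
import random


def slice_producer(tensor_list, num_epochs, shuffle=True, seed=None):
    """Yield aligned slices [t[i] for t in tensor_list], epoch by epoch."""
    rng = random.Random(seed)
    n = len(tensor_list[0])
    assert all(len(t) == n for t in tensor_list), "lists must have equal length"
    for _ in range(num_epochs):
        indices = list(range(n))
        if shuffle:
            rng.shuffle(indices)  # shuffle the indices, not the data
        for i in indices:
            yield [t[i] for t in tensor_list]


images = ["a.png", "b.png", "c.png"]
labels = ["a_mask.png", "b_mask.png", "c_mask.png"]
for image, label in slice_producer([images, labels], num_epochs=1, seed=0):
    print(image, label)  # pairs stay matched even though the order is shuffled
```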

(3) A Reader for the file format

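# input_queue is presumably self.queue from step (2): the [image path, label path]
# list of tensors returned by slice_input_producer.
img_contents = tf.read_file(input_queue[0])
label_contents = tf.read_file(input_queue[1])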

(4) A decoder for a record read by the reader

img = tf.image.decode_png(img_contents, channels=3)
label = tf.image.decode_png(label_contents, channels=1)

(5) Optional preprocessing

img_r, img_g, img_b = tf.split(axis=2, num_or_size_splits=3, value=img)
img = tf.cast(tf.concat(axis=2, values=[img_b, img_g, img_r]), dtype=tf.float32)
# Extract mean.
img -= img_mean
if input_size is not None:
    h, w = input_size

    if random_scale:
        img, label = image_scaling(img, label)

    if random_mirror:
        img, label = image_mirroring(img, label)

    img, label = random_crop_and_pad_image_and_labels(img, label, h, w, ignore_label)
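The first three statements reverse the channel order (RGB to BGR), convert to float and subtract a per-channel mean. The same arithmetic on a tiny image stored as nested Python lists [height][width][channel], as a sketch; the mean values are placeholders, not the img_mean used with this code:

```python
def rgb_to_bgr_minus_mean(img, mean_bgr):
    """img: [H][W] of (r, g, b) values; mean_bgr: per-channel mean in BGR order."""
    out = []
    for row in img:
        new_row = []
        for r, g, b in row:
            # Reorder to B, G, R and convert to float, then subtract the mean.
            bgr = (float(b), float(g), float(r))
            new_row.append([v - m for v, m in zip(bgr, mean_bgr)])
        out.append(new_row)
    return out


img = [[(10, 20, 30), (40, 50, 60)]]  # a 1 x 2 RGB image
print(rgb_to_bgr_minus_mean(img, (1.0, 2.0, 3.0)))
# [[[29.0, 18.0, 7.0], [59.0, 48.0, 37.0]]]
```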

Filenames, shuffling, and epoch limits

For the list of filenames, use either a constant string Tensor (like ["file0", "file1"] or [("file%d" % i) for i in range(2)]) or the tf.train.match_filenames_once function.

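Pass the list of filenames to the tf.train.string_input_producer function. string_input_producer creates a FIFO queue for holding the filenames until the reader needs them. 【tf.train.slice_input_producer() creates a FIFO queue as well: internally it calls tf.train.range_input_producer to build a queue of indices, then gathers the corresponding slice of each input tensor.】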

self.queue = tf.train.slice_input_producer([self.images, self.labels],
                                           shuffle=input_size is not None)

string_input_producer has options for shuffling and setting a maximum number of epochs. 【So does tf.train.slice_input_producer(), through its shuffle and num_epochs arguments.】

A queue runner adds the whole list of filenames to the queue once for each epoch,【essentially an enqueue_many(): every epoch, the whole list goes into the queue through one enqueue_many()】 shuffling the filenames within an epoch if shuffle=True.【shuffle】 This procedure provides a uniform sampling of files, so that examples are not under- or over- sampled relative to each other.【uniform sampling】
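The rule "whole list once per epoch, shuffled only within the epoch" is what makes the sampling uniform: after k complete epochs, every file has been produced exactly k times. A plain-Python sketch of that behaviour, assuming a finite num_epochs (the real producer also accepts None, meaning no limit):

```python
import random
from collections import Counter


def filename_producer(filenames, num_epochs, shuffle=True, seed=None):
    rng = random.Random(seed)
    for _ in range(num_epochs):
        epoch = list(filenames)  # the whole list, once per epoch
        if shuffle:
            rng.shuffle(epoch)   # shuffled only within this epoch
        yield from epoch


produced = list(filename_producer(["f0", "f1", "f2"], num_epochs=4, seed=42))
print(Counter(produced))  # every file appears exactly 4 times
```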

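The queue runner works in a thread separate from the reader that pulls filenames from the queue【i.e. whichever thread runs the read op: the main thread in the CSV example below, or the batching QueueRunner threads when tf.train.batch/shuffle_batch is used】, so the shuffling and enqueuing process does not block the reader.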

File formats

Select the reader that matches your input file format and pass the filename queue to the reader's read method. The read method outputs a key identifying the file and record (useful for debugging if you have some weird records), and a scalar string value. Use one (or more) of the decoder and conversion ops to decode this string into the tensors that make up an example.【This part is about the readers for specific file formats.】

CSV files

To read text files in comma-separated value (CSV) format, use a tf.TextLineReader with the tf.decode_csv operation. For example:

import tensorflow as tf

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])  # adds a Queue to the graph

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)  # the Reader

# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
    # Start populating the filename queue.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(1200):
        # Retrieve a single instance:
        example, label = sess.run([features, col5])

    coord.request_stop()
    coord.join(threads)

Each execution of read reads a single line from the file. The decode_csv op then parses the result into a list of tensors. The record_defaults argument determines the type of the resulting tensors and sets the default value to use if a value is missing in the input string.
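What record_defaults does can be mimicked with Python's csv module: each default both fills an empty field and fixes the type the field is converted to. This is a plain-Python sketch, not tf.decode_csv itself; for instance, it ignores the option of giving an empty default to make a column required.

```python
import csv
import io


def decode_csv_line(line, record_defaults):
    """record_defaults: one single-element list per column, e.g. [[1], [0.0]]."""
    fields = next(csv.reader(io.StringIO(line)))
    if len(fields) != len(record_defaults):
        raise ValueError("expected %d fields, got %d" % (len(record_defaults), len(fields)))
    columns = []
    for raw, (default,) in zip(fields, record_defaults):
        # Empty field -> the default; otherwise convert to the default's type.
        columns.append(default if raw == "" else type(default)(raw))
    return columns


record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = decode_csv_line("5,,7,8,0", record_defaults)
print([col1, col2, col3, col4], col5)  # [5, 1, 7, 8] 0
```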

You must call tf.train.start_queue_runners to populate the queue 【fill the queue】 before you call run or eval to execute the read【the read here is an op in the graph】. Otherwise read will block 【the "hang" I ran into may well have been caused by this】 while it waits for filenames from the queue.

Fixed length records

To read binary files in which each record is a fixed number of bytes, use tf.FixedLengthRecordReader with the tf.decode_raw operation. The decode_raw op converts from a string to a uint8 tensor.

For example, the CIFAR-10 dataset uses a file format where each record is represented using a fixed number of bytes: 1 byte for the label followed by 3072 bytes of image data. Once you have a uint8 tensor, standard operations can slice out each piece and reformat as needed. For CIFAR-10, you can see how to do the reading and decoding in tensorflow_models/tutorials/image/cifar10/cifar10_input.py and described in this tutorial.
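Slicing a CIFAR-10 record is plain byte arithmetic: each record is 1 + 3072 = 3073 bytes, and the 3072 image bytes hold a 32x32 image stored channel by channel (all 1024 red values, then green, then blue, per the dataset's description). A plain-Python sketch of what FixedLengthRecordReader, decode_raw and the slicing accomplish; the helper names are made up:

```python
LABEL_BYTES = 1
HEIGHT, WIDTH, DEPTH = 32, 32, 3
IMAGE_BYTES = HEIGHT * WIDTH * DEPTH          # 3072
RECORD_BYTES = LABEL_BYTES + IMAGE_BYTES      # 3073


def split_records(data):
    """Cut a byte string into fixed-length records (the reader's job)."""
    if len(data) % RECORD_BYTES:
        raise ValueError("data is not a whole number of records")
    return [data[i:i + RECORD_BYTES] for i in range(0, len(data), RECORD_BYTES)]


def decode_record(record):
    """Return (label, image[depth][height][width]) for one record."""
    values = list(record)                     # like decode_raw: bytes -> uint8 values
    label = values[0]
    pixels = values[LABEL_BYTES:]
    plane = HEIGHT * WIDTH
    image = [[pixels[d * plane + h * WIDTH: d * plane + (h + 1) * WIDTH]
              for h in range(HEIGHT)]
             for d in range(DEPTH)]
    return label, image


# Two fake records: label 7 with all-zero pixels, label 3 with all-255 pixels.
fake = bytes([7]) + bytes(IMAGE_BYTES) + bytes([3]) + bytes([255]) * IMAGE_BYTES
for rec in split_records(fake):
    label, image = decode_record(rec)
    print(label, image[0][0][0], len(image), len(image[0]), len(image[0][0]))
# 7 0 3 32 32
# 3 255 3 32 32
```

The [depth][height][width] layout matches the byte order; a model usually wants [height][width][depth], which in TensorFlow is a tf.transpose away.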

【No need to worry about this one: datasets usually come with reading code, or you can find some.】

Standard TensorFlow format

Another approach is to convert whatever data you have into a supported format. This approach makes it easier to mix and match data sets and network architectures. The recommended format for TensorFlow is a TFRecords file containing tf.train.Example protocol buffers (which contain Features as a field). You write a little program that gets your data, stuffs it in an Example protocol buffer, serializes the protocol buffer to a string, and then writes the string to a TFRecords file using the tf.python_io.TFRecordWriter. For example, tensorflow/examples/how_tos/reading_data/convert_to_records.py converts MNIST data to this format.
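It can help to know what a TFRecords file looks like on disk. As I understand TensorFlow's record format, each record is framed as an 8-byte little-endian length, a 4-byte masked CRC32C of that length, the payload, and a 4-byte masked CRC32C of the payload. The sketch below writes and reads that framing in plain Python; the payload would normally be a serialized tf.train.Example, here it is arbitrary bytes. Treat it as an illustration, not a replacement for tf.python_io.TFRecordWriter.

```python
import struct

# CRC-32C (Castagnoli), reflected polynomial 0x82F63B78, table-driven.
_TABLE = []
for _i in range(256):
    _c = _i
    for _ in range(8):
        _c = (_c >> 1) ^ 0x82F63B78 if _c & 1 else _c >> 1
    _TABLE.append(_c)


def crc32c(data):
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def masked_crc(data):
    # Rotate right by 15 bits, then add a constant (32-bit arithmetic).
    crc = crc32c(data)
    rotated = ((crc >> 15) | (crc << 17)) & 0xFFFFFFFF
    return (rotated + 0xA282EAD8) & 0xFFFFFFFF


def encode_records(payloads):
    out = bytearray()
    for payload in payloads:
        header = struct.pack("<Q", len(payload))
        out += header + struct.pack("<I", masked_crc(header))
        out += payload + struct.pack("<I", masked_crc(payload))
    return bytes(out)


def decode_records(data):
    payloads, pos = [], 0
    while pos < len(data):
        header = data[pos:pos + 8]
        (length,) = struct.unpack("<Q", header)
        (header_crc,) = struct.unpack("<I", data[pos + 8:pos + 12])
        if header_crc != masked_crc(header):
            raise ValueError("corrupt length field")
        payload = data[pos + 12:pos + 12 + length]
        (payload_crc,) = struct.unpack("<I", data[pos + 12 + length:pos + 16 + length])
        if payload_crc != masked_crc(payload):
            raise ValueError("corrupt payload")
        payloads.append(payload)
        pos += 16 + length
    return payloads


blob = encode_records([b"first example", b"second"])
print(decode_records(blob))  # [b'first example', b'second']
```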

To read a file of TFRecords, use tf.TFRecordReader with the tf.parse_single_example decoder. The parse_single_example op decodes the example protocol buffers into tensors. An MNIST example using the data produced by convert_to_records can be found in tensorflow/examples/how_tos/reading_data/fully_connected_reader.py, which you can compare with the fully_connected_feed version.

【TFRecord: TensorFlow's standard file format】

Preprocessing

You can then do any preprocessing of these examples you want. This would be any processing that doesn't depend on trainable parameters. Examples include normalization of your data, picking a random slice, adding noise or distortions, etc. See tensorflow_models/tutorials/image/cifar10/cifar10_input.py for an example.

【(1) The example is worth a look; (2) the following code is the preprocessing step.】

img_r, img_g, img_b = tf.split(axis=2, num_or_size_splits=3, value=img)
img = tf.cast(tf.concat(axis=2, values=[img_b, img_g, img_r]), dtype=tf.float32)
# Extract mean.
img -= img_mean
# img = tf.nn
label = tf.image.decode_png(label_contents, channels=1)

if input_size is not None:
    h, w = input_size

    if random_scale:
        img, label = image_scaling(img, label)

    if random_mirror:
        img, label = image_mirroring(img, label)

    img, label = random_crop_and_pad_image_and_labels(img, label, h, w, ignore_label)
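random_crop_and_pad_image_and_labels is not defined in the post. The key idea behind such a helper is that image and label must be padded and cropped with the same offsets so that every pixel keeps its label. A hypothetical plain-Python version on 2-D lists, assuming the image is padded with 0 and the label with ignore_label (a common convention, but an assumption here):

```python
import random


def pad_to(grid, h, w, fill):
    """Pad a rectangular 2-D list on the bottom/right to at least h x w."""
    width = max(w, len(grid[0]))
    rows = [row + [fill] * (width - len(row)) for row in grid]
    rows += [[fill] * width for _ in range(max(0, h - len(grid)))]
    return rows


def random_crop_and_pad(image, label, h, w, ignore_label=255, rng=random):
    """Hypothetical helper: pad both inputs, then crop both at ONE random offset."""
    image = pad_to(image, h, w, 0)
    label = pad_to(label, h, w, ignore_label)
    top = rng.randint(0, len(image) - h)
    left = rng.randint(0, len(image[0]) - w)

    def crop(grid):
        return [row[left:left + w] for row in grid[top:top + h]]

    return crop(image), crop(label)


image = [[1, 2, 3], [4, 5, 6]]        # 2 x 3 image
label = [[10, 20, 30], [40, 50, 60]]  # its label map (10 x pixel value)
img_c, lbl_c = random_crop_and_pad(image, label, h=3, w=2, rng=random.Random(0))
print(img_c)  # 3 x 2: two image rows plus one padded row of 0s
print(lbl_c)  # the same window of the label map; the padded row is 255
```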

Batching

At the end of the pipeline we use another queue to batch together examples for training, evaluation, or inference. For this we use a queue that randomizes the order of examples, using the tf.train.shuffle_batch.

Example:

import tensorflow as tf

# tf.SomeReader, tf.some_decoder and some_processing are placeholders from the
# official docs: substitute a real reader/decoder (e.g. tf.TextLineReader + tf.decode_csv).
def read_my_file_format(filename_queue):
    reader = tf.SomeReader()
    key, record_string = reader.read(filename_queue)
    example, label = tf.some_decoder(record_string)
    processed_example = some_processing(example)
    return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=True)  # the first Queue
    example, label = read_my_file_format(filename_queue)
    # min_after_dequeue defines how big a buffer we will randomly sample
    # from -- bigger means better shuffling but slower start up and more
    # memory used.
    # capacity must be larger than min_after_dequeue and the amount larger
    # determines the maximum we will prefetch. Recommendation:
    # min_after_dequeue + (num_threads + a small safety margin) * batch_size
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)  # the second Queue
    return example_batch, label_batch
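The min_after_dequeue comments describe a shuffle buffer: a dequeue picks a random element, but only while more than min_after_dequeue elements are buffered, so each pick comes from a reasonably large pool; once the input is exhausted (the queue is closed) the restriction is lifted and the buffer drains. A plain-Python sketch of that behaviour, loosely modelled on tf.RandomShuffleQueue rather than copied from it. capacity only bounds how far background threads can prefetch, which this single-threaded generator does not model, so it is computed but not used:

```python
import random


def shuffle_stream(examples, min_after_dequeue, seed=None):
    """Yield examples in random order through a shuffle buffer."""
    rng = random.Random(seed)
    buffer = []
    for ex in examples:
        buffer.append(ex)  # enqueue
        # Dequeue only while more than min_after_dequeue elements are buffered,
        # so every pick is random among at least min_after_dequeue + 1 items.
        while len(buffer) > min_after_dequeue:
            i = rng.randrange(len(buffer))
            buffer[i], buffer[-1] = buffer[-1], buffer[i]
            yield buffer.pop()
    # Input exhausted (queue closed): the minimum no longer applies, drain it.
    rng.shuffle(buffer)
    yield from buffer


batch_size, min_after_dequeue = 32, 10000
capacity = min_after_dequeue + 3 * batch_size  # 10096, as in the code above
out = list(shuffle_stream(range(100), min_after_dequeue=10, seed=0))
print(capacity, sorted(out) == list(range(100)), out[0] <= 10)  # 10096 True True
```

The first element out can only be one of the first 11 inputs, which is why a small min_after_dequeue gives weak mixing at the start.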

If you need more parallelism or shuffling of examples between files, use multiple reader instances using the tf.train.shuffle_batch_join. For example:

def read_my_file_format(filename_queue):
    ...  # Same as above

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=True)
    example_list = [read_my_file_format(filename_queue)
                    for _ in range(read_threads)]
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch_join(
        example_list, batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch

You still only use a single filename queue 【i.e. the first Queue, "filename_queue = tf.train.string_input_producer()"】 that is shared by all the readers. That way we ensure 【a guarantee from the official TensorFlow docs...】 that the different readers use different files from the same epoch until all the files from the epoch have been started. (It is also usually sufficient to have a single thread filling the filename queue.)
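Sharing one filename queue among several readers means each filename is handed to exactly one reader, so no file of an epoch is read twice while another is still unread. A thread-based plain-Python sketch; the reader function and the in-memory "files" are hypothetical:

```python
import queue
import threading


def reader(filename_queue, files, results, lock):
    while True:
        try:
            name = filename_queue.get_nowait()  # each name goes to one reader only
        except queue.Empty:
            return                              # like OutOfRange: no files left
        records = files[name]                   # stand-in for reading the file
        with lock:
            results.extend((threading.current_thread().name, r) for r in records)


def read_with_shared_queue(files, read_threads):
    filename_queue = queue.Queue()
    for name in files:                          # one epoch of filenames
        filename_queue.put(name)
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=reader, name="reader-%d" % i,
                                args=(filename_queue, files, results, lock))
               for i in range(read_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


files = {"f%d" % i: ["f%d-r%d" % (i, j) for j in range(3)] for i in range(5)}
out = read_with_shared_queue(files, read_threads=3)
print(len(out))  # 15: every record read exactly once
```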

An alternative is to use a single reader via the tf.train.shuffle_batch with num_threads bigger than 1. This will make it read from a single file at the same time (but faster than with 1 thread), instead of N files at once. This can be important:

  • If you have more reading threads than input files, to avoid the risk that you will have two threads reading the same example from the same file near each other.
  • Or if reading N files in parallel causes too many disk seeks.

How many threads do you need? The tf.train.shuffle_batch* functions add a summary to the graph that indicates how full the example queue is. If you have enough reading threads, that summary will stay above zero. You can view your summaries as training progresses using TensorBoard.【So there is a summary that tells you whether the number of threads is set appropriately.】

Creating threads to prefetch using QueueRunner objects

The short version: many of the tf.train functions listed above add tf.train.QueueRunner objects to your graph. These require that you call tf.train.start_queue_runners before running any training or inference steps, or it will hang forever. This will start threads that run the input pipeline, filling the example queue so that the dequeue to get the examples will succeed. This is best combined with a tf.train.Coordinator to cleanly shut down these threads when there are errors. If you set a limit on the number of epochs, that will use an epoch counter that will need to be initialized. The recommended code pattern combining these is:

import tensorflow as tf

# Create the graph, etc.
# The epoch counter created by num_epochs is a local variable,
# so local variables must be initialized as well.
init_op = tf.group(tf.global_variables_initializer(),
                   tf.local_variables_initializer())

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()
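The same control flow (a background enqueue thread, a training loop that ends on an out-of-range signal, and request_stop/join at the end) can be rehearsed without TensorFlow. In this plain-Python sketch, ToyCoordinator and OutOfRange are made-up stand-ins for tf.train.Coordinator and tf.errors.OutOfRangeError, and a sentinel object plays the role of the queue being closed at the epoch limit:

```python
import queue
import threading


class OutOfRange(Exception):
    """Stand-in for tf.errors.OutOfRangeError."""


class ToyCoordinator:
    """Minimal stand-in for tf.train.Coordinator."""

    def __init__(self):
        self._stop = threading.Event()

    def request_stop(self):
        self._stop.set()

    def should_stop(self):
        return self._stop.is_set()

    def join(self, threads):
        for t in threads:
            t.join()


_CLOSED = object()  # marks "queue closed, no more elements"


def enqueue_thread(coord, q, num_epochs, examples):
    items = [ex for _ in range(num_epochs) for ex in examples] + [_CLOSED]
    for item in items:
        # Retry with a timeout so a stop request is noticed even when q is full.
        while True:
            if coord.should_stop():
                return
            try:
                q.put(item, timeout=0.05)
                break
            except queue.Full:
                continue


def train(num_epochs=2, examples=(1, 2, 3)):
    coord = ToyCoordinator()
    q = queue.Queue(maxsize=4)
    threads = [threading.Thread(target=enqueue_thread,
                                args=(coord, q, num_epochs, examples))]
    for t in threads:
        t.start()
    steps = 0
    try:
        while not coord.should_stop():
            item = q.get()          # like dequeueing inside sess.run(train_op)
            if item is _CLOSED:
                raise OutOfRange()
            steps += 1
    except OutOfRange:
        print("Done training -- epoch limit reached")
    finally:
        coord.request_stop()
    coord.join(threads)
    return steps


print(train())  # 6 steps: 2 epochs x 3 examples
```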

Aside: What is happening here?

First we create the graph. It will have a few pipeline stages that are connected by queues. The first stage will generate filenames to read and enqueue them in the filename queue. The second stage consumes filenames (using a Reader), produces examples, and enqueues them in an example queue. Depending on how you have set things up, you may actually have a few independent copies of the second stage, so that you can read from multiple files in parallel. 【This creates two Queues: filename_queue and the example queue.】

At the end of these stages is an enqueue operation, which enqueues into a queue that the next stage dequeues from. We want to start threads running these enqueuing operations, so that our training loop can dequeue examples from the example queue.【Once the two Queues exist and enqueue ops have been added, something like tf.train.start_queue_runners() starts threads that perform the enqueueing.】

The helpers in tf.train that create these queues and enqueuing operations add a tf.train.QueueRunner to the graph using the tf.train.add_queue_runner function. Each QueueRunner is responsible for one stage【in other words, one QueueRunner is responsible for one Queue】, and holds the list of enqueue operations that need to be run in threads【it stores a list of enqueue operations】. Once the graph is constructed, the tf.train.start_queue_runners function asks each QueueRunner in the graph to start its threads running the enqueuing operations.

If all goes well, you can now run your training steps and the queues will be filled by the background threads. If you have set an epoch limit, at some point an attempt to dequeue examples will get a tf.errors.OutOfRangeError. This is the TensorFlow equivalent of "end of file" (EOF) -- this means the epoch limit has been reached and no more examples are available.

The last ingredient is the tf.train.Coordinator. This is responsible for letting all the threads know if anything has signaled a shut down. 【It manages shutting the threads down.】 Most commonly this would be because an exception was raised, for example one of the threads got an error when running some operation (or an ordinary Python exception).

Aside: How clean shut-down when limiting epochs works

Imagine you have a model that has set a limit on the number of epochs to train on. That means that the thread generating filenames will only run that many times before generating an OutOfRange error. The QueueRunner will catch that error, close the filename queue, and exit the thread. Closing the queue does two things:

  • Any future attempt to enqueue in the filename queue will generate an error. 【filename_queue can no longer be enqueued to】 At this point there shouldn't be any threads trying to do that, but this is helpful when queues are closed due to other errors.
  • Any current or future dequeue will either succeed (if there are enough elements left) or fail (with an OutOfRange error) immediately.【succeed or error】 They won't block waiting for more elements to be enqueued,【because no more enqueues can happen】 since by the previous point that can't happen.
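The two rules for a closed queue fit in a small class. This is a plain-Python sketch of the semantics described above, not TensorFlow's implementation; ClosableQueue, QueueClosed and OutOfRange are made-up names:

```python
import collections
import threading


class QueueClosed(Exception):
    """Raised on enqueue to a closed queue."""


class OutOfRange(Exception):
    """Raised on dequeue from a closed, empty queue."""


class ClosableQueue:
    def __init__(self):
        self._items = collections.deque()
        self._closed = False
        self._cond = threading.Condition()

    def enqueue(self, item):
        with self._cond:
            if self._closed:
                raise QueueClosed("enqueue on a closed queue")
            self._items.append(item)
            self._cond.notify()

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()  # wake blocked dequeuers so they can fail fast

    def dequeue(self):
        with self._cond:
            # Block only while the queue is empty AND still open.
            while not self._items and not self._closed:
                self._cond.wait()
            if self._items:
                return self._items.popleft()  # leftovers are still served
            raise OutOfRange("queue closed and empty")


q = ClosableQueue()
q.enqueue("file0")
q.enqueue("file1")
q.close()
print(q.dequeue(), q.dequeue())  # file0 file1: remaining elements still succeed
try:
    q.dequeue()
except OutOfRange:
    print("OutOfRange")          # fails immediately instead of blocking
```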

The point is that when the filename queue is closed, there will likely still be many filenames in that queue,【the elements remaining in filename_queue】 so the next stage of the pipeline (with the reader and other preprocessing) may continue running for some time.【it keeps running for a while】 Once the filename queue is exhausted, though, the next attempt to dequeue a filename (e.g. from a reader that has finished with the file it was working on) will trigger an OutOfRange error. In this case, though, you might have multiple threads associated with a single QueueRunner. If this isn't the last thread in the QueueRunner, 【a QueueRunner holds a number of threads】 the OutOfRange error just causes the one thread to exit. 【only that one thread ends】 This allows the other threads, which are still finishing up their last file, to proceed until they finish as well.【the other threads can finish their work】 (Assuming you are using a tf.train.Coordinator, other types of errors will cause all the threads to stop.) Once all the reader threads hit the OutOfRange error, only then does the next queue, the example queue, get closed.

Again, the example queue will have some elements queued, so training will continue until those are exhausted. If the example queue is a tf.RandomShuffleQueue, say because you are using shuffle_batch or shuffle_batch_join, it normally will avoid ever having fewer than its min_after_dequeue attr elements buffered. However, once the queue is closed that restriction will be lifted and the queue will eventually empty. At that point the actual training threads, when they try and dequeue from example queue, will start getting OutOfRange errors and exiting. Once all the training threads are done, tf.train.Coordinator.join will return and you can exit cleanly.

Filtering records or producing multiple examples per record

Instead of examples with shapes [x, y, z], you will produce a batch of examples with shape [batch, x, y, z]. The batch size can be 0 if you want to filter this record out (maybe it is in a hold-out set?), or bigger than 1 if you are producing multiple examples per record. 【batch SGD】Then simply set enqueue_many=True when calling one of the batching functions (such as shuffle_batch or shuffle_batch_join).
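With enqueue_many=True, each record contributes a batch of 0, 1 or more examples, and the batching queue flattens them before forming training batches. A plain-Python sketch of that flattening step; record_to_examples and its filtering rule are hypothetical, and leftover examples that do not fill a whole batch are simply dropped here:

```python
def record_to_examples(record):
    # Hypothetical rule: drop held-out records, otherwise one example per item.
    if record["held_out"]:
        return []  # a batch of size 0: the record is filtered out
    return [(x, record["label"]) for x in record["items"]]


def batches(records, batch_size):
    buffer = []
    for record in records:
        buffer.extend(record_to_examples(record))  # like enqueue_many
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]


records = [
    {"held_out": False, "items": [1, 2, 3], "label": "a"},  # 3 examples
    {"held_out": True, "items": [4], "label": "b"},         # filtered out
    {"held_out": False, "items": [5], "label": "c"},        # 1 example
]
print(list(batches(records, 2)))
# [[(1, 'a'), (2, 'a')], [(3, 'a'), (5, 'c')]]
```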

Sparse input data

SparseTensors don't play well with queues. If you use SparseTensors you have to decode the string records using tf.parse_example after batching (instead of using tf.parse_single_example before batching). 【In short: decode SparseTensors after batching.】

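References:

tensorflow學習筆記(四十二):輸入流水線 ("TensorFlow study notes (42): input pipelines"), CSDN blog

tensorflow中關於隊列使用的實驗 ("Experiments on using queues in TensorFlow"), CSDN blog

tensorflow.org/api_docs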

