TensorFlow 公式ドキュメントの Reading data を参考に、Queue を利用したファイルからの入力を試します。
TensorFlow のバージョンは 0.11.0rc2 です。
訓練データのファイルからの入力
Reading data ではデータの入力方法として、feed を用いる方法、tf.constant
として定義する方法と、ファイルからの入力 として Queue を用いて pipeline を構築する方法が紹介されています。
このうち tf.constant
はメモリにのりきる小規模なデータ用で、feed による方法も1回の tf.Session.run() で渡すデータ量には同様の制限があると思います。feed による方法は同じ Session で複数回に分けて feed を渡すことでメモリに乗り切らない訓練データを渡してループすることも可能ではないかと思いますが、効率や Distributed TensorFlow の対応も考慮に入れると Queue を用いる方法が大規模なデータの入力方法としてもっとも推奨される方法ではないかと考えます。
今回はサンプルとして XOR 回路を2層の隠れ層を持つ NN の学習を入力を CSV ファイルにしてみました。もちろんこのデータ量であれば feed や constant を用いて充分学習可能なのですが、そこは目をつむってわざわざ CSV ファイルから入力します。
input.csv
0,0,0
0,1,1
1,0,1
1,1,0
train.py
import tensorflow as tf
import tensorflow.contrib.slim as slim
# ファイル名の Queue を作成
filename_queue = tf.train.string_input_producer(["input.csv"])
# CSV を parse
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
input1, input2, output = tf.decode_csv(value, record_defaults=[[0.], [0.], [0.]])
inputs = tf.pack([input1, input2])
output = tf.pack([output])
# 4サンプル毎の Batch 化
inputs_batch, output_batch = tf.train.shuffle_batch([inputs, output], 4, capacity=40, min_after_dequeue=4)
## NN のグラフ生成
hiddens = slim.stack(inputs_batch, slim.fully_connected, [2, 2], activation_fn=tf.nn.sigmoid, scope="hidden")
prediction = slim.fully_connected(hiddens, 1, activation_fn=tf.nn.sigmoid, scope="output")
loss = tf.contrib.losses.mean_squared_error(prediction, output_batch)
train_op = slim.learning.create_train_op(loss, tf.train.AdamOptimizer(0.01))
init_op = tf.initialize_all_variables()
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
sess.run(init_op)
for i in range(2000):
_, t_loss = sess.run([train_op, loss])
if (i+1) % 100 == 0:
print("step %d: loss=%f" % (i+1, t_loss))
finally:
coord.request_stop()
coord.join(threads)
実行結果
% python train.py
WARNING:tensorflow:sum_of_squares (from tensorflow.contrib.losses.python.losses.loss_ops) is deprecated and will be removed after 2016-10-01.
Instructions for updating:
Use mean_squared_error.
step 100: loss=0.249255
step 200: loss=0.250287
step 300: loss=0.250001
step 400: loss=0.249251
step 500: loss=0.251624
step 600: loss=0.249414
step 700: loss=0.247297
step 800: loss=0.251101
step 900: loss=0.250473
step 1000: loss=0.249949
step 1100: loss=0.249301
step 1200: loss=0.247786
step 1300: loss=0.226513
step 1400: loss=0.214449
step 1500: loss=0.216028
step 1600: loss=0.132697
step 1700: loss=0.055622
step 1800: loss=0.037558
step 1900: loss=0.018404
step 2000: loss=0.015011
動作しました。
loss 関数として使っている tf.contrib.losses.sum_of_squares()
が deprecated なのでかわりに mean_squared_error
を使え、と警告メッセージが出ていますが、使っているのは tf.contrib.losses.mean_squared_error()
なので、一体これはどうすれば…。
追記
ソースコードを調べてみました。
2016-11-05 現在の r0.11
ブランチをみると、mean_squared_error
はたんに sum_of_squares
の別名にされていて、さらに sum_of_squares
には @deprecated
デコレータでマークされているため、たとえ指示通り mean_squared_error
を利用していても警告が出てしまうようです。
https://github.com/tensorflow/tensorflow/blob/r0.11/tensorflow/contrib/losses/python/losses/loss_ops.py#L487-L524
しかし master
ブランチをみると既に sum_of_squares
は削除されていて、mean_squared_error
に関数名が変更されていました。
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/losses/python/losses/loss_ops.py#L558-L600
この変更は https://github.com/tensorflow/tensorflow/commit/48da07d60c20f47868eb1fdc752d42b74cce95f8 で入っています。要するに関数の名前を変更したかったので 0.11 では古い名前を deprecated にして代替を示していたけど、手抜きをして新しい名前も同じ関数を代入していたので警告が出てしまっている状態で、次のバージョンでは出なくなるはずということで、当面は mean_squared_error
を利用すれば問題ないといえそうです.
tf.train.string_input_producer()
に placeholder を渡す場合(うまくいかない)
さきほどの例では tf.train.string_input_producer()
の第1引数に ["input.csv"]
と Python のリストをそのまま渡していました。ドキュメントによれば正確には第1引数は一次元の tf.string の Tensor を受けとるようになっていて、これは内部で https://github.com/tensorflow/tensorflow/blob/r0.11/tensorflow/python/training/input.py#L125 の部分で tf.convert_to_tensor
で Tensor に変換されています。
ということは、コマンドライン引数でファイル名を受け取り、それを feed_dict 経由で placeholder に入れたものを指定することも可能に思えます。こうすればファイル名を可変にできます。
そこで train.py
の input_string_producer()
の部分を以下のようにしてみます。
input_files = tf.placeholder(tf.string, shape=(None,))
filename_queue = tf.train.string_input_producer(input_files)
そして sess.run()
に feed_dict={input_files: ["input.csv"]}
と渡して実行してみましたが、以下のようなエラーになってしまいました。
tensorflow.python.framework.errors.OutOfRangeError: RandomShuffleQueue '_1_shuffle_batch/random_shuffle_queue' is closed and has insufficient elements (requested 4, current size 0)
[[Node: shuffle_batch = QueueDequeueMany[_class=["loc:@shuffle_batch/random_shuffle_queue"], component_types=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](shuffle_batch/random_shuffle_queue, shuffle_batch/n)]]
どうも入力ファイル数が 0 になってしまって batch 化の部分で tf.errors.OutOfRangeError が発生してしまうようです。
epoch 数を限定する場合(追記を参照)
さきほどの例では訓練データを入力する前提としていましたので、tf.train.string_input_producer()
で num_epochs
は指定せず無限に同じファイルを読み続けるようにしていました。
検証用のデータも Queue を使った同様の方法で読み込むことを考えると、1 epoch 読み込むだけでよく、また shuffle も不要になります。
この場合は tf.train.string_input_producer()
に num_epochs=1
を指定して、Batch 化も tf.train.batch()
を使い allow_smaller_final_batch=True
を指定することでバッチ数の余りのデータも読み込めるようにすればいいように思えます。
以下のようなコードで試してみます。とりあえず batch 化した Tensor の内容を表示するだけの内容です。
import tensorflow as tf
filename_queue = tf.train.string_input_producer(["input.csv"], num_epochs=1)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
input1, input2, output = tf.decode_csv(value, record_defaults=[[0], [0], [0]])
inputs = tf.pack([input1, input2])
output = tf.pack([output])
inputs_batch, output_batch = tf.train.batch([inputs, output], 4, allow_smaller_final_batch=True)
init_op = tf.initialize_all_variables()
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
sess.run(init_op)
try:
i, o = sess.run([inputs_batch, output_batch])
print([i, o])
except tf.errors.OutOfRangeError:
print("out of range.")
finally:
coord.request_stop()
coord.join(threads)
しかしこれを実行すると、以下のように "out of range." が表示されます。バッチサイズは 4 にしているので、1回は評価できて2回目に OutOfRangeError が発生することを期待していたのですが、いきなり発生してしまいます。しかもその後 coord.join(threads)
の部分で Attempting to use uninitialized value input_producer/limit_epochs/epochs
というエラーが発生します。
out of range.
FailedPreconditionError (see above for traceback): Attempting to use uninitialized value input_producer/limit_epochs/epochs
[[Node: input_producer/limit_epochs/CountUpTo = CountUpTo[T=DT_INT64, _class=["loc:@input_producer/limit_epochs/epochs"], limit=1, _device="/job:localhost/replica:0/task:0/cpu:0"](input_producer/limit_epochs/epochs)]]
Queue を使う入力でちょうど1回だけ読み込みたい(検証用)場合はどうすれば良いのでしょうか。
string_input_producer の num_epochs について追記
エラーメッセージから推測するに、string_input_producer の thread で input_producer/limit_epochs/epochs
という変数が未初期化なためにエラーが発生して停止してしまうため、1つもファイル名が Queue に送信されないまま止まってしまうので OutOfRangeError が発生して、その後 coord.join(threads)
で string_input_producer の thread の例外がメインスレッドで再発生しているということではないかと考えられます。 string_input_producer() の num_epochs 引数を渡した時の処理に不具合があるのではないかと考えて検索してみた結果以下の issue を発見しました。
input producer の epoch 数を tf.trainging.Saver が保存するべきかどうか、というあたりで epoch 数を管理する変数を local variable (Python のローカル変数という意味ではなく TensorFlow の GraphKeys.LOCAL_VARIABLES という種類の collection の変数という意味みたいです)にしたために起きてしまった問題で、まだ現時点(2016/11/07)で master ブランチでも修正はされていないようです。
https://github.com/tensorflow/tensorflow/issues/1045#issuecomment-253712328 のコメントを読むと tf.contrib.learn.read_batch_features
のドキュメントでは num_epochs を指定する時には tf.initialize_all_variables()
とは別に tf.initialize_local_variables()
という op を作成してセッションの最初に実行しておかないといけないとあります。 initialize_all_variables()
は実際には全ての変数を初期化はしてくれないのですね。
この workaround を取り入れて、以下のように書き直したら期待したように動きました。
start_queue_runners()
よりも前に init_op を実行する必要があったのでそこも修正しています。
import tensorflow as tf
filename_queue = tf.train.string_input_producer(["input.csv"], num_epochs=1)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
input1, input2, output = tf.decode_csv(value, record_defaults=[[0], [0], [0]])
inputs = tf.pack([input1, input2])
output = tf.pack([output])
inputs_batch, output_batch = tf.train.batch([inputs, output], 4, allow_smaller_final_batch=True)
init_op = [tf.initialize_all_variables(), tf.initialize_local_variables()]
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run(init_op)
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
i, o = sess.run([inputs_batch, output_batch])
print([i, o])
i, o = sess.run([inputs_batch, output_batch])
print([i, o])
except tf.errors.OutOfRangeError:
print("out of range.")
finally:
coord.request_stop()
coord.join(threads)
実行結果
[array([[0, 0],
[0, 1],
[1, 0],
[1, 1]], dtype=int32), array([[0],
[1],
[1],
[0]], dtype=int32)]
out of range.
TODO
-
→ Donemean_squared_error()
の deprecated 警告について調査 -
tf.train.string_input_producer()
の入力に placeholder を使う場合の調査 -
ファイルからの入力の epoch 数を限定する時の調査→ Done