前回 マルチGPUを導入した話とseparable_conv2dの話など : TensorFlow将棋ソフト開発日誌 #11
目次 TensorFlow将棋ソフト開発日誌 目次
ソースはgithubにあります(俺が読めればいいというレベル)
なうりぶーてぃんぐ
目次
- 最近何をしていたか
- TFRecordを使用する理由
- 非同期読み込みにこだわる理由
- TFRecord関係の参考ページ
- TFRecordを使用する際の注意点(ここだけ読めば良い)
- 出力 : データをTFRecordに変換する
- 入力 : TFRecordをTFRecordReaderでtf.Tensorにする
- 参考 : 自前のファイル形式と自前の非同期読み込みのコード
最近何をしていたか
- 年初あたりにコンピュータ将棋にPFNが参入というニュースが出て非常に非常にモチベーションが下がりました。勝てるわけねえじゃん。
- TensorFlow将棋に関してはなんというか「続けるかー?でもなんか出来ても後追いみたいに見られるしー?俺のほうが早くはじめたのにー。しかし他に面白いネタもあんまないしー」というモヤモヤがしばらく続いていました。
- GPUをGTX 1080 x2からGTX 1080 ti x2にしました。
- DCGANやVAEをやってみて気分転換をしていました。CelebAデータセットでやってみて「ほうほう面白いねー」みたいな。DCGANでアドホックにDやGに制限を入れる研究をしてみたり。
- で、ようやくやる気も戻ってきたので最近の進捗です。
TFRecord関係の参考ページ
- Reading data : TensorFlow公式ドキュメント
- Tfrecords Guide : ほとんどここを参考にしました。
TFRecordを使用する理由
今までは棋譜の学習において以下のようにデータを扱っていました。
- SFEN形式(参考)の盤面と追加データ(勝敗結果や終了手数)を1行のレコードとしたファイルを入力データとする
- 学習時に複数のスレッドでファイル読み込み、TensorFlowのTensor化を行う
- TensorFlowのQueueを使用して非同期読み込みを実現する
2と3のコードが煩雑になること、CPUリソースを相当に消費することからTFRecord化することにしました。学習データは動的に変化しないので読み込みに最適な形にするべきです。学習データをTFRecord形式にしておけばTensorFlow標準のTFRecordReader
で画像などと同じような感覚で非同期読み込みを行うことができ、自分でスレッドやGCを管理する必要がなくなります。
非同期読み込みにこだわる理由
ここで言う非同期読み込みっていうのはTensorFlowのQueueやBatchを使用することです。なぜこれが必要かというと学習時の計算効率が理由です。
非同期読み込みをしない場合、つまりtf.Placeholderを使用して学習データをグラフに渡す場合は以下のようなシーケンスになります。
# C : CPU Reading Data, G : GPU Learning
processing time ->
|----C----|----G----|----C----|----G----|----C----|----G----| ...
CPUでデータを読んでGPUで計算しての繰り返しですがCPUでデータを読んでいる間はGPUは寝ています。非効率です。「TensorFlowでGPUを使い始めたけれどnvidia-smiでGPU稼働率が上がらないよー」というのは大体CPUに処理の比重が寄っているからです。
ちなみにですが画像の読み込みやデータオーグメンテーションについても「サイズが決まっているならそのサイズにリサイズしたファイルを用意しておく」「予め変形したものをファイルに落としておく」などのチューニングの余地があります。ファイルサイズやデータ数にもよりますが「CPUばっか動いてない?」と感じたらチューニングを検討しましょう。
TensorFlowの非同期読み込みを使うと以下の様になります。
# C : CPU Reading Data, G : GPU Learning
processing time ->
|----C----|----C----|----C----|----C----|----C----|----C----| ...
|----G----|----G----|----G----|----G----|----G----|----G----| ...
GPUが計算している間にCPUが別スレッドで同時にデータを読み込みます。GPUの計算が1ステップ終わるとCPUがすでに読み込んだデータをGPUに渡すだけでGPUが次の計算をはじめられます。
「tf.Placeholderを使わずに非同期読み込みをする」などのパフォーマンス向上に関してはTensorFlow公式ドキュメントのPerformance Guideが参考になります。
TFRecordを使用する際の注意点(ここだけ読めば良い)
以下のことに気づかずに結構ハマりました。ただこれを気をつければ参考ページを読むだけでいけると思います。
-
TFRecordWriter
で圧縮オプションを指定したらTFRecordReader
でも同様のオプションを付ける。 - numpy.arrayなどをTFRecordに保存する場合は数値の型に気をつける。
2つ目の数値の型について。書き出したnumpy.arrayがfloat64だとTensorFlowでもfloat64で読み込まないといけません。そうでないとエラーになります。大抵の場合はfloat32で学習処理をしたいでしょうからtf.cast
でキャストしてやらないといけません。無駄です。またfloat64でファイルに書きだせばそれだけファイルが大きくなります。これも無駄です。しかしながらnumpy.arrayを型を指定せずに作るとfloat64がデフォルトになります。この点を忘れると割と面倒なバグや非効率が起きるので気をつけてください。
出力 : データをTFRecordに変換する
先に述べた形の棋譜データファイルをTFRecordsに変換するスクリプトです。データ変換とファイル処理だけなのでTensorFlow的なセッションやらグラフやらは出てきません。要点はコメントに入れておきます。
from argparse import ArgumentParser
from pathlib import Path
import tensorflow as tf
pio = tf.python_io
import ginkgo.shogi_records as sr
import ginkgo.numpy_shogi as ns
def create_argument_parser():
parser = ArgumentParser()
parser.add_argument('-i', '--input', type=str, default=None)
parser.add_argument('-o', '--output-dir', type=str, default='./')
return parser
# 整数をレコードに入れるための変換
def feature_int64(a):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[a]))
# バイト列をレコードに入れるための変換、Numpy arrayもこれを使用する
def feature_bytes(a):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[a]))
def main():
parser = create_argument_parser()
args = parser.parse_args()
proc(args)
def proc(args):
input_path = Path(args.input)
convert_file(args, input_path)
def convert_file(args, file_path):
# 自前データ形式のファイルからレコード(棋譜の各盤面)を読み込む
records = list(map(sr.to_data, sr.load_file(file_path)))
output_file = Path(args.output_dir) / (file_path.stem + '.tfrecords')
# TFRecordを書き出すためのTFRecordWriterを作成する
# Numpy arrayのようなデータを書き出す場合はファイルサイズが大きくなりがちなので
# TFRecordCompressionType.GZIP オプションを指定しておく
writer = pio.TFRecordWriter(
str(output_file), options=pio.TFRecordOptions(pio.TFRecordCompressionType.GZIP))
for r in records:
# record tuple
# 0 .. sfen
# 1 .. side (first black, second white)
# 2 .. turn number
# 3 .. total turn number
# 4 .. next movement
# 5 .. winner side
# vecが盤面を特徴テンサーに落とし込んだ numpy.array
# 他は付随データのスカラー
vec = ns.sfen_to_vector(r[0], usi=r[4])
if r[1] == 'w':
vec = ns.player_inverse(vec)
# 1レコード分のデータを持つExampleオブジェクトを作成する
record = tf.train.Example(features=tf.train.Features(feature={
'vec': feature_bytes(vec.tostring()),
'turn_number' : feature_int64(int(r[2])),
'total_turn_number' : feature_int64(int(r[3])),
'side' : feature_int64(0 if r[1] == 'b' else 1),
'winner_side' : feature_int64(0 if r[5] == 'b' else 1),
}))
# ExampleオブジェクトをシリアライズしてTFRecordWriterで書き出す
writer.write(record.SerializeToString())
writer.close()
if __name__ == '__main__':
main()
入力 : TFRecordをTFRecordReaderでtf.Tensorにする
以下は読み込みオペレーションを作成する関数です。チュートリアルなどで見かける画像読み込みとほとんど変わりません。本記事末尾につけた今までのコードと比べると全然すっきりしています。
def train_read_op(filename_queue):
# TFRecordReader の作成。書き出し時にGZIPオプションを指定したので読み込み時にも指定する。
# 指定し忘れると原因がわかりにくいメッセージでエラーになるのでつらい。
reader_option = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.GZIP)
reader = tf.TFRecordReader(options=reader_option)
_, serialized_example = reader.read(filename_queue)
# readするとシリアライズされた形で出力されるのでパースしてやる
features = tf.parse_single_example(serialized_example,
features={
'vec': tf.FixedLenFeature([], tf.string),
'turn_number': tf.FixedLenFeature([], tf.int64),
'total_turn_number': tf.FixedLenFeature([], tf.int64),
'side': tf.FixedLenFeature([], tf.int64),
'winner_side': tf.FixedLenFeature([], tf.int64),
})
vec = tf.decode_raw(features['vec'], tf.float64)
# 書き出し時にnumpy.arrayをfloat32に指定しなかったためキャストする。
# キャストをし忘れるとエラーになる。
# 本来なら書き出し側でfloat32にするべき。
vec = tf.cast(tf.reshape(vec, [9,9,360]), tf.float32)
s = tf.cast(features['side'], tf.float32)
w = tf.cast(features['winner_side'], tf.float32)
label = tf.cond( tf.equal(s,w),
lambda: tf.cast(np.array([1.0, 0.0]), tf.float32) ,
lambda: tf.cast(np.array([0.0, 1.0]), tf.float32) )
return vec, label
学習スクリプトで入力を作る部分です(GPUの数だけ作っています)。 tf.train.shuffle_batch
を使用していますが完全に画像読み込みと同じ形のコードで書けます。
input_batches = [tf.train.shuffle_batch(
shogi_tfrecords.train_read_op(filename_queue),
ns.minibatch_size, 4000, 1000,
num_threads=ns.num_read_threads) for _ in range(ns.num_gpus)]
参考 : 自前のファイル形式と自前の非同期読み込みのコード
参考にPythonのプロセスプールで非同期複数プロセスで棋譜を読み込みTensorFlowのQueueにEnqueueする部分を載せておきます。スレッドプール作ってー、ループを書いてー、不要なオブジェクトをdelしてー、と、あまりメンテナンスしたくない感じのソースになります。
import ginkgo.numpy_shogi as numpy_shogi
import ginkgo.shogi_records as sr
import tensorflow as tf
import numpy as np
import threading
from threading import Thread
import queue
from queue import Queue
import math
from concurrent.futures import ProcessPoolExecutor as Executor
import gc
def record_to_vec(r):
sfen, side, turn, total, move, winner = r
board_vec = numpy_shogi.sfen_to_vector(sfen, usi=move)
if side == 'w':
board_vec = numpy_shogi.player_inverse(board_vec)
match_vec = np.array([
1.0 if side == winner else 0.0,
1.0 if side != winner else 0.0])
weight_vec = np.array([math.sqrt(float(turn)/float(total))])
return (np.squeeze(board_vec, axis=0), match_vec, weight_vec)
def map_func(r):
sfen, side, turn, total, move, winner = r = sr.to_data(r)
if int(turn) < 30: return None
#if side != 'b': return None
board_vec, label_vec, weight_vec = record_to_vec(r)
return (
np.expand_dims(board_vec,0),
np.expand_dims(label_vec,0),
np.expand_dims(weight_vec,0))
def flipdata(r):
return (numpy_shogi.fliplr(r[0]), r[1], r[2])
def load_loop(coord, sess, enqueue_op, close_op, path_q, pool, loop,
input_vector_ph, label_ph, turn_weight_ph):
while not coord.should_stop():
try:
path = path_q.get(timeout=10)
records = sr.load_file(path)
#sfen, side, turn, total, move, winner = r = sr.to_data(r)
data_list = list(pool.map(map_func, records))
data_list = list(filter(lambda x: x is not None, data_list))
#data_list2 = list(pool.map(flipdata, data_list))
#data_list.extend(data_list2)
vecs = [list(t) for t in zip(*data_list)]
vecs = list(map(np.concatenate, vecs))
if len(vecs) != 3:
print('some error occured in reading file. skip this file: {}'.format(path))
continue
sess.run(enqueue_op, feed_dict={
input_vector_ph: vecs[0],
label_ph: vecs[1],
turn_weight_ph: vecs[2]})
if loop:
path_q.put(path)
del data_list, vecs, records,
gc.collect()
except queue.Empty as e:
try:
sess.run(close_op)
except tf.errors.CancelledError:
pass
break
except tf.errors.AbortedError as e:
break
except tf.errors.CancelledError as e:
break
def load_sfenx_threads_and_queue(
coord, sess, path_list, batch_size, loop=False, threads_num=1, queue_max=50000, queue_min=8000):
input_vector_ph = tf.placeholder(tf.float32, [None,9,9,360])
label_ph = tf.placeholder(tf.float32, [None,2])
turn_weight_ph = tf.placeholder(tf.float32, [None,1])
q = tf.RandomShuffleQueue(queue_max, queue_min,
[tf.float32, tf.float32, tf.float32], [[9,9,360], [2], [1]])
enqueue_op = q.enqueue_many(
[input_vector_ph, label_ph, turn_weight_ph])
close_op = q.close()
path_q = Queue()
for p in path_list:
path_q.put(p)
pool = Executor(max_workers=threads_num+2)
threads = [Thread(target=load_loop,
args=(coord, sess, enqueue_op, close_op, path_q, pool, loop,
input_vector_ph, label_ph, turn_weight_ph))
for i in range(threads_num)]
tf.summary.scalar('shogi_loader/size', q.size())
# input_batch, label_batch, turn_weight_batch = q.dequeue_many(batch_size)
# return threads, input_batch, label_batch, turn_weight_batch
return threads, q