11
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

学習データをTFRecordにした話 : TensorFlow将棋ソフト開発日誌 #12

Posted at

前回 マルチGPUを導入した話とseparable_conv2dの話など : TensorFlow将棋ソフト開発日誌 #11
目次 TensorFlow将棋ソフト開発日誌 目次
ソースはgithubにあります(俺が読めればいいというレベル)

なうりぶーてぃんぐ

目次

  • 最近何をしていたか
  • TFRecordを使用する理由
  • 非同期読み込みにこだわる理由
  • TFRecord関係の参考ページ
  • TFRecordを使用する際の注意点(ここだけ読めば良い)
  • 出力 : データをTFRecordに変換する
  • 入力 : TFRecordをTFRecordReaderでtf.Tensorにする
  • 参考 : 自前のファイル形式と自前の非同期読み込みのコード

最近何をしていたか

  • 年初あたりにコンピュータ将棋にPFNが参入というニュースが出て非常に非常にモチベーションが下がりました。勝てるわけねえじゃん。
  • TensorFlow将棋に関してはなんというか「続けるかー?でもなんか出来ても後追いみたいに見られるしー?俺のほうが早くはじめたのにー。しかし他に面白いネタもあんまないしー」というモヤモヤがしばらく続いていました。
  • GPUをGTX 1080 x2からGTX 1080 ti x2にしました。
  • DCGANやVAEをやってみて気分転換をしていました。CelebAデータセットでやってみて「ほうほう面白いねー」みたいな。DCGANでアドホックにDやGに制限を入れる研究をしてみたり。
  • で、ようやくやる気も戻ってきたので最近の進捗です。

TFRecord関係の参考ページ

TFRecordを使用する理由

今までは棋譜の学習において以下のようにデータを扱っていました。

  1. SFEN形式(参考)の盤面と追加データ(勝敗結果や終了手数)を1行のレコードとしたファイルを入力データとする
  2. 学習時に複数のスレッドでファイル読み込み、TensorFlowのTensor化を行う
  3. TensorFlowのQueueを使用して非同期読み込みを実現する

2と3のコードが煩雑になること、CPUリソースを相当に消費することからTFRecord化することにしました。学習データは動的に変化しないので読み込みに最適な形にするべきです。学習データをTFRecord形式にしておけばTensorFlow標準のTFRecordReaderで画像などと同じような感覚で非同期読み込みを行うことができ、自分でスレッドやGCを管理する必要がなくなります。

非同期読み込みにこだわる理由

ここで言う非同期読み込みっていうのはTensorFlowのQueueやBatchを使用することです。なぜこれが必要かというと学習時の計算効率が理由です。

非同期読み込みをしない場合、つまりtf.Placeholderを使用して学習データをグラフに渡す場合は以下のようなシーケンスになります。

# C : CPU Reading Data, G : GPU Learning
processing time ->
|----C----|----G----|----C----|----G----|----C----|----G----| ...

CPUでデータを読んでGPUで計算しての繰り返しですがCPUでデータを読んでいる間はGPUは寝ています。非効率です。「TensorFlowでGPUを使い始めたけれどnvidia-smiでGPU稼働率が上がらないよー」というのは大体CPUに処理の比重が寄っているからです。

ちなみにですが画像の読み込みやデータオーグメンテーションについても「サイズが決まっているならそのサイズにリサイズしたファイルを用意しておく」「予め変形したものをファイルに落としておく」などのチューニングの余地があります。ファイルサイズやデータ数にもよりますが「CPUばっか動いてない?」と感じたらチューニングを検討しましょう。

TensorFlowの非同期読み込みを使うと以下の様になります。

# C : CPU Reading Data, G : GPU Learning
processing time ->
|----C----|----C----|----C----|----C----|----C----|----C----| ...
|----G----|----G----|----G----|----G----|----G----|----G----| ...

GPUが計算している間にCPUが別スレッドで同時にデータを読み込みます。GPUの計算が1ステップ終わるとCPUがすでに読み込んだデータをGPUに渡すだけでGPUが次の計算をはじめられます。

「tf.Placeholderを使わずに非同期読み込みをする」などのパフォーマンス向上に関してはTensorFlow公式ドキュメントのPerformance Guideが参考になります。

TFRecordを使用する際の注意点(ここだけ読めば良い)

以下のことに気づかずに結構ハマりました。ただこれを気をつければ参考ページを読むだけでいけると思います。

  • TFRecordWriterで圧縮オプションを指定したらTFRecordReaderでも同様のオプションを付ける。
  • numpy.arrayなどをTFRecordに保存する場合は数値の型に気をつける。

2つ目の数値の型について。書き出したnumpy.arrayがfloat64だとTensorFlowでもfloat64で読み込まないといけません。そうでないとエラーになります。大抵の場合はfloat32で学習処理をしたいでしょうからtf.castでキャストしてやらないといけません。無駄です。またfloat64でファイルに書きだせばそれだけファイルが大きくなります。これも無駄です。しかしながらnumpy.arrayを型を指定せずに作るとfloat64がデフォルトになります。この点を忘れると割と面倒なバグや非効率が起きるので気をつけてください。

出力 : データをTFRecordに変換する

先に述べた形の棋譜データファイルをTFRecordsに変換するスクリプトです。データ変換とファイル処理だけなのでTensorFlow的なセッションやらグラフやらは出てきません。要点はコメントに入れておきます。

from argparse import ArgumentParser
from pathlib import Path
import tensorflow as tf
pio = tf.python_io

import ginkgo.shogi_records as sr
import ginkgo.numpy_shogi as ns

def create_argument_parser():
    parser = ArgumentParser()
    parser.add_argument('-i', '--input', type=str, default=None)
    parser.add_argument('-o', '--output-dir', type=str, default='./')
    return parser

# 整数をレコードに入れるための変換
def feature_int64(a):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[a]))

# バイト列をレコードに入れるための変換、Numpy arrayもこれを使用する
def feature_bytes(a):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[a]))

def main():
    parser = create_argument_parser()
    args = parser.parse_args()
    proc(args)

def proc(args):
    input_path = Path(args.input)
    convert_file(args, input_path)

def convert_file(args, file_path):
    # 自前データ形式のファイルからレコード(棋譜の各盤面)を読み込む
    records = list(map(sr.to_data, sr.load_file(file_path)))
    output_file = Path(args.output_dir) / (file_path.stem + '.tfrecords')

    # TFRecordを書き出すためのTFRecordWriterを作成する
    # Numpy arrayのようなデータを書き出す場合はファイルサイズが大きくなりがちなので
    # TFRecordCompressionType.GZIP オプションを指定しておく
    writer = pio.TFRecordWriter(
        str(output_file), options=pio.TFRecordOptions(pio.TFRecordCompressionType.GZIP))

    for r in records:
        # record tuple
        # 0 .. sfen
        # 1 .. side (first black, second white)
        # 2 .. turn number
        # 3 .. total turn number
        # 4 .. next movement
        # 5 .. winner side


        # vecが盤面を特徴テンサーに落とし込んだ numpy.array
        # 他は付随データのスカラー
        vec = ns.sfen_to_vector(r[0], usi=r[4])

        if r[1] == 'w':
            vec = ns.player_inverse(vec)

        # 1レコード分のデータを持つExampleオブジェクトを作成する
        record = tf.train.Example(features=tf.train.Features(feature={
            'vec': feature_bytes(vec.tostring()),
            'turn_number' : feature_int64(int(r[2])),
            'total_turn_number' : feature_int64(int(r[3])),
            'side' : feature_int64(0 if r[1] == 'b' else 1),
            'winner_side' : feature_int64(0 if r[5] == 'b' else 1),
            }))
        # ExampleオブジェクトをシリアライズしてTFRecordWriterで書き出す
        writer.write(record.SerializeToString())

    writer.close()

if __name__ == '__main__':
    main()

入力 : TFRecordをTFRecordReaderでtf.Tensorにする

以下は読み込みオペレーションを作成する関数です。チュートリアルなどで見かける画像読み込みとほとんど変わりません。本記事末尾につけた今までのコードと比べると全然すっきりしています。

def train_read_op(filename_queue):
    # TFRecordReader の作成。書き出し時にGZIPオプションを指定したので読み込み時にも指定する。
    # 指定し忘れると原因がわかりにくいメッセージでエラーになるのでつらい。
    reader_option = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.GZIP)
    reader = tf.TFRecordReader(options=reader_option)

    _, serialized_example = reader.read(filename_queue)

    # readするとシリアライズされた形で出力されるのでパースしてやる
    features = tf.parse_single_example(serialized_example,
        features={
            'vec': tf.FixedLenFeature([], tf.string),
            'turn_number': tf.FixedLenFeature([], tf.int64),
            'total_turn_number': tf.FixedLenFeature([], tf.int64),
            'side': tf.FixedLenFeature([], tf.int64),
            'winner_side': tf.FixedLenFeature([], tf.int64),
            })

    vec = tf.decode_raw(features['vec'], tf.float64)

    # 書き出し時にnumpy.arrayをfloat32に指定しなかったためキャストする。
    # キャストをし忘れるとエラーになる。
    # 本来なら書き出し側でfloat32にするべき。
    vec = tf.cast(tf.reshape(vec, [9,9,360]), tf.float32)

    s = tf.cast(features['side'], tf.float32)
    w = tf.cast(features['winner_side'], tf.float32)

    label = tf.cond( tf.equal(s,w),
        lambda: tf.cast(np.array([1.0, 0.0]), tf.float32) ,
        lambda: tf.cast(np.array([0.0, 1.0]), tf.float32) )

    return vec, label

学習スクリプトで入力を作る部分です(GPUの数だけ作っています)。 tf.train.shuffle_batch を使用していますが完全に画像読み込みと同じ形のコードで書けます。

            input_batches = [tf.train.shuffle_batch(
                shogi_tfrecords.train_read_op(filename_queue),
                ns.minibatch_size, 4000, 1000,
                num_threads=ns.num_read_threads) for _ in range(ns.num_gpus)]

参考 : 自前のファイル形式と自前の非同期読み込みのコード

参考にPythonのプロセスプールで非同期複数プロセスで棋譜を読み込みTensorFlowのQueueにEnqueueする部分を載せておきます。スレッドプール作ってー、ループを書いてー、不要なオブジェクトをdelしてー、と、あまりメンテナンスしたくない感じのソースになります。

import ginkgo.numpy_shogi as numpy_shogi
import ginkgo.shogi_records as sr

import tensorflow as tf
import numpy as np

import threading
from threading import Thread
import queue
from queue import Queue
import math
from concurrent.futures import ProcessPoolExecutor as Executor
import gc

def record_to_vec(r):
    sfen, side, turn, total, move, winner = r

    board_vec = numpy_shogi.sfen_to_vector(sfen, usi=move)

    if side == 'w':
        board_vec = numpy_shogi.player_inverse(board_vec)
    
    match_vec = np.array([
        1.0 if side == winner else 0.0,
        1.0 if side != winner else 0.0])

    weight_vec = np.array([math.sqrt(float(turn)/float(total))])

    return (np.squeeze(board_vec, axis=0), match_vec, weight_vec)

def map_func(r):
    sfen, side, turn, total, move, winner = r = sr.to_data(r)
    if int(turn) < 30: return None

    #if side != 'b': return None

    board_vec, label_vec, weight_vec = record_to_vec(r)

    return (
        np.expand_dims(board_vec,0),
        np.expand_dims(label_vec,0),
        np.expand_dims(weight_vec,0))

def flipdata(r):
    return (numpy_shogi.fliplr(r[0]), r[1], r[2])

def load_loop(coord, sess, enqueue_op, close_op,  path_q, pool, loop,
        input_vector_ph, label_ph, turn_weight_ph):

    while not coord.should_stop():
        try:
            path = path_q.get(timeout=10)

            records = sr.load_file(path)

            #sfen, side, turn, total, move, winner = r = sr.to_data(r)
            data_list = list(pool.map(map_func, records))
            data_list = list(filter(lambda x: x is not None, data_list))

            #data_list2 = list(pool.map(flipdata, data_list))
            #data_list.extend(data_list2)

            vecs = [list(t) for t in zip(*data_list)]
            vecs = list(map(np.concatenate, vecs))

            if len(vecs) != 3:
                print('some error occured in reading file. skip this file: {}'.format(path))
                continue

            sess.run(enqueue_op, feed_dict={
                input_vector_ph: vecs[0],
                label_ph: vecs[1],
                turn_weight_ph: vecs[2]})

            if loop:
                path_q.put(path)

            del data_list, vecs, records,
            gc.collect()

        except queue.Empty  as e:
            try:
                sess.run(close_op)
            except tf.errors.CancelledError:
                pass
            break
        except tf.errors.AbortedError as e:
            break
        except tf.errors.CancelledError as e:
            break

def load_sfenx_threads_and_queue(
        coord, sess, path_list, batch_size, loop=False, threads_num=1, queue_max=50000, queue_min=8000):

    input_vector_ph = tf.placeholder(tf.float32, [None,9,9,360])
    label_ph = tf.placeholder(tf.float32, [None,2])
    turn_weight_ph = tf.placeholder(tf.float32, [None,1])

    q = tf.RandomShuffleQueue(queue_max, queue_min,
        [tf.float32, tf.float32, tf.float32], [[9,9,360], [2], [1]])
    enqueue_op = q.enqueue_many(
        [input_vector_ph, label_ph, turn_weight_ph])
    close_op = q.close()
    path_q = Queue()

    for p in path_list:
        path_q.put(p)

    pool = Executor(max_workers=threads_num+2)

    threads = [Thread(target=load_loop,
        args=(coord, sess, enqueue_op, close_op, path_q, pool, loop,
            input_vector_ph, label_ph, turn_weight_ph))
        for i in range(threads_num)]

    tf.summary.scalar('shogi_loader/size', q.size())

#    input_batch, label_batch, turn_weight_batch = q.dequeue_many(batch_size)
#    return threads, input_batch, label_batch, turn_weight_batch
    return threads, q
11
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?