LoginSignup
39
30

More than 3 years have passed since last update.

TensorFlow & Keras で TFRecord & DataSetを使って大量のデータを学習させる方法

Last updated at Posted at 2019-04-30

(2020/03/12) TensorFlow 2.x対応版の記事を書きました。
[TensorFlow 2.x対応版] TensorFlow (Keras) で TFRecord & DataSetを使って大量のデータを学習させる方法 - Qiita

本記事の内容はTensorFlow 1.x版を想定していますので、最新の2.x版とは異なる点があります。ご注意ください。

はじめに

ディープラーニング技術が身近になって久しいですが、今からでも始めてみようという人も多いはず。
しかし、意外と取っ掛かりが大変なんですよね。ディープラーニングのフレームワークがいろいろ出てきたので、とりあえず始めるまでは良いのですが、データが増えると

  • メモリが足りなくなって失敗する
  • データを逐次ディスクから読み込んで入力することを試みるも、せっかくGPUで高速化させてもディスクI/Oのせいで学習速度が頭打ちになる

などを経験します。言い換えれば

  • 学習データと評価データがメモリに入り切らない場合でも、ディスクI/Oに支配されずにGPUによる高速化の恩恵を受けたい

ということになります。ディープラーニングは大量のデータを必要とするものですから、至極当たり前の話ではありますが、この状況を実現するまでに割と苦労します。

自分用のメモも兼ねて、Keras(TensorFlowバックエンド)のフレームワークを使った場合に、TFRecord & DataSetを活用して大量のデータを学習させるまでの手順をまとめてみます。なかなか情報がフルセットでまとまっていなかったりするので、いろいろなページから少しずつ参考にしたものを書いておきます。

事前準備

Linux (私はCentOS 7で試しました) 環境を仮定します。
Python 3が入っていなければ、pyenvなどを使ってインストールします。
次に、Python 3 (私は3.6を使っています) の環境に必要なパッケージを入れます。

pip3 install tensorflow keras

学習にGPUを使いたい方は

pip3 install tensorflow-gpu keras

となります。
GPUを使う場合、CUDAのバージョンによっては最新のTensorFlowで動作しないかもしれません。
Build from source | TensorFlow
を参考に、

  • CUDA 10を使うなら tensorflow_gpu-1.13.1
  • CUDA 9を使うなら tensorflow_gpu-1.12.0
  • CUDA 8を使うなら tensorflow_gpu-1.4.0

といった具合に選ぶとよいでしょう。

# CUDA 9の環境の場合
pip3 install tensorflow-gpu==1.12.0 keras

CPUを使う場合も、CPUがサポートしている命令セットによってはバージョンを落とさないといけないかもしれません。

データ整備

TensorFlowにはTFRecordと呼ばれる独自のデータフォーマットがあり、これを活用することで大量のデータを使った学習が容易になります。
データを読み込んで計算、ということを逐次的に繰り返す方法だと、データの読み込み中に計算が止まってしまい、GPUの計算能力を活かしきれません。それを防ぐためには、計算している間に裏で次のデータを読み込むようにすればよいですね。TFRecordから作成したbatchを使って学習するようにすれば、それができます。

(参考:学習データをTFRecordにした話 : TensorFlow将棋ソフト開発日誌 #12 - Qiita

何はともあれやってみましょう。
今回は手書き文字を分類するMNISTのデータを例にとっているので、メモリに入り切るサイズではありますが、実際には自分で学習させたいデータを大量に準備することになるでしょう。データの変換と書き込みの要領だけを押さえていただければよいと思います。

data2tfrecord.py
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
from keras.datasets import mnist

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_float_list(r[0:-1]),
        "y": feature_float_list([r[-1]])
    }))

filename_train = "train.tfrecords"
filename_test  = "test.tfrecords"

# === MNISTデータを読み込む ===
# 簡単のため、学習中の検証データに評価データと同じものを使うとする
(x_train, y_train), (x_test, y_test) = mnist.load_data()
print("x_train   : ", x_train.shape) # x_train   :  (60000, 28, 28)
print("y_train   : ", y_train.shape) # y_train   :  (60000,)
print("x_test    : ", x_test.shape)  # x_test    :  (10000, 28, 28)
print("y_test    : ", y_test.shape)  # y_test    :  (10000,)

# 前処理をする
# 画素は[0, 1]のfloat32型に変換する
# さらに、TFRecord化のために、特徴量を1次元にしておく(行がレコードに対応)
x_train = x_train.reshape((-1, 28*28)).astype("float32") / 255.0
x_test  = x_test.reshape((-1, 28*28)).astype("float32") / 255.0
# ラベルもfloat32型にする(one-hotベクトルにしなくてもよい:後述)
y_train = y_train.reshape((-1, 1)).astype("float32")
y_test  = y_test.reshape((-1, 1)).astype("float32")
# TFRecord化するために、特徴量とラベルを結合する
data_train = np.c_[x_train, y_train]
data_test = np.c_[x_test,  y_test]

# 実際には、学習したいデータを同じ形式に変換して作る。
# 全データがメモリに乗り切らない場合は、以下の書き込みフェーズで
# 少しずつ作って書き込むことを繰り返せばよい。

# 学習データをTFRecordに書き込む
with tf.python_io.TFRecordWriter(filename_train) as writer:
    for r in data_train:
        ex = record2example(r)
        writer.write(ex.SerializeToString())

# 評価データをTFRecordに書き込む
with tf.python_io.TFRecordWriter(filename_test) as writer:
    for r in data_test:
        ex = record2example(r)
        writer.write(ex.SerializeToString())

(参考:KerasでMNIST - Qiita

学習時に損失関数の設定に注意しさえすれば、ラベルはone-hotベクトルにしなくても学習できます。
それについてはこの後の学習の手順で述べます。

TFRecordファイルを分割したい

あまりにデータが大量になってくると、TFRecordファイルを複数のファイルに分割したくなります。数百GB以上のデータが1ファイルに入っているなんて、取り扱いが面倒で嫌ですね。(今回の例だと200MB弱しかありませんが)
必要に応じて、複数のファイルにデータを分散して書き込むこともできます。以下の例だと、train.tfrecords.0 から train.tfrecords.9 までの10個のファイルが作成されます。

# 学習データをTFRecordに書き込む
for i in range(10):
    with tf.python_io.TFRecordWriter(filename_train + "." + str(i)) as writer:
        for r in data_train[i::10]:
            ex = record2example(r)
            writer.write(ex.SerializeToString())

学習

train.py
#!/usr/bin/env python3

import numpy as np
import tensorflow as tf
from keras.layers import Dense, Input
from keras.optimizers import RMSprop
from keras.engine.network import Network
from keras.callbacks import Callback, ModelCheckpoint
from keras.models import Model

# 学習設定
batch_size = 32
epochs = 10
# 特徴量の設定
num_classes = 10    # ラベルの種類。0-9の10種類
feature_dim = 28*28 # 特徴量の次元。簡単のため1次元のままで扱う
# 学習・評価データ件数。事前に調べておく。
# 複数のTFRecordを使う場合、以下の件数は全ファイルの合計になることに注意。
num_records_train = 60000
num_records_test  = 10000
# 1エポックあたりのミニバッチ数。学習時に使う。
steps_per_epoch_train = (num_records_train-1) // batch_size + 1
steps_per_epoch_test  = (num_records_test-1) // batch_size + 1

# 学習中に検証を実施するためのコールバック
class EvaluateInputTensor(Callback):
    def __init__(self, model, steps, metrics_prefix="val", verbose=1):
        super(EvaluateInputTensor, self).__init__()
        self.val_model = model
        self.num_steps = steps
        self.verbose = verbose
        self.metrics_prefix = metrics_prefix

    def on_epoch_end(self, epoch, logs={}):
        # 重みパラメータは学習用と評価用で共有されているので、
        # そのまま評価用モデルで評価すればよい
        results = self.val_model.evaluate(
            steps=int(self.num_steps),
            verbose=self.verbose)
        # 評価結果を出力
        if self.verbose >= 0:
            metrics_str = ""
            for result, name in zip(results, self.val_model.metrics_names):
                metric_name = self.metrics_prefix + "_" + name
                logs[metric_name] = result
                if self.verbose > 0:
                    metrics_str += metric_name + ": " + str(result) + " "
            metrics_str += "\n"
            print(metrics_str)

# 1件のTFRecordをデコード
def parse_example(example):
    features = tf.parse_single_example(
        example,
        features={
            # リストを読み込む場合は次元数を指定する
            "x": tf.FixedLenFeature([feature_dim], dtype=tf.float32),
            "y": tf.FixedLenFeature([], dtype=tf.float32)
        })
    x = features["x"]
    y = features["y"]
    return x, y

# === TFRecordファイルのデータを学習・評価用に準備 ===
# イテレータはエポック間で使い回されるので、
# バッチ化した後に repeat(-1) をつける。
# 何ステップ実行すれば1エポックになるかは事前に計算する必要がある。
dataset_train = tf.data.TFRecordDataset(["train.tfrecords"]) \
    .map(parse_example) \
    .shuffle(batch_size * 100) \
    .batch(batch_size).repeat(-1)
iterator_train = dataset_train.make_one_shot_iterator()
# 上で複数のTFRecordファイルを用いる場合は、ファイル名のリストを指定する。
# dataset_train = tf.data.TFRecordDataset(["train.tfrecords.{}".format(i) for i in range(10)]) \

dataset_test = tf.data.TFRecordDataset(["test.tfrecords"]) \
    .map(parse_example) \
    .batch(batch_size).repeat(-1)
iterator_test = dataset_test.make_one_shot_iterator()

# 各バッチが返ってくる
# この後の処理は、必要になるまで実行されない(遅延評価される)
x_train, y_train = iterator_train.get_next()
x_test, y_test = iterator_test.get_next()
# 特徴量はバッチ単位で1次元リストになっているので、
# 各行が1個のデータになるようにreshapeする
x_train = tf.reshape(x_train, (-1, feature_dim))
x_test = tf.reshape(x_test, (-1, feature_dim))

# === モデル定義 ===
# 学習用と評価用で入力を変える必要があるため
# モデルをそれぞれ構築する

# 学習用と評価用の共通部分を定義
# 今回は512次元の中間層を1層だけ指定している
layer_input = Input(shape=(feature_dim,))
fc1 = Dense(512, activation="relu")(layer_input)
layer_output = Dense(num_classes, activation="softmax")(fc1)
common_network = Network(layer_input, layer_output)

# 学習モデル
# tensor引数にイテレータから作成した学習データを指定する
layer_input_train = Input(tensor=x_train)
layer_output_train = common_network(layer_input_train)
model_train = Model(inputs=layer_input_train, outputs=layer_output_train)
model_train.summary()
# ラベルがカテゴリ変数の場合でも loss="sparse_categorical_crossentropy" で学習できる
# ラベルをone-hotベクトル化した場合は、loss="categorical_crossentropy" になる
# target_tensors引数にイテレータから作成したラベルデータを指定する
model_train.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=RMSprop(),
    metrics=["accuracy"],
    target_tensors=[y_train])

# 評価モデル
layer_input_test = Input(tensor=x_test)
layer_output_test = common_network(layer_input_test)
model_test = Model(inputs=layer_input_test, outputs=layer_output_test)
model_test.summary()
model_test.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=RMSprop(),
    metrics=["accuracy"],
    target_tensors=[y_test])

# === 学習 ===

# 途中のモデルを保存しておく
cp_cb = ModelCheckpoint(
    filepath="weights.{epoch:02d}-{loss:.4f}-{val_loss:.4f}.hdf5",
    monitor="val_loss",
    verbose=1,
    save_best_only=True,
    mode="auto")

# 評価のために、validation_dataの代わりにcallbacksを指定する。
# 評価データはモデルに設定されているので、ここでは指定不要。
model_train.fit(
    epochs=epochs,
    verbose=1,
    steps_per_epoch=steps_per_epoch_train,
    # callbacksはこの順で指定しないと、cp_cbでval_lossが利用できずエラーになる。
    callbacks=[EvaluateInputTensor(model_test, steps=steps_per_epoch_test), cp_cb])

出力は例えば以下のようになります。

Using TensorFlow backend.
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
input_2 (InputLayer)         (None, 784)               0
_________________________________________________________________
network_1 (Network)          (None, 10)                407050
=================================================================
Total params: 407,050
Trainable params: 407,050
Non-trainable params: 0
_________________________________________________________________
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
input_3 (InputLayer)         (None, 784)               0
_________________________________________________________________
network_1 (Network)          (None, 10)                407050
=================================================================
Total params: 407,050
Trainable params: 407,050
Non-trainable params: 0
_________________________________________________________________
Epoch 1/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.1972 - acc: 0.9421
313/313 [==============================] - 2s 7ms/step
val_loss: 0.1329329576027712 val_acc: 0.9614616613418531


Epoch 00001: val_loss improved from inf to 0.13293, saving model to weights.01-0.1972-0.1329.hdf5
Epoch 2/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0901 - acc: 0.9748
313/313 [==============================] - 2s 7ms/step
val_loss: 0.1170103194958538 val_acc: 0.9683506389776357


Epoch 00002: val_loss improved from 0.13293 to 0.11701, saving model to weights.02-0.0901-0.1170.hdf5
Epoch 3/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0648 - acc: 0.9830
313/313 [==============================] - 2s 7ms/step
val_loss: 0.1171457467998597 val_acc: 0.9708466453674122


Epoch 00003: val_loss did not improve from 0.11701
Epoch 4/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0494 - acc: 0.9879
313/313 [==============================] - 2s 7ms/step
val_loss: 0.12178693860342216 val_acc: 0.9730431309904153


Epoch 00004: val_loss did not improve from 0.11701
Epoch 5/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0404 - acc: 0.9901
313/313 [==============================] - 2s 7ms/step
val_loss: 0.11576927793215017 val_acc: 0.9745407348242812


Epoch 00005: val_loss improved from 0.11701 to 0.11577, saving model to weights.05-0.0404-0.1158.hdf5
Epoch 6/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0320 - acc: 0.9926
313/313 [==============================] - 2s 7ms/step
val_loss: 0.11907471388537187 val_acc: 0.9759384984025559


Epoch 00006: val_loss did not improve from 0.11577
Epoch 7/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0261 - acc: 0.9938
313/313 [==============================] - 2s 7ms/step
val_loss: 0.13151024946480896 val_acc: 0.9738418530351438


Epoch 00007: val_loss did not improve from 0.11577
Epoch 8/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0217 - acc: 0.9949
313/313 [==============================] - 2s 7ms/step
val_loss: 0.12405901103234035 val_acc: 0.976138178913738


Epoch 00008: val_loss did not improve from 0.11577
Epoch 9/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0183 - acc: 0.9957
313/313 [==============================] - 2s 7ms/step
val_loss: 0.13328980769384846 val_acc: 0.9763378594249201


Epoch 00009: val_loss did not improve from 0.11577
Epoch 10/10
1875/1875 [==============================] - 40s 21ms/step - loss: 0.0151 - acc: 0.9966
313/313 [==============================] - 2s 8ms/step
val_loss: 0.13160042888180018 val_acc: 0.9770367412140575


Epoch 00010: val_loss did not improve from 0.11577

大筋は以下のページの内容を参考にしています。
TFRecord、DataSet API を Keras で使う - taka5hi’s blog

(特に複数の)GPUを使って学習する場合、モデルを with tf.device("/cpu:0"): のブロックの中で作って、keras.utils.training_utils.multi_gpu_model() を呼び出して得られたモデルで学習を実行する必要があると思います。

TFRecord, DataSetのbatchを使って学習させる場合、学習時ではなくモデル作成時に入力データを指定する必要がありますが、実際には同じモデル・学習されたパラメータで評価も行いたいので、ネットワークの共通部分を定義した後に、異なる入力データを持つ入力層を結合して学習モデル・評価モデルをそれぞれ作ります。
先のページは学習データがbatch、評価データがメモリ上のデータの場合で説明されていますが、学習データと評価データがともにbatchであるときには以下のページを参考にして扱うことができます。
Mnist tfrecord - Keras Documentation

あとはほぼコメントに書いたことの繰り返しになりますが、ざっと以下のような部分がポイントになると思います。

  • KerasのModelを学習する場合、イテレータはエポックごと使い捨てではなく、次のエポックを実行するときには続きから読み込みが行われるので、イテレータを無限ループにする(またはエポック数以上の回数だけ繰り返す)必要がある。そのため、全データに対応するミニバッチ数が分かっていないと想定通りの学習ができない。
  • モデルの損失関数を sparse_categorical_crossentropy とすると、離散値(カテゴリ変数)のラベルを取り扱える。その場合でも、出力層(softmax)の次元数はカテゴリの個数になる(ラベルをone-hotベクトルとした場合と同じ)。
  • batchを入力とする場合、Model.fit() にvalidation_data引数を与える代わりに、1エポックの学習が終わるごとに評価を実施するためのコールバックを作成する。公式ドキュメントでは学習モデルの重みを評価モデルの重みに設定するようなコードがあるが、今回は初めから重みを両モデルで共有して作っているので、重みの設定は必要ない。
  • ModelCheckpointを使って学習中のモデルの重みを自動的に保存する場合、保存の要否およびファイル名は評価を行わなければ決められないので、callbacksの指定順に注意する必要がある。

最初のポイントについてもう少し補足すると、例えばレコード数が300件、ミニバッチサイズが100だとすると、

  • 1-100件目
  • 101-200件目
  • 201-300件目
  • 1-100件目
  • 101-200件目
  • 201-300件目
  • (以下略)

というミニバッチが順に取得できるようなイテレータを作成しています。
(2019/05/16追記)shuffle()を呼んでいるので、「1-100件目」の中身は毎回変わっています。単に1周300件をランダムに100件ずつ3回に分けるという意味と解釈してください。

1エポックで全データをちょうど1回ずつ使うためには、ミニバッチ3個で全データが一巡することを知った上で、学習時に1エポックあたりミニバッチ3個を使うことを指定する必要があります(それがsteps_per_epoch引数の役割です)。ちょっと面倒ですが、全データの件数は最初に一度数えておけば済むので、まあ許容範囲かと思います。もし複数のTFRecordファイルを用いる場合には、全ファイルの合計の件数を調べる必要があることに注意しましょう。
評価に関しては、バッチサイズにはあまり神経質にならなくてもよいですが、小さすぎると処理に時間がかかり、大きすぎるとミニバッチがメモリに乗らないかもしれません。今回は学習データのバッチサイズと同じにしましたが、実際にはメモリに入る程度の大きな値を指定するのがよいと思われます。

学習済みのパラメータ確認

カレントディレクトリに weights.xx-x.xxxx-x.xxxx.hdf5 という形式の名前のファイルがいくつか作成されていると思います。このうちweightsの後の数字が最も大きいものを最終的なモデルのパラメータとすればよいでしょう。今回の例では weights.05-0.0404-0.1158.hdf5 です。以下のようにして学習済みのパラメータを読み込めます。

check_model.py
#!/usr/bin/env python3

from keras.layers import Dense, Input
from keras.engine.network import Network
from keras.models import Model

# 特徴量の設定
num_classes = 10    # ラベルの種類。0-9の10種類
feature_dim = 28*28 # 特徴量の次元。簡単のため1次元のままで扱う

# === モデル定義 ===

# 学習時に作成したのと同じ方法でモデルを作る
layer_input = Input(shape=(feature_dim,))
fc1 = Dense(512, activation="relu")(layer_input)
layer_output = Dense(num_classes, activation="softmax")(fc1)
common_network = Network(layer_input, layer_output)
layer_input_train = Input(shape=(feature_dim,))
layer_output_train = common_network(layer_input_train)
model = Model(inputs=layer_input_train, outputs=layer_output_train)
# ファイルからパラメータを読み込み
model.load_weights("weights.05-0.0404-0.1158.hdf5")
weights = model.get_weights()
print(weights)
print(len(weights))      # 4
print(weights[0].shape)  # (784, 512)
print(weights[1].shape)  # (512,)
print(weights[2].shape)  # (512, 10)
print(weights[3].shape)  # (10,)

今回、中間層はfc1という512次元の全結合層が1つだけなので、weightsは先頭から順に

  • 入力→fc1の重み行列(784×512)
  • 入力→fc1のバイアス(512次元)
  • fc1→出力の重み行列(512×10)
  • 出力のバイアス(10次元)

の値を表しています。

横着して model = Model(inputs=layer_input, outputs=layer_output) とすると、パラメータの読み込み時にモデルの層の数が違うというエラーになってしまいました。学習時のモデルをNetworkを使って書いていれば、読み込み時にも同じようにNetworkを使って書けば大丈夫、ということを理解しておけば良いのですが。

パフォーマンスの改善 (2019/05/16追記)

もしかすると、今回の例を実践するだけでは学習がまだまだ遅いかもしれません。
CPUやGPUが十分回っていない場合には、ちょっと工夫すれば速くなるかもしれません。

Data Input Pipeline Performance | TensorFlow Core | TensorFlow

学習データセット dataset_train を作成するところで、TFRecordファイルを複数に分割することを前提にこんな感じに書き換えてみます。

dataset_train = tf.data.Dataset.from_tensor_slices(["train.tfrecords.{}".format(i) for i in range(10)]) \
    .apply(tf.contrib.data.parallel_interleave(
               lambda filename: tf.data.TFRecordDataset(filename).map(parse_example, num_parallel_calls=1),
               cycle_length=10,
               sloppy=True)) \
    .shuffle(batch_size * 100) \
    .batch(batch_size) \
    .prefetch(1) \
    .repeat(-1)

prefetch(1)が地味に効くものでして、GPUが計算をしている間にCPUが次のデータを取りに行けるようになるので、高速化が期待できます。
また、parallel_interleave() を使用すると、複数のファイルから並列でデータを取りに行けるようになります。CPUのコア数やGPUのスペックにもよりますが、うまくいくとさらに高速化されることでしょう。普通は各ファイルから取ったデータはローテーションで使われるようですが、sloppy=Trueを指定すると、ローテーションを無視して早く取れたものから使っていきます。高速化が期待できますが、どれが先に取れるかはその時によって変わるので、結果の再現性がなくなる点には留意しましょう。

39
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
39
30