はじめに
TensorFlowで大量のデータを学習させるときには、Dataset APIを使ってTFRecordに保存した特徴量を読み込むようにすると便利です。
[TensorFlow 2.x対応版] TensorFlow (Keras) で TFRecord & DataSetを使って大量のデータを学習させる方法 - Qiita
検索するとサンプルコードがたくさん見つかりますが、実は読み込み方を工夫することで、よく見る方法よりも格段に速く読み込めるようになる可能性があることが分かりました。
検証環境
- Ubuntu 18.04
- Python 3.6.9
- TensorFlow 2.1.0 (CPU使用)
よく紹介されている読み込み方法
上の記事の他、公式ドキュメントや他のサイトでよく紹介されている読み込み方として、tf.io.parse_single_example()
を使う方法があります。
TFRecords と tf.Example の使用法 | TensorFlow Core
import tensorflow as tf
import numpy as np
feature_dim = 784
def parse_example(example):
features = tf.io.parse_single_example(
example,
features={
"x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})
x = features["x"]
y = features["y"]
return x, y
ds1 = tf.data.TFRecordDataset(["test.tfrecords"]).map(parse_example).batch(512)
print(ds1)
print(next(iter(ds1)))
こんな感じで、Datasetに対し各レコードを特徴量に変換する処理を map()
で入れてやります。
おそらく最もメジャーな使い方ではないでしょうか。
でも、どうも処理が遅い気がするんですよね…。GPUで学習していても、GPU使用率が100%近くに張り付いているわけでもないのに、CPUの使用率がいまいち伸びていません。I/Oがボトルネックになっている気がします。
バッチ単位で読み込めないの?
公式ドキュメントを見ると、Datasetを変換するときの一般論として
Invoking a user-defined function passed into the map transformation has overhead related to scheduling and executing the user-defined function. We recommend vectorizing the user-defined function (that is, have it operate over a batch of inputs at once) and apply the batch transformation before the map transformation.
と書いてあります。
Better performance with the tf.data API | TensorFlow Core
要するに「ユーザ定義関数を使った map()
はバッチ単位でやるのがオススメ」とのことです。
それなら、データの読み込み・デコードもバッチ単位でできたらパフォーマンスが上がるのでしょうか?
全く日本語の資料が出てきませんでしたが、tf.data.experimental.parse_example_dataset()
を使うことで、バッチ単位で特徴量のデコードができるみたいです。1
以下のように、デコード処理はバッチ化した後に入ります。
feature_dim = 784
ds2 = tf.data.TFRecordDataset(["test.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
}))
print(ds2)
print(next(iter(ds2)))
各レコードは dict
形式で返ってくるので、keras.Model.fit()
で学習するときには別途タプルに変換しないといけません。レコード単位のときには parse_example()
の中でタプルへの変換まで一気に書けますが、こちらでは別途 map()
でタプルへの変換処理を追加する必要があります。
パフォーマンス比較
実際にやってみました。
MNISTのテストデータ10000件を書き込んで、それを読み出す部分の処理時間を測ります。
今回は学習までは試しませんが、このあと学習に使うことを前提とするので、バッチ単位の場合はレコードをタプルに変換する処理も含めます。
まずはデータをTFRecordファイルに書き込みます。
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
def feature_float_list(l):
return tf.train.Feature(float_list=tf.train.FloatList(value=l))
def record2example(r_x, r_y):
return tf.train.Example(features=tf.train.Features(feature={
"x": feature_float_list(r_x),
"y": feature_float_list(r_y)
}))
filename_test = "test.tfrecords"
# MNISTの評価データ10000件を書き込む
_, (x_test, y_test) = mnist.load_data()
print("x_test : ", x_test.shape) # x_test : (10000, 28, 28)
print("y_test : ", y_test.shape) # y_test : (10000,)
x_test = x_test.reshape((-1, 28*28)).astype("float32") / 255.0
y_test = y_test.reshape((-1, 1)).astype("float32")
with tf.io.TFRecordWriter(filename_test) as writer:
for r_x, r_y in zip(x_test, y_test):
ex = record2example(r_x, r_y)
writer.write(ex.SerializeToString())
続いて、これを2種類の方法で読み込みます。
import tensorflow as tf
import numpy as np
feature_dim = 784
def parse_example(example):
features = tf.io.parse_single_example(example, features={
"x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})
x = features["x"]
y = features["y"]
return x, y
ds1 = tf.data.TFRecordDataset(["test.tfrecords"]).map(parse_example).batch(512)
print(ds1) # <BatchDataset shapes: ((None, 784), (None,)), types: (tf.float32, tf.float32)>
def dict2tuple(feat):
return feat["x"], feat["y"]
ds2 = tf.data.TFRecordDataset(["test.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})) \
.map(dict2tuple)
print(ds2) # <MapDataset shapes: ((None, 784), (None,)), types: (tf.float32, tf.float32)>
ds1
と ds2
は、作り方こそ違いますが、最終的には全く同じデータになっていることに注意してください。バッチサイズも、返ってくるデータも同じになります。
ipython -i read_tfrecord.py
で対話シェルを起動して、10000件をすべてデコードするのに必要な処理時間を測ってみます。
In [1]: %timeit [1 for _ in iter(ds1)]
1 loop, best of 3: 1.4 s per loop
In [2]: %timeit [1 for _ in iter(ds2)]
10 loops, best of 3: 56.3 ms per loop
バッチ単位で読み込む方法の圧勝ですね…!
もっと速い方法 (2020/07/21追記)
tf.io.parse_single_example()
のバッチ版として tf.io.parse_example()
があり、使い所が分からずにいましたが、ようやく使えたのでメモします。
そして、普通にこちらを使ったほうが速い模様です。
以下の実行結果はTensorFlow 2.2.0のものです。計算時間は改めて測定しています。
def parse_batch_example(example):
features = tf.io.parse_example(example, features={
"x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})
x = features["x"]
y = features["y"]
return x, y
ds3 = tf.data.TFRecordDataset(["test.tfrecords"]) \
.batch(512) \
.map(parse_batch_example)
%timeit [1 for _ in iter(ds1)]
# 1.68 s ± 46.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit [1 for _ in iter(ds2)]
# 71.8 ms ± 7.99 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit [1 for _ in iter(ds3)]
# 56.3 ms ± 1.73 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
一回パースした後に map()
をもう一回実行するよりは、map()
一発で済む方法のほうが直感的にも速そうではあります。
この記事の他の例も、おそらく tf.io.parse_example()
で書くともう少し速くなるのではと思います。
特徴量が可変長だったらどうする?
先の例では、x
は固定の長さ(784次元)を持っていたのですが、これが可変長(レコードによって変わる)になるとちょっと面倒です。
一般的には、可変長のデータはシリアライズして tf.string
として扱う方法がメジャーのようです。
import numpy as np
import tensorflow as tf
def feature_bytes_list(l):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=l))
def feature_float_list(l):
return tf.train.Feature(float_list=tf.train.FloatList(value=l))
def record2example(r_x, r_y):
return tf.train.Example(features=tf.train.Features(feature={
"x": feature_bytes_list(r_x),
"y": feature_float_list(r_y)
}))
filename = "random.tfrecords"
# 可変長のデータ1000件を書き込む
with tf.io.TFRecordWriter(filename) as writer:
for i in range(1000):
r_x = np.random.random(i+1).astype("float32")
r_y = np.random.random(1)
ex = record2example([r_x.tostring()], r_y)
writer.write(ex.SerializeToString())
レコード単位でデコードする場合、以下のように読み込みます。
import tensorflow as tf
import numpy as np
def parse_example(example):
features = tf.io.parse_single_example(
example,
features={
"x": tf.io.FixedLenFeature([], dtype=tf.string),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})
x = tf.io.decode_raw(features["x"], tf.float32)
y = [features["y"]]
return x, y
ds1 = tf.data.TFRecordDataset(["random.tfrecords"]).map(parse_example).padded_batch(512, ([None], [1]))
print(ds1) # <PaddedBatchDataset shapes: ((None, None), (None, 1)), types: (tf.float32, tf.float32)>
バッチ単位では x
の列の数は一番長い特徴量に合わせられ、長さの足りない部分は0埋めされます。
In [1]: %timeit [1 for _ in iter(ds1)]
10 loops, best of 3: 153 ms per loop
バッチ単位でやる場合はどうなるのでしょうか?
x
の次元数はレコードごとに異なるので、Datasetをバッチ化してから map()
で decode_raw
する方法だと失敗します。
def dict2tuple(feature):
return tf.io.decode_raw(feature["x"], tf.float32), [feature["y"]]
ds2 = tf.data.TFRecordDataset(["random.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.FixedLenFeature([], dtype=tf.string),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})) \
.map(dict2tuple)
print(next(iter(ds2)))
# InvalidArgumentError: DecodeRaw requires input strings to all be the same size, but element 1 has size 4 != 8
だからといって unbatch()
してから decode_raw
するのでは、せっかくの高速化メリットが失われてしまいます。
ds2 = tf.data.TFRecordDataset(["random.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.FixedLenFeature([], dtype=tf.string),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})).unbatch().map(dict2tuple).padded_batch(512, ([None], [1]))
In [2]: %timeit [1 for _ in iter(ds2)]
10 loops, best of 3: 136 ms per loop
RaggedFeature
ここで救世主が登場します。
TensorFlow 2.1以降でしか使えないのですが、データの読み込み時に RaggedFeature
と呼ばれる新たな種類の特徴量を指定できるようになりました。
tf.io.RaggedFeature | TensorFlow Core v2.1.0
これを使うと、デコードされた特徴量が RaggedTensor
となります。普通の Tensor
は、行ごとに列の数が同じである必要がありますが、RaggedTensor
ではその必要がありません。行ごとに列の数が異なるような Tensor
を表現することができます。
tf.RaggedTensor | TensorFlow Core v2.1.0
まず、データの書き込み時には、可変長の特徴量を float32
のリストのまま使って Features
を作成します。
def feature_float_list(l):
return tf.train.Feature(float_list=tf.train.FloatList(value=l))
def record2example(r_x, r_y):
return tf.train.Example(features=tf.train.Features(feature={
"x": feature_float_list(r_x),
"y": feature_float_list(r_y)
}))
filename = "random2.tfrecords" # 先ほどと名前を変えました
with tf.io.TFRecordWriter(filename) as writer:
for i in range(1000):
r_x = np.random.random(i+1).astype("float32")
r_y = np.random.random(1)
ex = record2example(r_x, r_y)
writer.write(ex.SerializeToString())
読み込み時には、特徴量として RaggedFeature
を指定します。
ds2 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.RaggedFeature(tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
}))
ここで ds2
の各レコードが dict
になるのは固定長の場合と同じなのですが、x
が RaggedTensor
になる点が異なります。RaggedTensor
の各行をスライスすると、以下のようにサイズがバラバラの Tensor
が出てきます。
In [1]: next(iter(ds2))["x"][0]
Out[1]: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.8635351], dtype=float32)>
In [2]: next(iter(ds2))["x"][1]
Out[2]: <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0.66411597, 0.8526721 ], dtype=float32)>
In [3]: next(iter(ds2))["x"][2]
Out[3]: <tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.7902446 , 0.13108689, 0.05331135], dtype=float32)>
短い特徴量の末尾を0埋めして、バッチ単位で普通の Tensor
にすることができます。これで、レコード単位でデコードする場合と同じバッチが得られます。
def dict2tuple(feature):
return feature["x"].to_tensor(), [feature["y"]]
ds2 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.RaggedFeature(tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})).map(dict2tuple)
In [4]: %timeit [1 for _ in iter(ds2)]
100 loops, best of 3: 18.6 ms per loop
レコード単位で処理した場合の10分の1近くに短縮されました。素晴らしい!
VarLenFeature
実は、TensorFlow 1.x/2.0にも可変長の特徴量を読み込む方法があります。
特徴量の種類を VarLenFeature
とすると、SparseTensor
として特徴量を読むことができます。
TFRecordの作り方は RaggedFeature
と同じです。
def dict2tuple(feature):
return tf.sparse.to_dense(feature["x"]), [feature["y"]]
ds3 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
.batch(512) \
.apply(tf.data.experimental.parse_example_dataset({
"x": tf.io.VarLenFeature(tf.float32),
"y": tf.io.FixedLenFeature([], dtype=tf.float32)
})) \
.map(dict2tuple)
In [5]: %timeit [1 for _ in iter(ds3)]
10 loops, best of 3: 39.9 ms per loop
確かにレコード単位の場合よりは格段に速いのですが、RaggedFeature
と比べると遅いです。
できればTensorFlow 2.1以降で RaggedFeature
を使いたいですね。
まとめ
- TFRecordからの読み込みはバッチ単位でやりましょう。
- バッチ化した後に
parse_example_dataset()
を使って変換しましょう。この関数の戻り値を、Datasetのapply()
の引数に指定します。 - 可変長の特徴量は、TensorFlow 2.1以降なら
RaggedFeature
を指定して読み込みましょう。