27
12

More than 3 years have passed since last update.

[TensorFlow 2] TFRecordからの特徴量読み込みはバッチ単位でやるのがオススメ

Last updated at Posted at 2020-03-12

はじめに

TensorFlowで大量のデータを学習させるときには、Dataset APIを使ってTFRecordに保存した特徴量を読み込むようにすると便利です。
[TensorFlow 2.x対応版] TensorFlow (Keras) で TFRecord & DataSetを使って大量のデータを学習させる方法 - Qiita

検索するとサンプルコードがたくさん見つかりますが、実は読み込み方を工夫することで、よく見る方法よりも格段に速く読み込めるようになる可能性があることが分かりました。

検証環境

  • Ubuntu 18.04
  • Python 3.6.9
  • TensorFlow 2.1.0 (CPU使用)

よく紹介されている読み込み方法

上の記事の他、公式ドキュメントや他のサイトでよく紹介されている読み込み方として、tf.io.parse_single_example() を使う方法があります。
TFRecords と tf.Example の使用法 | TensorFlow Core

import tensorflow as tf
import numpy as np

feature_dim = 784
def parse_example(example):
    features = tf.io.parse_single_example(
        example,
        features={
            "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
            "y": tf.io.FixedLenFeature([], dtype=tf.float32)
        })
    x = features["x"]
    y = features["y"]
    return x, y

ds1 = tf.data.TFRecordDataset(["test.tfrecords"]).map(parse_example).batch(512)
print(ds1)
print(next(iter(ds1)))

こんな感じで、Datasetに対し各レコードを特徴量に変換する処理を map() で入れてやります。
おそらく最もメジャーな使い方ではないでしょうか。

でも、どうも処理が遅い気がするんですよね…。GPUで学習していても、GPU使用率が100%近くに張り付いているわけでもないのに、CPUの使用率がいまいち伸びていません。I/Oがボトルネックになっている気がします。

バッチ単位で読み込めないの?

公式ドキュメントを見ると、Datasetを変換するときの一般論として

Invoking a user-defined function passed into the map transformation has overhead related to scheduling and executing the user-defined function. We recommend vectorizing the user-defined function (that is, have it operate over a batch of inputs at once) and apply the batch transformation before the map transformation.

と書いてあります。
Better performance with the tf.data API | TensorFlow Core

要するに「ユーザ定義関数を使った map() はバッチ単位でやるのがオススメ」とのことです。
それなら、データの読み込み・デコードもバッチ単位でできたらパフォーマンスが上がるのでしょうか?

全く日本語の資料が出てきませんでしたが、tf.data.experimental.parse_example_dataset() を使うことで、バッチ単位で特徴量のデコードができるみたいです。1
以下のように、デコード処理はバッチ化した後に入ります。

feature_dim = 784
ds2 = tf.data.TFRecordDataset(["test.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          }))
print(ds2)
print(next(iter(ds2)))

各レコードは dict 形式で返ってくるので、keras.Model.fit() で学習するときには別途タプルに変換しないといけません。レコード単位のときには parse_example() の中でタプルへの変換まで一気に書けますが、こちらでは別途 map() でタプルへの変換処理を追加する必要があります。

パフォーマンス比較

実際にやってみました。
MNISTのテストデータ10000件を書き込んで、それを読み出す部分の処理時間を測ります。
今回は学習までは試しませんが、このあと学習に使うことを前提とするので、バッチ単位の場合はレコードをタプルに変換する処理も含めます。

まずはデータをTFRecordファイルに書き込みます。

data2tfrecord.py
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r_x, r_y):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_float_list(r_x),
        "y": feature_float_list(r_y)
    }))

filename_test  = "test.tfrecords"

# MNISTの評価データ10000件を書き込む
_, (x_test, y_test) = mnist.load_data()
print("x_test    : ", x_test.shape)  # x_test    :  (10000, 28, 28)
print("y_test    : ", y_test.shape)  # y_test    :  (10000,)
x_test  = x_test.reshape((-1, 28*28)).astype("float32") / 255.0
y_test  = y_test.reshape((-1, 1)).astype("float32")
with tf.io.TFRecordWriter(filename_test) as writer:
    for r_x, r_y in zip(x_test, y_test):
        ex = record2example(r_x, r_y)
        writer.write(ex.SerializeToString())

続いて、これを2種類の方法で読み込みます。

read_tfrecord.py
import tensorflow as tf
import numpy as np

feature_dim = 784

def parse_example(example):
    features = tf.io.parse_single_example(example, features={
        "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
        "y": tf.io.FixedLenFeature([], dtype=tf.float32)
    })
    x = features["x"]
    y = features["y"]
    return x, y

ds1 = tf.data.TFRecordDataset(["test.tfrecords"]).map(parse_example).batch(512)
print(ds1) # <BatchDataset shapes: ((None, 784), (None,)), types: (tf.float32, tf.float32)>

def dict2tuple(feat):
    return feat["x"], feat["y"]

ds2 = tf.data.TFRecordDataset(["test.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })) \
          .map(dict2tuple)
print(ds2) # <MapDataset shapes: ((None, 784), (None,)), types: (tf.float32, tf.float32)>

ds1ds2 は、作り方こそ違いますが、最終的には全く同じデータになっていることに注意してください。バッチサイズも、返ってくるデータも同じになります。

ipython -i read_tfrecord.py で対話シェルを起動して、10000件をすべてデコードするのに必要な処理時間を測ってみます。

ipython
In [1]: %timeit [1 for _ in iter(ds1)]
1 loop, best of 3: 1.4 s per loop

In [2]: %timeit [1 for _ in iter(ds2)]
10 loops, best of 3: 56.3 ms per loop

バッチ単位で読み込む方法の圧勝ですね…!

もっと速い方法 (2020/07/21追記)

tf.io.parse_single_example() のバッチ版として tf.io.parse_example() があり、使い所が分からずにいましたが、ようやく使えたのでメモします。
そして、普通にこちらを使ったほうが速い模様です。

以下の実行結果はTensorFlow 2.2.0のものです。計算時間は改めて測定しています。

def parse_batch_example(example): 
    features = tf.io.parse_example(example, features={ 
        "x": tf.io.FixedLenFeature([feature_dim], dtype=tf.float32), 
        "y": tf.io.FixedLenFeature([], dtype=tf.float32) 
    }) 
    x = features["x"] 
    y = features["y"] 
    return x, y 

ds3 = tf.data.TFRecordDataset(["test.tfrecords"]) \ 
          .batch(512) \ 
          .map(parse_batch_example)

%timeit [1 for _ in iter(ds1)]                                                                                                                
# 1.68 s ± 46.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit [1 for _ in iter(ds2)]                                                                                                                
# 71.8 ms ± 7.99 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit [1 for _ in iter(ds3)]
# 56.3 ms ± 1.73 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

一回パースした後に map() をもう一回実行するよりは、map() 一発で済む方法のほうが直感的にも速そうではあります。
この記事の他の例も、おそらく tf.io.parse_example() で書くともう少し速くなるのではと思います。

特徴量が可変長だったらどうする?

先の例では、x は固定の長さ(784次元)を持っていたのですが、これが可変長(レコードによって変わる)になるとちょっと面倒です。
一般的には、可変長のデータはシリアライズして tf.string として扱う方法がメジャーのようです。

data2tfrecord_var.py
import numpy as np
import tensorflow as tf

def feature_bytes_list(l):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=l))

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r_x, r_y):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_bytes_list(r_x),
        "y": feature_float_list(r_y)
    }))

filename  = "random.tfrecords"
# 可変長のデータ1000件を書き込む
with tf.io.TFRecordWriter(filename) as writer:
    for i in range(1000):
        r_x = np.random.random(i+1).astype("float32")
        r_y = np.random.random(1)
        ex = record2example([r_x.tostring()], r_y)
        writer.write(ex.SerializeToString())

レコード単位でデコードする場合、以下のように読み込みます。

read_tfrecord_var.py
import tensorflow as tf
import numpy as np

def parse_example(example):
    features = tf.io.parse_single_example(
        example,
        features={
            "x": tf.io.FixedLenFeature([], dtype=tf.string),
            "y": tf.io.FixedLenFeature([], dtype=tf.float32)
        })
    x = tf.io.decode_raw(features["x"], tf.float32)
    y = [features["y"]]
    return x, y

ds1 = tf.data.TFRecordDataset(["random.tfrecords"]).map(parse_example).padded_batch(512, ([None], [1]))
print(ds1) # <PaddedBatchDataset shapes: ((None, None), (None, 1)), types: (tf.float32, tf.float32)>

バッチ単位では x の列の数は一番長い特徴量に合わせられ、長さの足りない部分は0埋めされます。

ipython
In [1]: %timeit [1 for _ in iter(ds1)]
10 loops, best of 3: 153 ms per loop

バッチ単位でやる場合はどうなるのでしょうか?
x の次元数はレコードごとに異なるので、Datasetをバッチ化してから map()decode_raw する方法だと失敗します。

def dict2tuple(feature):
    return tf.io.decode_raw(feature["x"], tf.float32), [feature["y"]]

ds2 = tf.data.TFRecordDataset(["random.tfrecords"]) \
           .batch(512) \
           .apply(tf.data.experimental.parse_example_dataset({
               "x": tf.io.FixedLenFeature([], dtype=tf.string),
               "y": tf.io.FixedLenFeature([], dtype=tf.float32)
           })) \
           .map(dict2tuple)

print(next(iter(ds2)))
# InvalidArgumentError: DecodeRaw requires input strings to all be the same size, but element 1 has size 4 != 8

だからといって unbatch() してから decode_raw するのでは、せっかくの高速化メリットが失われてしまいます。

ds2 = tf.data.TFRecordDataset(["random.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.FixedLenFeature([], dtype=tf.string),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })).unbatch().map(dict2tuple).padded_batch(512, ([None], [1]))
ipython
In [2]: %timeit [1 for _ in iter(ds2)]
10 loops, best of 3: 136 ms per loop

RaggedFeature

ここで救世主が登場します。
TensorFlow 2.1以降でしか使えないのですが、データの読み込み時に RaggedFeature と呼ばれる新たな種類の特徴量を指定できるようになりました。
tf.io.RaggedFeature | TensorFlow Core v2.1.0

これを使うと、デコードされた特徴量が RaggedTensor となります。普通の Tensor は、行ごとに列の数が同じである必要がありますが、RaggedTensor ではその必要がありません。行ごとに列の数が異なるような Tensor を表現することができます。
tf.RaggedTensor | TensorFlow Core v2.1.0

まず、データの書き込み時には、可変長の特徴量を float32 のリストのまま使って Features を作成します。

def feature_float_list(l):
    return tf.train.Feature(float_list=tf.train.FloatList(value=l))

def record2example(r_x, r_y):
    return tf.train.Example(features=tf.train.Features(feature={
        "x": feature_float_list(r_x),
        "y": feature_float_list(r_y)
    }))

filename = "random2.tfrecords" # 先ほどと名前を変えました
with tf.io.TFRecordWriter(filename) as writer:
    for i in range(1000):
        r_x = np.random.random(i+1).astype("float32")
        r_y = np.random.random(1)
        ex = record2example(r_x, r_y)
        writer.write(ex.SerializeToString())

読み込み時には、特徴量として RaggedFeature を指定します。

ds2 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.RaggedFeature(tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          }))

ここで ds2 の各レコードが dict になるのは固定長の場合と同じなのですが、xRaggedTensor になる点が異なります。RaggedTensor の各行をスライスすると、以下のようにサイズがバラバラの Tensor が出てきます。

ipython
In [1]: next(iter(ds2))["x"][0]
Out[1]: <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.8635351], dtype=float32)>

In [2]: next(iter(ds2))["x"][1]
Out[2]: <tf.Tensor: shape=(2,), dtype=float32, numpy=array([0.66411597, 0.8526721 ], dtype=float32)>

In [3]: next(iter(ds2))["x"][2]
Out[3]: <tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.7902446 , 0.13108689, 0.05331135], dtype=float32)>

短い特徴量の末尾を0埋めして、バッチ単位で普通の Tensor にすることができます。これで、レコード単位でデコードする場合と同じバッチが得られます。

def dict2tuple(feature):
    return feature["x"].to_tensor(), [feature["y"]]

ds2 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.RaggedFeature(tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })).map(dict2tuple)
ipython
In [4]: %timeit [1 for _ in iter(ds2)]
100 loops, best of 3: 18.6 ms per loop

レコード単位で処理した場合の10分の1近くに短縮されました。素晴らしい!

VarLenFeature

実は、TensorFlow 1.x/2.0にも可変長の特徴量を読み込む方法があります。
特徴量の種類を VarLenFeature とすると、SparseTensor として特徴量を読むことができます。
TFRecordの作り方は RaggedFeature と同じです。

def dict2tuple(feature):
    return tf.sparse.to_dense(feature["x"]), [feature["y"]]

ds3 = tf.data.TFRecordDataset(["random2.tfrecords"]) \
          .batch(512) \
          .apply(tf.data.experimental.parse_example_dataset({
              "x": tf.io.VarLenFeature(tf.float32),
              "y": tf.io.FixedLenFeature([], dtype=tf.float32)
          })) \
          .map(dict2tuple)
ipython
In [5]: %timeit [1 for _ in iter(ds3)]
10 loops, best of 3: 39.9 ms per loop

確かにレコード単位の場合よりは格段に速いのですが、RaggedFeature と比べると遅いです。
できればTensorFlow 2.1以降で RaggedFeature を使いたいですね。

まとめ

  • TFRecordからの読み込みはバッチ単位でやりましょう。
  • バッチ化した後に parse_example_dataset() を使って変換しましょう。この関数の戻り値を、Datasetの apply() の引数に指定します。
  • 可変長の特徴量は、TensorFlow 2.1以降なら RaggedFeature を指定して読み込みましょう。

  1. tf.io.parse_example() というのもあり、サンプルコードも見つけましたが、どうも1.x系(0.x系?)の名残らしく、うまく使えませんでした。(TFRecordReader を使おうとすると、Eager Executionでは使えないと怒られました)(2020/07/21) Dataset.map() の中ならばEager Executionではないので使えることが判明しました。 

27
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
27
12