TensorFlowの高レベルAPIの使用方法:Dataset APIを使ってみる with Keras

TensorFlowでKerasを使ってみようシリーズ第三弾。

「TensorFlowの高レベルAPIの使用方法:Kerasの使い方」

https://qiita.com/cometscome_phys/items/d9553fe7c92e09fc14a9

「TensorFlowの高レベルAPIを使ったBatch Normalizationの実装:Keras版」

https://qiita.com/cometscome_phys/items/47467f1f0e3ab0a695cb

今回は以前の記事で取り扱った

Dataset API

「TensorFlowの高レベルAPIの使用方法2:Dataset APIを使ってみる」

https://qiita.com/cometscome_phys/items/a95a91f9822353303dd8

をKerasで取り扱ってみる。


バージョン

TensorFlow: 1.12.0

Keras: 2.1.6-tf


再現すべき関数

ここでは、ある関数

$$

y = a_0 x+ a_1 x^2 + b_0 + 3cos(20x)

$$

という関数を考える。ここで、最後のcosはノイズのようなものとして考えており、$a_0$と$a_1$と$b_0$によって得られる二次関数を得ることが目的となる。

データを300点作っておく。


test.py

import tensorflow as tf

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

n = 300
x0 = np.linspace(-2.0, 2.0, n)
a0 = 3.0
a1 = 2.0
b0 = 1.0
y0 = np.zeros(n)
y0[:] = a0*x0+a1*x0**2 + b0 + 3*np.cos(20*x0)

plt.plot(x0,y0 )
plt.show()
plt.savefig("graph.png")


この時のグラフは以下の通りである。

image.png

上のデータをフィッティングする際には、

$$

y = \sum_{k=0}^{k_{\rm max}} a_k x^k + b_0

$$

という形を考える。ここでは、$x^k$を基底関数として線形回帰をしていることになる。

詳しくは、

JuliaでTensorFlow その4: 線形基底関数を用いた回帰

https://qiita.com/cometscome_phys/items/92dba9f82cd58d877ec5

を参照。


インプットデータの生成

ここはこれまでの記事と同じである。

多項式をインプットデータとするので、


test.py

def make_phi(x0,n,k):    

phi = np.array([x0**j for j in range(k)])
return phi.T

という形でインプットデータを生成することにする。ここで$n$はデータの数である。

最大の次数を$3$とし、データの範囲は-2から2とすると


test.py

x0 = np.linspace(-2.0, 2.0, n)

k = 4
phi = make_phi(x0,n,k)

となる。

その時の教師データはy0である。

また、学習には使わない「テストデータ」を100点用意し、


test.py

n_test = 100

x0_test = np.linspace(-2.0, 2.0, n_test)
y0_test = np.zeros(n_test)
y0_test[:] = a0*x0_test+a1*x0_test**2 + b0 + 3*np.cos(20*x0_test)
phi_test = make_phi(x0_test,n_test,k)

としておく。


Dataset APIの使用

さて、Dataset APIを使ってみよう。

https://www.tensorflow.org/guide/keras

を参考にした。


test.py

batch_size = 20

dataset = tf.data.Dataset.from_tensor_slices(phi, y0).shuffle(10).batch(batch_size)
dataset = dataset.repeat()

となる。tf.data.Dataset.from_tensor_slicesはnumpy arrayからDataset フォーマットに変換するものである。

ここで、shuffle(10)の10はバッファーサイズらしいが、この数字の具体的な意味についてはわからなかった。最後のbatch(batch_size)というのは、データをbatch_size分束ねることを意味している。

同様に、テストデータに対しても、


test.py

test_dataset = tf.data.Dataset.from_tensor_slices(phi_test, y0_test).batch(n_test)

test_dataset = test_dataset.repeat()

としておく。テストデータはランダムに取る必要がなく、誤差評価には全体を使いたいのでn_testをバッチサイズとしている。

KerasではDataset API関連は終わりである。


モデル

あとはモデルを


test.py

import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import optimizers

def build_model(d_input,d_middle):
inputs = tf.keras.Input(shape=(d_input,)) #インプットの次元を指定
x = layers.Dense(d_middle, activation='relu')(inputs) #中間層の指定
y = layers.Dense(1)(x) #最終層の指定
adam = optimizers.Adam() #最適化にはAdamを使用
model = tf.keras.Model(inputs=inputs, outputs=y) #モデルのインプットとアウトプットを指定

model.compile(optimizer=adam,
loss='mean_squared_error') #modelの構築。平均二乗誤差をloss関数とした。

return model


として、


test.py

d_input = k

d_middle = 10
model = build_model(d_input,d_middle)

とする。


学習

学習を行うコードは、model.fitを呼び出すだけで済む。

今回は収束したら途中で止める機能も実装してみよう。

(参考:https://qiita.com/yukiB/items/f45f0f71bc9739830002)

途中で止めるには、


test.py

es_cb = callbacks.EarlyStopping(monitor='val_loss', patience=0, verbose=0, mode='auto')


を定義して、


test.py

history = model.fit(dataset, epochs=1000, steps_per_epoch=1,validation_data=test_dataset,

validation_steps=1,callbacks=[es_cb])

とすればよい。

Dataset APIを使っている場合には、model.fitに定義したdatasetを入れれば良い。numpy配列をそのまま入れるのと異なり、全体のデータ数がわからない場合があるので、steps_per_epochに何回呼べば全てのデータを見たことになるのか、を指定しておく。validation_stepsはテストデータに関連するものである。

これで、学習が実行できて、

結果をプロット


test.py

ytest = model.predict(phi_test)

plt.plot(x0_test ,y0_test )
plt.plot(x0_test ,ytest,'o')
plt.show()
plt.savefig("graph.png")

plt.yscale("log")
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['test'], loc='upper left')
plt.show()
plt.savefig("test.png")


すると、

image.png

image.png

となる。

前のDataset APIの記事と比べて拍子抜けするほど簡単で驚いた。


追記

複数のインプットの時には、


test.py

dataset_train = tf.data.Dataset.from_tensor_slices(((input_1,input_2,input_3),output_1)).shuffle(10).batch(100)


のようにして、モデルを作る時に


test.py

input_data_1 = tf.keras.Input(shape=(d_input_1,))

input_data_2 = tf.keras.Input(shape=(d_input_2,))
input_data_3 = tf.keras.Input(shape=(d_input_3,))
model = tf.keras.Model(inputs=[input_data_1,input_data_2,input_data_3], outputs=y)

として、学習の時には、


test.py

history = model.fit(dataset_train,epochs=1000, steps_per_epoch=80)


みたいにすればよいようだ。