TensorFlowでKerasを使ってみようシリーズ第三弾。
「TensorFlowの高レベルAPIの使用方法:Kerasの使い方」
https://qiita.com/cometscome_phys/items/d9553fe7c92e09fc14a9
「TensorFlowの高レベルAPIを使ったBatch Normalizationの実装:Keras版」
https://qiita.com/cometscome_phys/items/47467f1f0e3ab0a695cb
今回は以前の記事で取り扱った
Dataset API
「TensorFlowの高レベルAPIの使用方法2:Dataset APIを使ってみる」
https://qiita.com/cometscome_phys/items/a95a91f9822353303dd8
をKerasで取り扱ってみる。
バージョン
TensorFlow: 1.12.0
Keras: 2.1.6-tf
#再現すべき関数
ここでは、ある関数
$$
y = a_0 x+ a_1 x^2 + b_0 + 3cos(20x)
$$
という関数を考える。ここで、最後のcosはノイズのようなものとして考えており、$a_0$と$a_1$と$b_0$によって得られる二次関数を得ることが目的となる。
データを300点作っておく。
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
n = 300
x0 = np.linspace(-2.0, 2.0, n)
a0 = 3.0
a1 = 2.0
b0 = 1.0
y0 = np.zeros(n)
y0[:] = a0*x0+a1*x0**2 + b0 + 3*np.cos(20*x0)
plt.plot(x0,y0 )
plt.show()
plt.savefig("graph.png")
この時のグラフは以下の通りである。
上のデータをフィッティングする際には、
$$
y = \sum_{k=0}^{k_{\rm max}} a_k x^k + b_0
$$
という形を考える。ここでは、$x^k$を基底関数として線形回帰をしていることになる。
詳しくは、
JuliaでTensorFlow その4: 線形基底関数を用いた回帰
https://qiita.com/cometscome_phys/items/92dba9f82cd58d877ec5
を参照。
インプットデータの生成
ここはこれまでの記事と同じである。
多項式をインプットデータとするので、
def make_phi(x0,n,k):
phi = np.array([x0**j for j in range(k)])
return phi.T
という形でインプットデータを生成することにする。ここで$n$はデータの数である。
最大の次数を$3$とし、データの範囲は-2から2とすると
x0 = np.linspace(-2.0, 2.0, n)
k = 4
phi = make_phi(x0,n,k)
となる。
その時の教師データはy0である。
また、学習には使わない「テストデータ」を100点用意し、
n_test = 100
x0_test = np.linspace(-2.0, 2.0, n_test)
y0_test = np.zeros(n_test)
y0_test[:] = a0*x0_test+a1*x0_test**2 + b0 + 3*np.cos(20*x0_test)
phi_test = make_phi(x0_test,n_test,k)
としておく。
Dataset APIの使用
さて、Dataset APIを使ってみよう。
https://www.tensorflow.org/guide/keras
を参考にした。
batch_size = 20
dataset = tf.data.Dataset.from_tensor_slices(phi, y0).shuffle(10).batch(batch_size)
dataset = dataset.repeat()
となる。tf.data.Dataset.from_tensor_slices
はnumpy arrayからDataset フォーマットに変換するものである。
ここで、shuffle(10)
の10はバッファーサイズらしいが、この数字の具体的な意味についてはわからなかった。最後のbatch(batch_size)というのは、データをbatch_size分束ねることを意味している。
同様に、テストデータに対しても、
test_dataset = tf.data.Dataset.from_tensor_slices(phi_test, y0_test).batch(n_test)
test_dataset = test_dataset.repeat()
としておく。テストデータはランダムに取る必要がなく、誤差評価には全体を使いたいのでn_testをバッチサイズとしている。
KerasではDataset API関連は終わりである。
モデル
あとはモデルを
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import optimizers
def build_model(d_input,d_middle):
inputs = tf.keras.Input(shape=(d_input,)) #インプットの次元を指定
x = layers.Dense(d_middle, activation='relu')(inputs) #中間層の指定
y = layers.Dense(1)(x) #最終層の指定
adam = optimizers.Adam() #最適化にはAdamを使用
model = tf.keras.Model(inputs=inputs, outputs=y) #モデルのインプットとアウトプットを指定
model.compile(optimizer=adam,
loss='mean_squared_error') #modelの構築。平均二乗誤差をloss関数とした。
return model
として、
d_input = k
d_middle = 10
model = build_model(d_input,d_middle)
とする。
学習
学習を行うコードは、model.fit
を呼び出すだけで済む。
今回は収束したら途中で止める機能も実装してみよう。
(参考:https://qiita.com/yukiB/items/f45f0f71bc9739830002)
途中で止めるには、
es_cb = callbacks.EarlyStopping(monitor='val_loss', patience=0, verbose=0, mode='auto')
を定義して、
history = model.fit(dataset, epochs=1000, steps_per_epoch=1,validation_data=test_dataset,
validation_steps=1,callbacks=[es_cb])
とすればよい。
Dataset APIを使っている場合には、model.fit
に定義したdatasetを入れれば良い。numpy配列をそのまま入れるのと異なり、全体のデータ数がわからない場合があるので、steps_per_epoch
に何回呼べば全てのデータを見たことになるのか、を指定しておく。validation_steps
はテストデータに関連するものである。
これで、学習が実行できて、
結果をプロット
ytest = model.predict(phi_test)
plt.plot(x0_test ,y0_test )
plt.plot(x0_test ,ytest,'o')
plt.show()
plt.savefig("graph.png")
plt.yscale("log")
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['test'], loc='upper left')
plt.show()
plt.savefig("test.png")
すると、
となる。
前のDataset APIの記事と比べて拍子抜けするほど簡単で驚いた。
追記
複数のインプットの時には、
dataset_train = tf.data.Dataset.from_tensor_slices(((input_1,input_2,input_3),output_1)).shuffle(10).batch(100)
のようにして、モデルを作る時に
input_data_1 = tf.keras.Input(shape=(d_input_1,))
input_data_2 = tf.keras.Input(shape=(d_input_2,))
input_data_3 = tf.keras.Input(shape=(d_input_3,))
model = tf.keras.Model(inputs=[input_data_1,input_data_2,input_data_3], outputs=y)
として、学習の時には、
history = model.fit(dataset_train,epochs=1000, steps_per_epoch=80)
みたいにすればよいようだ。