TensorFlow: model.save()ができなかった
TensorFlowでVAE(Variational Auto Encoder)を作成したところ、動かすことには成功したもののモデルが保存できないという事態に。
モデル
公式のtutorialConvolutional Variational Autoencoder、およびサブクラス化による新しいレイヤーとモデルの作成を参考にしてMNISTデータ用のVAEを作成した。(モデルの構造は適当に変えてます)
環境はGoogle Colabで、TensorFlowのversionは2.3.0。
#tf.__version__ = 2.3.0
import tensorflow as tf
from tensorflow.keras import layers
#潜在変数のsampling
class Sampling(layers.Layer):
def call(self,inputs):
"""
inputs:(batch,hidden_dim)のタプル
"""
mean,logvar = inputs
eps = tf.random.normal(mean.shape)
return mean + eps*tf.exp(0.5*logvar)
#encoder
class Encoder(layers.Layer):
def __init__(self,hidden_dim=32,mid_dim=3,name='encoder',**kwargs):
super(Encoder,self).__init__(name=name,**kwargs)
self.conv1 = layers.Conv2D(filters=mid_dim,kernel_size=3,activation='relu')
self.pool = layers.MaxPool2D(2)
self.flatten = layers.Flatten()
self.dense_mean = layers.Dense(units=hidden_dim)
self.dense_sigma = layers.Dense(units=hidden_dim)
self.sample = Sampling() #自分でサブクラス化したlayer
def call(self,inputs):
x = self.conv1(inputs)
x = self.pool(x)
x = self.flatten(x)
mean = self.dense_mean(x)
var = self.dense_sigma(x)
z_sample = self.sample((mean,var))
return mean,var,z_sample
#Decoder
class Decoder(layers.Layer):
def __init__(self,original_dim=28*28,mid_dim=200,name="decoder",**kwargs):
super(Decoder,self).__init__(name=name,**kwargs)
self.dense1 = layers.Dense(mid_dim,activation='relu')
self.out = layers.Dense(original_dim)
def call(self,inputs,apply_sigmoid=False):
x = self.dense1(inputs)
out = self.out(x)
if apply_sigmoid: #訓練時は使わない。
out = tf.nn.sigmoid(out)
return out
#VAE classを定義する
class VAE(tf.keras.Model):
def __init__(self,hidden_dim,mid_dim_enc=3,mid_dim_dec=200,original_dim=28*28,name="VAE",**kwargs):
super(VAE,self).__init__(name=name,**kwargs)
self.encoder = Encoder(hidden_dim=hidden_dim,mid_dim=mid_dim_enc)
self.decoder = Decoder(original_dim=original_dim,mid_dim=mid_dim_dec)
def call(self,inputs,apply_sigmoid=False):
mean,logvar,z_sample = self.encoder(inputs)
output = self.decoder(z_sample,apply_sigmoid=apply_sigmoid)
#損失計算のためmeanとlogvarも出力する
return output,mean,logvar
結構長くて読みづらいコードになってしまったが、まとめると
・Encoder Layerで元画像(shape=(batch,28,28,1))を受け取り、潜在変数の事後分布パラメータmean,logvarを出力する
・Sampling Layerで、平均値mean,分散exp(logvar)の正規分布から潜在変数をサンプリングする(shape=(batch,hidden_dim))
・Decoder Layerで、サンプリングされた潜在変数を元に画像を再構成する(shape=(batch,28*28))
という流れ。(EncoderとDecoderの構造が全く対称になっていないがまあそこは適当ということで...)VAEについてはAuto-Encoding Variational Bayesを読んで理解しました。
損失関数定義
続いてVAEの損失関数を定義する。この形については上の論文を参照。
#metrics,optimizerを定義
train_loss = tf.keras.metrics.Mean()
test_loss = tf.keras.metrics.Mean()
optimizer = tf.keras.optimizers.Adam()
#損失関数を定義
def compute_loss(model,x):
out,mean,logvar = model(x)
out = tf.reshape(out,x.shape) #(batch,28,28,1) に変換
cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(labels=x,logits=out) #ベルヌーイ分布の対数損失
kl_loss = -0.5*tf.reduce_sum(1+logvar - mean**2 - tf.exp(logvar),axis=1) #KL divergence
loss_ent = tf.reduce_sum(cross_ent,axis=[1,2,3])
return tf.reduce_mean(kl_loss+loss_ent)
#訓練
@tf.function
def train_step(model,images,optimizer):
with tf.GradientTape() as tape:
loss = compute_loss(model,images)
gradient = tape.gradient(loss,model.trainable_variables)
optimizer.apply_gradients(zip(gradient,model.trainable_variables))
train_loss(loss)
学習、評価
道具は出揃ったのでいよいよ学習させる。(今回の記事は学習が目的ではないので、テストデータによる評価はしていません)
#data set
train_ds = tf.data.Dataset.from_tensor_slices(x_train).shuffle(60000).batch(128)
#潜在変数の次元は10
model = VAE(hidden_dim=10)
epochs = 2 #2エポックだけ学習する
for epoch in range(epochs):
for images in train_ds:
train_step(model,images,optimizer)
print("Epoch:{} Loss:{}".format(epoch+1,train_loss.result()))
2エポックしか学習していないのでぼやけた感じだが、ある程度いい感じに再構成できていると思われる。
モデルの保存
随分前置きが長くなってしまったが、ここからが本記事の主題。以上で学習させたモデルをmodel.save(filepath)で保存したい。
model.save('try')
すると...
ValueError: Cannot convert a partially known TensorShape to a Tensor: (None, 10)
となって保存できず。色々と見てみたところ、どうやらSamplingの部分が原因のよう。
解決策1.model.save_weightsを使う
多分これが一番簡単な策。model.save_weigthts(filepath)とすればそのモデルのパラメータを保存できるので、あとは新しくモデルを作成してloadすればいい。
model.save_weights('try') #これはうまく行く
model_new = VAE(hidden_dim=10) #新規モデル作成
model_new.load_weights('try') #学習済みパラメータ読み込み
学習済みのモデルを再利用するだけならこれでいいのでは?という感じがする。
解決策2. Sampling Layerの変更
各レイヤの設計はサブクラス化による新しいレイヤーとモデルの作成を参考にしたが、一か所Sampling Layerの中で変更していた点があった。
#チュートリアルの場合
class Sampling(layers.Layer):
def call(self,inputs):
"""
inputs:(batch,hidden_dim)のタプル
"""
mean,logvar = inputs
batch = tf.shape(mean)[0]
hid = tf.shape(mean)[1]
eps = tf.random.normal(shape=(batch,hid))
return mean + eps*tf.exp(0.5*logvar)
#自分で設計した方
class Sampling(layers.Layer):
def call(self,inputs):
"""
inputs:(batch,hidden_dim)のタプル
"""
mean,logvar = inputs
eps = tf.random.normal(shape=mean.shape)
return mean + eps*tf.exp(0.5*logvar)
チュートリアルではまず目標のテンソルのサイズをbatch,hid という変数に格納してからサンプリングを行っているのに対し、自分はいきなりtf.random.normal(shape=mean.shape)でサンプリングしようとした。実はこの部分が問題であり、チュートリアル通りに変更したところmodel.save()ができるようにできるようになった。
わざわざサイズを取得する必要ないだろ、と思って変えてしまったのが仇となったというわけでした。公式は正義。
よくわからないが、計算グラフの中で新しくテンソルを作る際にはサイズを事前に確保しておかないとダメだということだろうか。
まとめ
とりあえずsave()がうまく行かなかったらsave_weights()をすればモデルの再利用はできるのでそれでいい気がする。前者の保存法の方が後者の保存法より優れている点・望ましいケースって何かあるんでしょうか。