#概要
業務で異常検知手法を調査していた時にEfficientGANという手法を見つけたのですが、著者のソースコードはライブラリのバージョンが記載なく実行に手こずったので、勉強も兼ねてkerasで実装してみました。
なお、実装したのはテーブルデータ用のネットワークのみで、推論時のロス計算の内”feature-matching loss”は未実装です。
※EfficientGANについてはこの記事で解説しませんが、分かりやすい解説記事が有りましたので、以下に記載しておきます。
ソースコード:https://github.com/asm94/EfficientGAN
↓参考にしたもの
元論文 :https://arxiv.org/abs/1802.06222
著者のソースコード:https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection
解説記事 :https://qiita.com/masataka46/items/49dba2790fa59c29126b
#実行環境
・Windows10 64bit
・Python 3.8.3
・numpy 1.18.5
・tensorflow 2.3.1
・scikit-learn 0.22.2
#実装
##1.全体構成(クラス)
今回、EfficientGANのネットワークと学習および推論機能を、一つのクラスとして定義しました。
全体像は以下の通りです。個々の関数は後述します。
class EfficientGAN(object):
def __init__(self, input_dim=0, latent_dim=32):
self.input_dim = int(input_dim)
self.latent_dim = int(latent_dim)
#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
verbose=1):
#後述
#Test model
def predict(self, X_test, weight=0.9, degree=1):
#後述
##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
#後述
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
#後述
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
#後述
##2.ネットワーク
論文を参考に以下の様に実装しました。
・”input_dim”は、論文では使用したデータの次元数である121になっていますが、可変設定出来るように変更しています。
・Discriminatorの出力層の活性化関数は、論文ではlinear(線形)ですが、著者のソースコードを見ると、ロス計算時にシグモイド関数で変換しているため、今回はネットワークの方に組み込みました。
##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
inputs = Input(shape=(self.input_dim,), name='input')
net = inputs
net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_1')(net)
outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
name='output')(net)
return Model(inputs=inputs, outputs=outputs, name='Encoder')
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
inputs = Input(shape=(self.latent_dim,), name='input')
net = inputs
net = Dense(64, activation='relu', kernel_initializer=initializer,
name='layer_1')(net)
net = Dense(128, activation='relu', kernel_initializer=initializer,
name='layer_2')(net)
outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
name='output')(net)
return Model(inputs=inputs, outputs=outputs, name='Generator')
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
#D(x)
inputs1 = Input(shape=(self.input_dim,), name='real')
net = inputs1
net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_1')(net)
dx = Dropout(.2)(net)
#D(z)
inputs2 = Input(shape=(self.latent_dim,), name='noise')
net = inputs2
net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_2')(net)
dz = Dropout(.2)(net)
#D(x) と D(z) を結合
conet = Concatenate(axis=1)([dx,dz])
#D(x,z)
conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_3')(conet)
conet = Dropout(.2)(conet)
outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
name='output')(conet)
return Model(inputs=[inputs1,inputs2], outputs=outputs, name='Discriminator')
##3.モデル学習
論文を参考に以下の様に実装しました。
・著者のソースコードでは、ロス計算時の直前にシグモイド関数で変換していますが、項2に記載の通りシグモイド関数による変換はネットワークに組み込んだので、ここでは変換しません。
・Discriminator等の各部分モデルは、EfficientGANクラスの定義時では無く学習時に定義され、また、入力の次元数が未定義なら、このタイミングで学習データの次元数が入力の次元数に設定されます。
#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
verbose=1):
#学習データをnumpy型に変換
X_train = np.array(X_train)
#"input_dim"が1以上で無いなら(未定義想定)、学習データの次元数を設定
if not self.input_dim >= 1: self.input_dim = X_train.shape[1]
#Discriminatorモデル定義
self.dis = self.get_discriminator()
self.dis.compile(loss=loss, optimizer=optimizer)
#Encoder学習用のモデル定義(Encoder → Discriminator)
self.enc = self.get_encoder()
x = Input(shape=(self.input_dim,))
z_gen = self.enc(x)
valid = self.dis([x, z_gen])
enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
enc_dis.compile(loss=loss, optimizer=optimizer)
#Generator学習用のモデル定義(Generator → Discriminator)
self.gen = self.get_generator()
z = Input(shape=(self.latent_dim,))
x_gen = self.gen(z)
valid = self.dis([x_gen, z])
gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
gen_dis.compile(loss=loss, optimizer=optimizer)
#Training
min_val_loss = float('inf')
stop_count = 0
for i in range(epochs):
#Discriminatorを学習機能をオンに
self.dis.trainable = True
#学習データから、"batch_size"の半数をランダムに取得
idx = np.random.randint(0, X_train.shape[0], batch_size//2)
real_data = X_train[idx]
#"batch_size"の半数だけノイズを生成し、各生成ノイズからデータ生成
noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
gen_data = self.gen.predict(noise)
#取得した各学習データから、ノイズを生成
enc_noise = self.enc.predict(real_data)
#Discriminator学習
d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
d_loss = d_enc_loss + d_gen_loss
#Discriminatorの学習機能をオフに
self.dis.trainable = False
#Encoder学習
e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))
#Generator学習
g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))
#評価用データの設定有れば、当該データのloss計算とearly stop検討
if len(test)>0:
#評価用データ取得
X_test = test[0]
y_true = test[1]
#評価用データの推論
proba = self.predict(X_test)
proba = minmax_scale(proba)
#loss計算
val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()
#評価用データのlossが今までより減衰していれば、最小lossを更新し、early stopカウントをリセット
if min_val_loss > val_loss:
min_val_loss = val_loss #Update "min_val_loss" to "val_loss"
stop_count = 0 #Change "stop_count" to 0
#If "stop_count" is equal or more than "early_stop_num", training is end
#指定回数の間で、評価用データのlossが減衰しなければ、学習ストップ
elif stop_count >= early_stop_num:
break
else:
stop_count += 1
#学習状況の表示
if verbose==1 and i%100==0:
if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss}')
else: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss} val_loss:{val_loss}')
##4.モデル推論(異常検知)
論文を参考に以下の様に実装しました。
異常スコアは論文の通り、下記式にて算出しています(高いほど異常)。
A(x)=αL_G(x)+(1-α)L_D(x)・・・Anomaly Score
L_G(x)=||x-G(E(x))||_1・・・Generator Loss
L_D(x)=σ(D(x,E(x)),1)・・・Discriminator Loss
ちなみに、著者のソースコードではDiscriminatorLossが以下の様になっており、論文の内容とどちらを取ればよいか迷いましたが、今回は論文の通り上記式で実装しました。
L_D(x)=σ(D(G(E(x)),E(x)),1)
#Test model
def predict(self, X_test, weight=0.9, degree=1):
#評価用データをnumpy型に変換
X_test = np.array(X_test)
#評価用データからノイズ生成
z_gen = self.enc.predict(X_test)
#評価用データから生成したノイズで、再度データを生成
reconstructs = self.gen.predict(z_gen)
#元のデータと再度生成したデータの差を各説明変数毎に計算しそれらを合算
#学習データと同様のデータであれば、上手く学習出来たEncoderとGeneratorで、入力データを再生成できるはず。
delta = X_test - reconstructs
gen_score = tf.norm(delta, ord=degree, axis=1).numpy()
#DiscriminatorでEncoderの入出力を推論
l_encoder = self.dis.predict([X_test, z_gen])
#上記推論結果と、全て1の配列とのクロスエントロピーを算出
#学習データと同様のデータであれば、Encoderの入出力をDiscriminatorで推論した結果は1となるはず
dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()
#Return anomality calculated "gen_score" and "dis_score"
return weight*gen_score + (1-weight)*dis_score
#
以上、ご閲覧頂きありがとうございました。
何か気になる点などあれば、ご指摘頂けると幸いです。