LoginSignup
5
8

More than 3 years have passed since last update.

Efficient GANをkerasで実装

Posted at

概要

業務で異常検知手法を調査していた時にEfficientGANという手法を見つけたのですが、著者のソースコードはライブラリのバージョンが記載なく実行に手こずったので、勉強も兼ねてkerasで実装してみました。
なお、実装したのはテーブルデータ用のネットワークのみで、推論時のロス計算の内”feature-matching loss”は未実装です。
※EfficientGANについてはこの記事で解説しませんが、分かりやすい解説記事が有りましたので、以下に記載しておきます。

ソースコード:https://github.com/asm94/EfficientGAN

↓参考にしたもの
元論文      :https://arxiv.org/abs/1802.06222
著者のソースコード:https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection
解説記事     :https://qiita.com/masataka46/items/49dba2790fa59c29126b

実行環境

・Windows10 64bit
・Python 3.8.3
・numpy 1.18.5
・tensorflow 2.3.1
・scikit-learn 0.22.2

実装

1.全体構成(クラス)

今回、EfficientGANのネットワークと学習および推論機能を、一つのクラスとして定義しました。
全体像は以下の通りです。個々の関数は後述します。

class EfficientGAN(object):
    def __init__(self, input_dim=0, latent_dim=32):
        self.input_dim = int(input_dim)
        self.latent_dim = int(latent_dim)

    #Train model
    def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
            verbose=1):        
        #後述

    #Test model
    def predict(self, X_test, weight=0.9, degree=1):        
        #後述  

    ##Encoder
    def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
        #後述

    ##Generator
    def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
        #後述

    ##Discriminator
    def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
        #後述

2.ネットワーク

論文を参考に以下の様に実装しました。
・”input_dim”は、論文では使用したデータの次元数である121になっていますが、可変設定出来るように変更しています。
・Discriminatorの出力層の活性化関数は、論文ではlinear(線形)ですが、著者のソースコードを見ると、ロス計算時にシグモイド関数で変換しているため、今回はネットワークの方に組み込みました。

##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
    inputs = Input(shape=(self.input_dim,), name='input')
    net = inputs
    net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_1')(net)
    outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
                    name='output')(net)

    return Model(inputs=inputs, outputs=outputs, name='Encoder')

##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
    inputs = Input(shape=(self.latent_dim,), name='input')
    net = inputs
    net = Dense(64, activation='relu', kernel_initializer=initializer,
                name='layer_1')(net)
    net = Dense(128, activation='relu', kernel_initializer=initializer,
                name='layer_2')(net)
    outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
                    name='output')(net)

    return Model(inputs=inputs, outputs=outputs, name='Generator')

##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
    #D(x)
    inputs1 = Input(shape=(self.input_dim,), name='real')
    net = inputs1
    net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_1')(net)
    dx = Dropout(.2)(net)

    #D(z)
    inputs2 = Input(shape=(self.latent_dim,), name='noise')
    net = inputs2
    net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_2')(net)
    dz = Dropout(.2)(net)

    #D(x) と D(z) を結合
    conet = Concatenate(axis=1)([dx,dz])

    #D(x,z)
    conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                  name='layer_3')(conet)
    conet = Dropout(.2)(conet)
    outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
                    name='output')(conet)

    return Model(inputs=[inputs1,inputs2], outputs=outputs, name='Discriminator')

3.モデル学習

論文を参考に以下の様に実装しました。
・著者のソースコードでは、ロス計算時の直前にシグモイド関数で変換していますが、項2に記載の通りシグモイド関数による変換はネットワークに組み込んだので、ここでは変換しません。
・Discriminator等の各部分モデルは、EfficientGANクラスの定義時では無く学習時に定義され、また、入力の次元数が未定義なら、このタイミングで学習データの次元数が入力の次元数に設定されます。

#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
        verbose=1):

    #学習データをnumpy型に変換
    X_train = np.array(X_train)

    #"input_dim"が1以上で無いなら(未定義想定)、学習データの次元数を設定
    if not self.input_dim >= 1: self.input_dim = X_train.shape[1]

    #Discriminatorモデル定義
    self.dis = self.get_discriminator()
    self.dis.compile(loss=loss, optimizer=optimizer)        

    #Encoder学習用のモデル定義(Encoder → Discriminator)
    self.enc = self.get_encoder()
    x = Input(shape=(self.input_dim,))
    z_gen = self.enc(x)
    valid = self.dis([x, z_gen])
    enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
    enc_dis.compile(loss=loss, optimizer=optimizer) 

    #Generator学習用のモデル定義(Generator → Discriminator)
    self.gen = self.get_generator()
    z = Input(shape=(self.latent_dim,))
    x_gen = self.gen(z)
    valid = self.dis([x_gen, z])
    gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
    gen_dis.compile(loss=loss, optimizer=optimizer)          

    #Training
    min_val_loss = float('inf')
    stop_count = 0
    for i in range(epochs):    
        #Discriminatorを学習機能をオンに
        self.dis.trainable = True

        #学習データから、"batch_size"の半数をランダムに取得
        idx = np.random.randint(0, X_train.shape[0], batch_size//2)
        real_data = X_train[idx]

        #"batch_size"の半数だけノイズを生成し、各生成ノイズからデータ生成
        noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
        gen_data = self.gen.predict(noise)

        #取得した各学習データから、ノイズを生成
        enc_noise = self.enc.predict(real_data)

        #Discriminator学習
        d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
        d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
        d_loss = d_enc_loss + d_gen_loss

        #Discriminatorの学習機能をオフに
        self.dis.trainable = False

        #Encoder学習
        e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))

        #Generator学習
        g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))

        #評価用データの設定有れば、当該データのloss計算とearly stop検討
        if len(test)>0:
            #評価用データ取得
            X_test = test[0]
            y_true = test[1]

            #評価用データの推論
            proba = self.predict(X_test)
            proba = minmax_scale(proba)

            #loss計算
            val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()

            #評価用データのlossが今までより減衰していれば、最小lossを更新し、early stopカウントをリセット
            if min_val_loss > val_loss:                                        
                min_val_loss = val_loss #Update "min_val_loss" to "val_loss"
                stop_count = 0 #Change "stop_count" to 0
            #If "stop_count" is equal or more than "early_stop_num", training is end
            #指定回数の間で、評価用データのlossが減衰しなければ、学習ストップ
            elif stop_count >= early_stop_num:
                break
            else:
                stop_count += 1               

        #学習状況の表示
        if verbose==1 and i%100==0:
            if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss}  e_loss:{e_loss}  g_loss:{g_loss}')
            else: print(f'epoch{i}-> d_loss:{d_loss}  e_loss:{e_loss}  g_loss:{g_loss}  val_loss:{val_loss}')

4.モデル推論(異常検知)

論文を参考に以下の様に実装しました。
異常スコアは論文の通り、下記式にて算出しています(高いほど異常)。

A(x)=αL_G(x)+(1-α)L_D(x)・・・Anomaly Score
L_G(x)=||x-G(E(x))||_1・・・Generator Loss
L_D(x)=σ(D(x,E(x)),1)・・・Discriminator Loss

ちなみに、著者のソースコードではDiscriminatorLossが以下の様になっており、論文の内容とどちらを取ればよいか迷いましたが、今回は論文の通り上記式で実装しました。

L_D(x)=σ(D(G(E(x)),E(x)),1)
#Test model
def predict(self, X_test, weight=0.9, degree=1):

    #評価用データをnumpy型に変換
    X_test = np.array(X_test)

    #評価用データからノイズ生成
    z_gen = self.enc.predict(X_test)

    #評価用データから生成したノイズで、再度データを生成
    reconstructs = self.gen.predict(z_gen)

    #元のデータと再度生成したデータの差を各説明変数毎に計算しそれらを合算       
    #学習データと同様のデータであれば、上手く学習出来たEncoderとGeneratorで、入力データを再生成できるはず。
    delta = X_test - reconstructs
    gen_score = tf.norm(delta, ord=degree, axis=1).numpy()

    #DiscriminatorでEncoderの入出力を推論
    l_encoder = self.dis.predict([X_test, z_gen])

    #上記推論結果と、全て1の配列とのクロスエントロピーを算出       
    #学習データと同様のデータであれば、Encoderの入出力をDiscriminatorで推論した結果は1となるはず
    dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()

    #Return anomality calculated "gen_score" and "dis_score"
    return weight*gen_score + (1-weight)*dis_score

 

以上、ご閲覧頂きありがとうございました。
何か気になる点などあれば、ご指摘頂けると幸いです。

5
8
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
8