Help us understand the problem. What is going on with this article?

KerasでVAT(Virtual Adversarial Training)を使ってMNISTをやってみる

More than 1 year has passed since last update.

はじめに

注: [改良版]KerasでVAT(Virtual Adversarial Training)を使ってMNISTをやってみる の方がマシな実装だと思うので参考にしてください。

遅まきながら、VAT(Virtual Adversarial Training)という学習方法を知ったのですが、Kerasでの実装が見つからなかったので実装してみました。

VATは簡単にいうと、「通常の入力X→出力Y」と「なるべく結果が異なるように入力Xに微小なノイズdを入力に加えた入力(X+d)→出力Y'」から「KL-Divergence(Y, Y')」を損失関数に余分に加えて学習をする手法です。

これだけだと何言ってるかわからないと思うので、詳しくは元の論文か、この方の解説をご覧になると良いかと思います。

VATは学習における位置づけとしては「正則化」に近いという話で、DropoutやNoiseを加える代わりになる可能性があります。Dropoutとかのパラメータを調整するのも面倒なので、VATで代用できると嬉しい気がします。

Kerasだとコスト関数や正則化関数に入力Xを使うようにするのが少し厄介なのですが、そこさえなんとかなれば、ChainerやTheanoでの実装があるので移植すればOKです。

Version

  • Python: 3.5.3
  • Keras: 1.2.2
  • Theano: 0.8.2

実装

こんな感じになりました。

ポイントは、

  • VATの計算は損失関数にだけ入っているので、通常の推論には影響しない
  • trainingするときに、教師信号であるy_trueX_trainもくっつけて渡すことで、損失関数に対して入力Xを渡している
  • 損失関数でVATの計算をするときに、入力→出力の計算を再度行うので Containerを使ってその計算を再利用できるようにしている
  • K.gradients()の後にK.stop_gradient()しないと、学習していくうちにLossがnanになるので注意(これにハマった...)

というところかと思います。

keras_mnist_vat.py
# coding: utf8
"""
* VAT: https://arxiv.org/abs/1507.00677

# 参考にしたCode
Original: https://github.com/fchollet/keras/blob/master/examples/mnist_cnn.py
VAT: https://github.com/musyoku/vat/blob/master/vat.py

# Result Example
use_dropout=False, use_vat=False: score=0.211949993095, accuracy=0.9877
use_dropout=True, use_vat=False: score=0.238920686956, accuracy=0.9853
use_dropout=False, use_vat=True: score=0.180048364889, accuracy=0.9916
use_dropout=True, use_vat=True: score=0.245401585515, accuracy=0.9901
"""

import numpy as np
from keras.engine.topology import Input, Container
from keras.engine.training import Model

np.random.seed(1337)  # for reproducibility

from keras.datasets import mnist
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Convolution2D, MaxPooling2D
from keras.utils import np_utils
from keras import backend as K

SAMPLE_SIZE = 0

batch_size = 128
nb_classes = 10
nb_epoch = 12

# input image dimensions
img_rows, img_cols = 28, 28
# number of convolutional filters to use
nb_filters = 32
# size of pooling area for max pooling
pool_size = (2, 2)
# convolution kernel size
kernel_size = (3, 3)


def main(data, use_dropout, use_vat):
    # the data, shuffled and split between train and test sets
    (X_train, y_train), (X_test, y_test) = data

    if K.image_dim_ordering() == 'th':
        X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols)
        X_test = X_test.reshape(X_test.shape[0], 1, img_rows, img_cols)
        input_shape = (1, img_rows, img_cols)
    else:
        X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1)
        X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')
    X_train /= 255.
    X_test /= 255.

    # convert class vectors to binary class matrices
    y_train = np_utils.to_categorical(y_train, nb_classes)
    y_test = np_utils.to_categorical(y_test, nb_classes)

    if SAMPLE_SIZE:
        X_train = X_train[:SAMPLE_SIZE]
        y_train = y_train[:SAMPLE_SIZE]
        X_test = X_test[:SAMPLE_SIZE]
        y_test = y_test[:SAMPLE_SIZE]

    my_model = MyModel(input_shape, use_dropout).build()
    my_model.training(X_train, y_train, X_test, y_test, use_vat=use_vat)

    score = my_model.model.evaluate(X_test, y_test, verbose=0)
    print("use_dropout=%s, use_vat=%s: score=%s, accuracy=%s" % (use_dropout, use_vat, score[0], score[1]))


class MyModel:
    model = None
    core_layers = None

    def __init__(self, input_shape, use_dropout=True):
        self.input_shape = input_shape
        self.use_dropout = use_dropout

    def build(self):
        input_layer = Input(self.input_shape)
        output_layer = self.core_data_flow(input_layer)
        self.model = Model(input_layer, output_layer)
        return self

    def core_data_flow(self, input_layer):
        x = Convolution2D(nb_filters, kernel_size[0], kernel_size[1], border_mode='valid')(input_layer)
        x = Activation('relu')(x)
        x = Convolution2D(nb_filters, kernel_size[0], kernel_size[1])(x)
        x = Activation('relu')(x)
        x = MaxPooling2D(pool_size=pool_size)(x)
        if self.use_dropout:
            x = Dropout(0.25)(x)

        x = Flatten()(x)
        x = Dense(128, activation="relu")(x)
        if self.use_dropout:
            x = Dropout(0.5)(x)
        x = Dense(nb_classes, activation='softmax')(x)
        self.core_layers = Container(input_layer, x)
        return x

    def training(self, X_train, y_train, X_test, y_test, use_vat=False):
        orig_loss_func = loss_func = K.categorical_crossentropy
        if use_vat:
            # y_true に concat(y_true, x_train) を取る変則的なLossFunctionを使います
            loss_func = self.loss_with_vat_loss(loss_func)
            self.model.compile(loss=loss_func, optimizer='adadelta', metrics=['accuracy'])
            # train, test ともに y, X を横に連結したデータを作ります
            yX_train = np.concatenate((y_train, X_train.reshape((X_train.shape[0], -1))), axis=1)
            yX_test = np.concatenate((y_test, X_test.reshape((X_test.shape[0], -1))), axis=1)

            # 普通に学習
            self.model.fit(X_train, yX_train, batch_size=batch_size, nb_epoch=nb_epoch,
                           verbose=1, validation_data=(X_test, yX_test))
            # 変則的なLossFunctionを与えたので、普通のLossFunctionに変更して再びcompileしないと、evaluate()が失敗します
            self.model.compile(loss=orig_loss_func, optimizer='adadelta', metrics=['accuracy'])
        else:
            self.model.compile(loss=loss_func, optimizer='adadelta', metrics=['accuracy'])
            self.model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch,
                           verbose=1, validation_data=(X_test, y_test))

    def loss_with_vat_loss(self, original_loss_func, eps=1, xi=10, ip=1):
        def with_vat_loss(yX_train, y_pred):
            nb_output_classes = y_pred.shape[1]
            y_true = yX_train[:, :nb_output_classes]

            # VAT
            X_train = yX_train[:, nb_output_classes:].reshape((-1, ) + self.input_shape)
            d = K.random_normal(X_train.shape)

            for _ in range(ip):
                y = self.core_layers(X_train + self.normalize_vector(d) * xi)
                kld = K.sum(self.kld(y_pred, y))
                d = K.stop_gradient(K.gradients(kld, [d])[0])  # stop_gradient is important!!

            y_perturbation = self.core_layers(X_train + self.normalize_vector(d)*eps)
            kld = self.kld(y_pred, y_perturbation)
            return original_loss_func(y_pred, y_true) + kld
        return with_vat_loss

    @staticmethod
    def normalize_vector(x):
        z = K.sum(K.batch_flatten(K.square(x)), axis=1)
        while K.ndim(z) < K.ndim(x):
            z = K.expand_dims(z, dim=-1)
        return x / (K.sqrt(z) + K.epsilon())

    @staticmethod
    def kld(p, q):
        v = p * (K.log(p + K.epsilon()) - K.log(q + K.epsilon()))
        return K.sum(K.batch_flatten(v), axis=1, keepdims=True)


data = mnist.load_data()
main(data, use_dropout=False, use_vat=False)
main(data, use_dropout=True, use_vat=False)
main(data, use_dropout=False, use_vat=True)
main(data, use_dropout=True, use_vat=True)

実験結果

Dropoutあり・なし、VATあり・なしの4パターンで実験してみました。
1 epochの時間は GeForce GTX 1080で、環境変数を

KERAS_BACKEND=theano 
THEANO_FLAGS=device=gpu,floatX=float32,lib.cnmem=1

として実行したときのものです。

Dropout VAT Accuracy 1 epochの時間
使わない 使わない 98.77% 8秒
使う 使わない 98.53% 8秒
使わない 使う 99.16% 18秒
使う 使う 99.01% 19秒

まあ、そこそこ良い結果ですので、だいたい実装的にも問題ないのではないかと思っています。
VATは1batchで計算を2回行うので実行時間も約2倍になっていますね。

さいごに

VATは教師なし学習にも使えるので、応用範囲の広い学習方法です。
いろいろ活用してみたいです。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away