はじめに
注: [改良版]KerasでVAT(Virtual Adversarial Training)を使ってMNISTをやってみる の方がマシな実装だと思うので参考にしてください。
遅まきながら、VAT(Virtual Adversarial Training)という学習方法を知ったのですが、Kerasでの実装が見つからなかったので実装してみました。
VATは簡単にいうと、「通常の入力X→出力Y」と「なるべく結果が異なるように入力Xに微小なノイズdを入力に加えた入力(X+d)→出力Y'」から「KL-Divergence(Y, Y')」を損失関数に余分に加えて学習をする手法です。
これだけだと何言ってるかわからないと思うので、詳しくは元の論文か、この方の解説をご覧になると良いかと思います。
VATは学習における位置づけとしては「正則化」に近いという話で、DropoutやNoiseを加える代わりになる可能性があります。Dropoutとかのパラメータを調整するのも面倒なので、VATで代用できると嬉しい気がします。
Kerasだとコスト関数や正則化関数に入力Xを使うようにするのが少し厄介なのですが、そこさえなんとかなれば、ChainerやTheanoでの実装があるので移植すればOKです。
Version
- Python: 3.5.3
- Keras: 1.2.2
- Theano: 0.8.2
実装
こんな感じになりました。
ポイントは、
- VATの計算は損失関数にだけ入っているので、通常の推論には影響しない
- trainingするときに、教師信号である
y_true
にX_train
もくっつけて渡すことで、損失関数に対して入力Xを渡している - 損失関数でVATの計算をするときに、入力→出力の計算を再度行うので
Container
を使ってその計算を再利用できるようにしている -
K.gradients()
の後にK.stop_gradient()
しないと、学習していくうちにLossがnan
になるので注意(これにハマった...)
というところかと思います。
# coding: utf8
"""
* VAT: https://arxiv.org/abs/1507.00677
# 参考にしたCode
Original: https://github.com/fchollet/keras/blob/master/examples/mnist_cnn.py
VAT: https://github.com/musyoku/vat/blob/master/vat.py
# Result Example
use_dropout=False, use_vat=False: score=0.211949993095, accuracy=0.9877
use_dropout=True, use_vat=False: score=0.238920686956, accuracy=0.9853
use_dropout=False, use_vat=True: score=0.180048364889, accuracy=0.9916
use_dropout=True, use_vat=True: score=0.245401585515, accuracy=0.9901
"""
import numpy as np
from keras.engine.topology import Input, Container
from keras.engine.training import Model
np.random.seed(1337) # for reproducibility
from keras.datasets import mnist
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Convolution2D, MaxPooling2D
from keras.utils import np_utils
from keras import backend as K
SAMPLE_SIZE = 0
batch_size = 128
nb_classes = 10
nb_epoch = 12
# input image dimensions
img_rows, img_cols = 28, 28
# number of convolutional filters to use
nb_filters = 32
# size of pooling area for max pooling
pool_size = (2, 2)
# convolution kernel size
kernel_size = (3, 3)
def main(data, use_dropout, use_vat):
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = data
if K.image_dim_ordering() == 'th':
X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols)
X_test = X_test.reshape(X_test.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)
else:
X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1)
X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255.
X_test /= 255.
# convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)
if SAMPLE_SIZE:
X_train = X_train[:SAMPLE_SIZE]
y_train = y_train[:SAMPLE_SIZE]
X_test = X_test[:SAMPLE_SIZE]
y_test = y_test[:SAMPLE_SIZE]
my_model = MyModel(input_shape, use_dropout).build()
my_model.training(X_train, y_train, X_test, y_test, use_vat=use_vat)
score = my_model.model.evaluate(X_test, y_test, verbose=0)
print("use_dropout=%s, use_vat=%s: score=%s, accuracy=%s" % (use_dropout, use_vat, score[0], score[1]))
class MyModel:
model = None
core_layers = None
def __init__(self, input_shape, use_dropout=True):
self.input_shape = input_shape
self.use_dropout = use_dropout
def build(self):
input_layer = Input(self.input_shape)
output_layer = self.core_data_flow(input_layer)
self.model = Model(input_layer, output_layer)
return self
def core_data_flow(self, input_layer):
x = Convolution2D(nb_filters, kernel_size[0], kernel_size[1], border_mode='valid')(input_layer)
x = Activation('relu')(x)
x = Convolution2D(nb_filters, kernel_size[0], kernel_size[1])(x)
x = Activation('relu')(x)
x = MaxPooling2D(pool_size=pool_size)(x)
if self.use_dropout:
x = Dropout(0.25)(x)
x = Flatten()(x)
x = Dense(128, activation="relu")(x)
if self.use_dropout:
x = Dropout(0.5)(x)
x = Dense(nb_classes, activation='softmax')(x)
self.core_layers = Container(input_layer, x)
return x
def training(self, X_train, y_train, X_test, y_test, use_vat=False):
orig_loss_func = loss_func = K.categorical_crossentropy
if use_vat:
# y_true に concat(y_true, x_train) を取る変則的なLossFunctionを使います
loss_func = self.loss_with_vat_loss(loss_func)
self.model.compile(loss=loss_func, optimizer='adadelta', metrics=['accuracy'])
# train, test ともに y, X を横に連結したデータを作ります
yX_train = np.concatenate((y_train, X_train.reshape((X_train.shape[0], -1))), axis=1)
yX_test = np.concatenate((y_test, X_test.reshape((X_test.shape[0], -1))), axis=1)
# 普通に学習
self.model.fit(X_train, yX_train, batch_size=batch_size, nb_epoch=nb_epoch,
verbose=1, validation_data=(X_test, yX_test))
# 変則的なLossFunctionを与えたので、普通のLossFunctionに変更して再びcompileしないと、evaluate()が失敗します
self.model.compile(loss=orig_loss_func, optimizer='adadelta', metrics=['accuracy'])
else:
self.model.compile(loss=loss_func, optimizer='adadelta', metrics=['accuracy'])
self.model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch,
verbose=1, validation_data=(X_test, y_test))
def loss_with_vat_loss(self, original_loss_func, eps=1, xi=10, ip=1):
def with_vat_loss(yX_train, y_pred):
nb_output_classes = y_pred.shape[1]
y_true = yX_train[:, :nb_output_classes]
# VAT
X_train = yX_train[:, nb_output_classes:].reshape((-1, ) + self.input_shape)
d = K.random_normal(X_train.shape)
for _ in range(ip):
y = self.core_layers(X_train + self.normalize_vector(d) * xi)
kld = K.sum(self.kld(y_pred, y))
d = K.stop_gradient(K.gradients(kld, [d])[0]) # stop_gradient is important!!
y_perturbation = self.core_layers(X_train + self.normalize_vector(d)*eps)
kld = self.kld(y_pred, y_perturbation)
return original_loss_func(y_pred, y_true) + kld
return with_vat_loss
@staticmethod
def normalize_vector(x):
z = K.sum(K.batch_flatten(K.square(x)), axis=1)
while K.ndim(z) < K.ndim(x):
z = K.expand_dims(z, dim=-1)
return x / (K.sqrt(z) + K.epsilon())
@staticmethod
def kld(p, q):
v = p * (K.log(p + K.epsilon()) - K.log(q + K.epsilon()))
return K.sum(K.batch_flatten(v), axis=1, keepdims=True)
data = mnist.load_data()
main(data, use_dropout=False, use_vat=False)
main(data, use_dropout=True, use_vat=False)
main(data, use_dropout=False, use_vat=True)
main(data, use_dropout=True, use_vat=True)
実験結果
Dropoutあり・なし、VATあり・なしの4パターンで実験してみました。
1 epochの時間は GeForce GTX 1080で、環境変数を
KERAS_BACKEND=theano
THEANO_FLAGS=device=gpu,floatX=float32,lib.cnmem=1
として実行したときのものです。
Dropout | VAT | Accuracy | 1 epochの時間 |
---|---|---|---|
使わない | 使わない | 98.77% | 8秒 |
使う | 使わない | 98.53% | 8秒 |
使わない | 使う | 99.16% | 18秒 |
使う | 使う | 99.01% | 19秒 |
まあ、そこそこ良い結果ですので、だいたい実装的にも問題ないのではないかと思っています。
VATは1batchで計算を2回行うので実行時間も約2倍になっていますね。
さいごに
VATは教師なし学習にも使えるので、応用範囲の広い学習方法です。
いろいろ活用してみたいです。