データの次元圧縮手法について調べている時に、後輩が次元圧縮 + クラスタリングを同時に学習するディープラーニングの手法「Deep Embedded Clustering」を調べて教えてくれたのでせっかくだからChainerで実装してみようというのがこの記事です。
実装したコードはGithubで公開しています。
https://github.com/ymym3412/DeepEmbeddedClustering
Deep Embedded Clusteringとは
Deep Embedded Clusteringは「Unsupervised Deep Embedding for Clustering Analysis」という論文の中で提案されているクラスタリングの手法です。
次元圧縮やクラスタリングの手法は他にも以下のようなものがあります。
- k-means、混合ガウスモデル(GMM)
- 高速に動作し、多くの問題に適用しやすい
- 距離メトリクスは元のデータ空間に制限される
- 高次元空間に分布するデータではうまくいかないことが多い
- k-meansの拡張手法
- k-meansを用いたクラスタリングとクラスタ間の分散の最大化を交互に行なうEMアルゴリズム
- 低次元への変換は線形変換のみに制限される
- Spectral Clusteringとそれの拡張手法
- k-meansといった手法よりうまく動作することが多い
- Spectral Clusteringで行なう固有値分解をAutoEncoderに置き換える拡張手法も提案されている
- パフォーマンスは良いが、メモリの消費量が大きい
- グラフラプラシアン行列の計算がデータ点の数に対して二乗、あるいはそれ以上の計算オーダーが必要
- t-SNE
- 元のデータ空間の分布と特徴空間の分布のKLダイバージェンスを最小化する手法
- Deep Embedded Clusteringの基となっている
- DECでは、元のデータ空間の分布の代わりに特徴空間上での分布を使用
Deep Embedded Clusteringは上記の手法と比較して
- ハイパーパラメータの選択に頑健
- 従来手法より高い精度
- 計算時間は$O(nk)$
- データの数とセントロイドの数に比例
といった優位性があります。
学習は大きく分けて2つのステップがあります。
- ニューラルネットの事前学習
- 写像とセントロイドの最適化
1. ニューラルネットの事前学習
ニューラルネットの事前学習にはStacked (Denoising)AutoEncoderを使用します。
コードでは1層をDenoising AutoencoderとしてStacked AutoEncoderをChainListとして定義しています。
class DenoisingAutoEncoder(chainer.Chain):
def __init__(self, input_size, output_size, encoder_activation=True, decoder_activation=True):
w = chainer.initializers.Normal(scale=0.01)
super(DenoisingAutoEncoder, self).__init__()
with self.init_scope():
self.encoder = L.Linear(input_size, output_size, initialW=w)
self.decoder = L.Linear(output_size, input_size, initialW=w)
# encode,decode時の活性化関数の有無を決めるパラメータ
self.encoder_activation = encoder_activation
self.decoder_activation = decoder_activation
def __call__(self, x):
if self.encoder_activation:
h = F.relu(self.encoder(F.dropout(x, 0.2)))
else:
h = self.encoder(F.dropout(x, 0.2))
if self.decoder_activation:
h = F.relu(self.decoder(F.dropout(h, 0.2)))
else:
h = self.decoder(F.dropout(h, 0.2))
return h
def encode(self, x):
if self.encoder_activation:
h = F.relu(self.encoder(x))
else:
h = self.encoder(x)
return h
def decode(self, x):
if self.decoder_activation:
h = F.relu(self.decoder(x))
else:
h = self.decoder(x)
return h
class StackedDenoisingAutoEncoder(chainer.ChainList):
def __init__(self, input_dim):
# 論文に合わせて各層の次元は入力の次元->500->500->2000->10
super(StackedDenoisingAutoEncoder, self).__init__(
DenoisingAutoEncoder(input_dim, 500, decoder_activation=False),
DenoisingAutoEncoder(500, 500),
DenoisingAutoEncoder(500, 2000),
DenoisingAutoEncoder(2000, 10, encoder_activation=False)
)
def __call__(self, x):
# encode
models = []
for dae in self.children():
x = dae.encode(x)
models.append(dae)
# decode
for dae in reversed(models):
x = dae.decode(x)
return x
事前学習も2つのステップに分かれており、1層毎にAutoEncoderとして学習を行う「Layer-Wise Pretrain」とネットワーク全体を学習する「Fine Turing」です。
# Layer-Wise Pretrain
print("Layer-Wise Pretrain")
# 各層を取得しAutoEncoderとして学習する
for i, dae in enumerate(model.children()):
print("Layer {}".format(i+1))
train_tuple = tuple_dataset.TupleDataset(train, train)
train_iter = iterators.SerialIterator(train_tuple, batchsize)
clf = L.Classifier(dae, lossfun=mean_squared_error)
clf.compute_accuracy = False
if chainer.cuda.available and args.gpu >= 0:
clf.to_gpu(gpu_id)
optimizer = optimizers.MomentumSGD(lr=0.1)
optimizer.setup(clf)
updater = training.StandardUpdater(train_iter, optimizer, device=gpu_id)
trainer = training.Trainer(updater, (50000, "iteration"), out="mnist_result")
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(['iteration', 'main/loss', 'elapsed_time']))
# 20000iteration毎に学習率(lr)を1/10にする
trainer.extend(ChangeLearningRate(), trigger=(20000, "iteration"))
trainer.run()
# 層毎の学習に合わせて学習データの次元を784->500->500->2000と変換していく
train = dae.encode(train).data
# Finetuning
print("fine tuning")
# Fine Tuning時はDropoutなし
with chainer.using_config("train", False):
train, _ = mnist.get_mnist()
train, _ = convert.concat_examples(train, device=gpu_id)
train_tuple = tuple_dataset.TupleDataset(train, train)
train_iter = iterators.SerialIterator(train_tuple, batchsize)
model = L.Classifier(model, lossfun=mean_squared_error)
model.compute_accuracy = False
if chainer.cuda.available and args.gpu >= 0:
model.to_gpu(gpu_id)
optimizer = optimizers.MomentumSGD(lr=0.1)
optimizer.setup(model)
updater = training.StandardUpdater(train_iter, optimizer, device=gpu_id)
trainer = training.Trainer(updater, (100000, "iteration"), out="mnist_result")
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(['iteration', 'main/loss', 'elapsed_time']))
trainer.extend(ChangeLearningRate(), trigger=(20000, "iteration"))
trainer.run()
ここで学習させたニューラルネットのEncode部分が、データの低次元への写像としてそのまま再利用されます。
2. 写像とセントロイドの最適化
Deep Embedded Clusteringで特徴的なのは、「データの低次元への写像」と「クラスターのセントロイド」を同時に学習していくというところです。
元データ$x_i$の低次元への変換にはニューラルネットを使います。$x_i$を低次元に変換したものを$z_i$とし、$z_i$に対してk-meansをかけてクラスタのセントロイド$u_j$を初期化します。$x_i$を$z_i$に変換するニューラルネットのパラメータとクラスタのセントロイド$u_j$が学習対象となります。
パラメータの学習は以下で表されるQとPの2つの分布のKLダイバージェンスを最小化するように行われます。
教師なし学習では交差検証でパラメータの調整ができないため、$\alpha$は1に固定しています。
q_{ij} = \frac{(1 + \|z_i - u_j\|^2 / \alpha)^{-\frac{\alpha+1}{2}}}{\sum_{j^{'}}(1 + \|z_i - u_{j^{'}}\|^2 / \alpha)^{-\frac{\alpha + 1}{2}}}
p_{ij} = \frac{q^2_{ij} / f_j}{\sum_{j^{'}}q^2_{ij^{'}} / f_{j^{'}}}\\
※ただしf_j = \sum_i q_{ij}
学習はクラスタの割り当てが変化するデータの数が0.1%を下回るまで行います。
今回はちょうどいい誤差関数がなかった(と思う)ので自作しました。
class TdistributionKLDivergence(chainer.Function):
def forward(self, inputs):
xp = cuda.get_array_module(*inputs)
z, us = inputs[0], xp.array(inputs[1:], dtype=xp.float32)
# ij成分がziとujのユークリッド距離となる行列
dist_matrix = xp.linalg.norm(xp.vstack(chain.from_iterable(map(lambda v: repeat(v, us.shape[0]), z))) - xp.vstack(repeat(us, z.shape[0])), axis= 1).reshape(z.shape[0], us.shape[0])
q_matrix = (self.tdistribution_kernel(dist_matrix).T / self.tdistribution_kernel(dist_matrix).sum(axis=1)).T
p_matrix = self.compute_pmatrix(q_matrix)
kl_divergence = (p_matrix * (xp.log(p_matrix) - xp.log(q_matrix))).sum()
return xp.array(kl_divergence),
def backward(self, inputs, grad_outputs):
xp = cuda.get_array_module(*inputs)
z, us = inputs[0], xp.array(inputs[1:], dtype=xp.float32)
gloss, = grad_outputs
gloss = gloss / z.shape[0]
# z
norms = xp.vstack(chain.from_iterable(map(lambda v: repeat(v, us.shape[0]), z))) - xp.vstack(repeat(us, z.shape[0]))
# ij成分が(zi-uj)の10次元のベクトル
# shapeは({データ数},{セントロイド数},10)
z_norm_matrix = norms.reshape(z.shape[0], us.shape[0], z.shape[1])
dist_matrix = xp.linalg.norm(norms, axis= 1).reshape(z.shape[0], us.shape[0])
q_matrix = (self.tdistribution_kernel(dist_matrix).T / self.tdistribution_kernel(dist_matrix).sum(axis=1)).T
p_matrix = self.compute_pmatrix(q_matrix)
# ziとujの勾配を計算
gz = 2 * ((((p_matrix - q_matrix) * self.tdistribution_kernel(dist_matrix)) * z_norm_matrix.transpose(2,0,1)).transpose(1,2,0)).sum(axis=1) * gloss
gus = -2 * ((((p_matrix - q_matrix) * self.tdistribution_kernel(dist_matrix)) * z_norm_matrix.transpose(2,0,1)).transpose(1,2,0)).sum(axis=0) * gloss
g = [gz]
g.extend(gus)
grads = tuple(g)
return grads
def tdistribution_kernel(self, norm):
xp = cuda.get_array_module(norm)
return xp.power((1 + norm), -1)
def compute_pmatrix(self, q_matrix):
xp = cuda.get_array_module(q_matrix)
fj = q_matrix.sum(axis=0)
matrix = xp.power(q_matrix, 2) / fj
p_matrix = (matrix.T / matrix.sum(axis=1)).T
return p_matrix
def tdistribution_kl_divergence(z, us):
return TdistributionKLDivergence()(z, *us)
動かしてみる
学習させたモデルを使ってMNISTのクラスタリングを試してみました。
ラベル毎に色を変え、クラスタのセントロイドは黒い三角でプロットしています。
ランダムに500点を抽出してt-SNEで2次元に圧縮しました。
論文中でもMNISTを使った実験が書かれていますが、あそこまできれいには分離できませんでした。
特に「4」「7」「9」はかなりごちゃっとしてしまっています。
きれいに分かれてくれるかは事前学習とセントロイドの初期位置によって多少左右されるような気がしました。
終わりに
今回はChainer2.0を使ってディープラーニングによるクラスタリング手法「Deep Embedded Clustering」を実装しました。
Chainerのバージョンが2に上がってから触れていなかったので、いい勉強になりました。
計算時間はGPUを使えば事前学習~写像&セントロイドの最適化まで40分ほどでできるので、それほど負担にならないように感じました。
最後に面白い手法を紹介してくれた後輩に感謝です。
参考文献
Unsupervised Deep Embedding for Clustering Analysis
【ICML読み会】Unsupervised Deep Embedding for Clustering Analysis
Chainer: ビギナー向けチュートリアル Vol.1
Chainer で Stacked Auto-Encoder を試してみた
Chainer Docs-Define your own function