chainerでRBMを実装してみる。

  • 44
    いいね
  • 4
    コメント
この記事は最終更新日から1年以上が経過しています。

0. はじめに

RBMはDeep Learningで多層化する場合に、事前学習として用いられる手法です。
ソースコードはtheanoで公開されています。
http://deeplearning.net/tutorial/rbm.html
今回はこのコードをchainerに移植してみました。

RBMの場合、誤差関数はFree Energyの関数となり複雑(?)ですが
逆誤差伝搬のため重み行列、バイアスの変数で微分すると綺麗な計算式となり、
Contrastive Divergence(CD)処理の値から計算できます。
chainerで実装する場合、わざわざ誤差関数(loss)を作成して逆誤差伝搬(loss.backward)するよりも、
単純にcupy(numpy+GPU)で実装したほうが単純となりますが、あえて誤差関数を使って実装してみました。
cupy(numpy+GPU)だけのソースコードはまた機会がある場合に公開します。
といっても以下のソースコードもCDの処理はcupyを使って高速化してますが。。。

# -*- coding: utf-8 -*-

import os,sys
import argparse
import time

import numpy as np

import chainer
from chainer import computational_graph
from chainer import cuda, Variable

import chainer.functions as F
import chainer.links as L
import chainer.optimizers as O


import pickle
import gzip

parser = argparse.ArgumentParser()
parser.add_argument('--gpu', '-g', default=-1, type=int,
                    help='GPU ID (negative value indicates CPU)')
parser.add_argument('--pcd', '-p', default=1, type=int,
                    help='pcd_flag')
parser.add_argument('--kcd', '-k', default=1, type=int,
                    help='cd-k')


''' GPUで計算する時は、cupy = numpy + GPUで処理する。 '''
args = parser.parse_args()
if args.gpu >= 0:
    cuda.check_cuda_available()
xp = cuda.cupy if args.gpu >= 0 else np

def sigmoid(x):
    return 1. / (1 + xp.exp(-x))

class RBM(chainer.Chain):
    '''
    n_visbile:可視層の次元
    n_hidden:隠れ層の次元
    l.W:重み行列
    l.b:hbias
    l.a:vbaias
    real: 可視層が0,1か実数なのかで場合分け。 初期層は実数を想定しているが、中間層は0,1しか考えていない。
    chainerで h = l(v)とかけるように一般的なRBMに出てくる重み行列を転置している。
    '''
    def __init__(self,n_visible,n_hidden,real=0,k=1,pcd_flag=0):
        super(RBM,self).__init__(
            l=L.Linear(n_visible,n_hidden),
            # chainer 1.5のmanualにこの使い方は非推奨とあったのでadd_param使用。
#            a=L.Parameter(xp.zeros(n_visible,dtype=xp.float32)),
        )
        self.l.add_param("a",(n_visible),dtype=xp.float32)

        ''' l.aを初期化、chainerの使用で初期化しないと学習しない? '''
        self.l.a.data.fill(0)

        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.real = real
        self.k = k
        self.pcd_flag = pcd_flag

    def __call__(self,v_data,v_prev_data=None):

        batch_size = v_data.shape[0]

        if self.pcd_flag == 0:
            v_prev_data = v_data
        elif self.pcd_flag ==1 and v_prev_data is None:
            v_prev_data = v_data
        vh_data = self.constrastive_divergence(v_prev_data,self.k)
        v = Variable(v_data)
        vh = Variable(vh_data.astype(np.float32))

        '''
        http://deeplearning.net/tutorial/rbm.htmlの式(5)から
        誤差関数(loss)は(可視層のデータのfree energy)と(可視層のデータのCD-kのfree energy)の差となる。
        loss = (self.free_energy(v) - self.free_energy(vh)) / batch_size
        v->vhへの写像は学習に入れないため、CD-kの処理完了後にVariable化
        '''

        loss = (self.free_energy(v) - self.free_energy(vh)) / batch_size
        return loss

    def free_energy(self,v):
        ''' Function to compute the free energy '''
        '''
        入力値vは必ずVariable化
        本来行単位でSUMを取ってから列単位(バッチ数)でSUMを取るべき処理だけど
        結局SUMを取るので一括でSUM
        '''
        batch_size = v.data.shape[0]
        n_visible = self.n_visible
        real = self.real
        if real == 0:
            '''
            可視層が[0,1]のとき
            vbias_term = -1 * SUM((a(i) * v(i))
            '''
            vbias_term = F.sum(F.matmul(v,self.l.a))
        else:
            '''
            可視層が実数のとき
            vbias_term = -0.5 * SUM((v(i)-a(i)) * (v(i)-a(i)))
            chainerではバッチ数*入力層で扱うため、各行からbiasを引くため
            無理やり m*nを使用
            '''
            m = Variable(xp.ones((batch_size,1),dtype=xp.float32))
            n = F.reshape(self.l.a,(1,n_visible))
            v_ = v - F.matmul(m,n)
            vbias_term = -F.sum(0.5 * v_ * v_)

        wx_b = self.l(v)
        hidden_term = F.sum(F.log(1+F.exp(wx_b)))
        return -vbias_term-hidden_term

    def propup(self,vis):
        '''
        可視層データから隠れ層の確率分布を計算
        隠れ層は[0,1]なので1となる確率を返す。
        入力値visは必ずxp(np)
        '''
        pre_sigmoid_activation = xp.dot(vis,self.l.W.data.T) + self.l.b.data
        return sigmoid(pre_sigmoid_activation)

    def propdown(self,hid):
        '''
        隠れ層データから可視層の確率分布を計算
        可視層が[0,1]の時は1となる確率を返す。
        可視層が実数の時は正規分布の平均を返す。分散は1固定。
        入力値hidは必ずxp(np)
        '''
        real = self.real
        if real == 0:
            pre_sigmoid_activation = xp.dot(hid,self.l.W.data) + self.l.a.data
            v_mean = sigmoid(pre_sigmoid_activation)
        else:
            v_mean = xp.dot(hid,self.l.W.data) + self.l.a.data
        return v_mean

    def sample_h_given_v(self,v0_sample):
        '''
        v0_sample → h1_sampleをgibbs sampling
        [0,1]の値をrandom作成して、popupで計算した1となる確率と比較
        h1_mean[i] >  p[i] なら h1_sample[i] = 1
        h1_mean[i] <= p[i] なら h1_sample[i] = 0
        '''
        h1_mean = self.propup(v0_sample)
        h1_sample = xp.random.binomial(size=h1_mean.shape,n=1,p=h1_mean)
        return h1_mean,h1_sample

    def sample_v_given_h(self,h0_sample):
        '''
        h0_sample → v1_sampleをgibbs sampling
        可視層が[0,1]の場合
        [0,1]の値をrandom値 p[i]を作成して、popdownで計算した1となる確率と比較
        v1_mean[i] >  p[i] なら v1_sample[i] = 1
        v1_mean[i] <= p[i] なら v1_sample[i] = 0
        可視層が実数の場合
        v1_sample[i]は平均v1_mean[i]、分散1の正規分布となるので
        平均0、分散1の正規分布からrandom値 u[i]を作成して、popdownで計算した値を追加
        v1_sample[i] = v1_mean[i] + u[i]
        '''
        v1_mean = self.propdown(h0_sample)
        if self.real == 0:
            v1_sample = xp.random.binomial(size=v1_mean.shape,n=1,p=v1_mean)
        else:
            batch_number = h0_sample.shape[0]
            v1_sample = v1_mean + xp.random.randn(batch_number,self.n_visible)
        return v1_mean,v1_sample

    def gibbs_hvh(self,h0_sample):
        ''' h->v->hをgibbs sampling '''
        v1_mean,v1_sample = self.sample_v_given_h(h0_sample)
        h1_mean,h1_sample = self.sample_h_given_v(v1_sample)
        return v1_mean,v1_sample,h1_mean,h1_sample

    def gibbs_vhv(self,v0_sample):
        ''' v->h->vをgibbs sampling '''
        h1_mean,h1_sample = self.sample_h_given_v(v0_sample)
        v1_mean,v1_sample = self.sample_v_given_h(h1_sample)
        return h1_mean,h1_sample,v1_mean,v1_sample

    def constrastive_divergence(self,v0_sample,k=1):
        ''' CD-kの処理、GPU化させるにはcupy必要 '''
        vh_sample = v0_sample
        for step in range(k):
            ph_mean,ph_sample,vh_mean,vh_sample = self.gibbs_vhv(vh_sample)
        return vh_sample

    def reconstruct(self, v):
        h = sigmoid(xp.dot(v,self.l.W.data.T) + self.l.b.data)
        reconstructed_v = sigmoid(xp.dot(h,self.l.W.data) + self.l.a.data)
        return reconstructed_v

chainerの場合、chainer.links.Linearを使って各層の写像の重み行列(W),バイアス(b)を定義できますが
RBMの場合追加でバイアスが必要となるため
l.add_param("a",(n_visible),dtype=xp.float32)
でパラメータを追加しました。
l.W:重み行列
l.b:hbias
l.a:vbaias
となります。
可視層から隠れ層への写像をchainer.links.Linearで管理させるため
一般的なRBMの本に記載されている重み行列を転置しています。(Wij→Wji)

1. MNISTで実験

以下のソースコードをMNISTのデータで実験してみました。
GPUのサーバがなかったのでCPUで学習させたため、学習回数は50回です。
今回は隠れ層を500次元とて、学習係数0.01,モメンタム0.5,PCD-10での結果を使用しています。

以下がoriginalのデータ。実際は60000件あるが、150件のみ表示
mnist_original.png

以下が上記の画像をreconstructした画像。ほぼ再現ができています。
mnist_reconstruct.png
件のみ表示。

そして次が重み行列を可視化した画像である。隠れ層は500次元で各ベクトルの基底に関して
28 * 28の画像に変換したものです。
mnist_weight.png