1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ラビットチャレンジレポート5 深層学習Day3

Posted at

1. 再帰的ニューラルネットワークの概念

1.1 要約

RNN

再帰的ニューラルネットワーク(RNN)は時系列データに対応可能なニューラルネットワークである。
時系列データとは、時間的順序を追って一定間隔ごとに観察され、相互に統計的依存関係が認められるようなデータの系列のこと。音声データやテキストデータなど。

RNNの概要は次の通りである。

image.png

u^t=W_{in}x^t+Wz^{t-1}+b\\
z^t=f(W_{in}x^t+Wz^{t-1}+b)\\
v^t=W_{out}z^t+c\\
y^t=g(W_{out}z^t+c)

時系列データを扱うために、初期の状態と過去の時間$t-1$の状態を保持し、そこから次の時間$t$の状態を再帰的に求める再帰構造を持っている。

BPTT

誤差逆伝播法の一種でRNNにおけるパラメータ調整方法である。
微分の計算を行う。

\frac{\partial E}{\partial W_{in}}=\frac{\partial E}{\partial u^{t}}\biggl[\frac{\partial u^t}{\partial W_{in}} \biggr]^T=\delta^t[x^t]^T\\
\frac{\partial E}{\partial W_{out}}=\frac{\partial E}{\partial v^{t}}\biggl[\frac{\partial v^t}{\partial W_{out}} \biggr]^T=\delta^{out,t}[z^t]^T\\
\frac{\partial E}{\partial W}=\frac{\partial E}{\partial u^{t}}\biggl[\frac{\partial u^t}{\partial W} \biggr]^T=\delta^{t}[z^{t-1}]^T\\
\frac{\partial E}{\partial b}=\frac{\partial E}{\partial u^t}\frac{\partial u^t}{\partial b}=\delta^t\\
\frac{\partial E}{\partial c}=\frac{\partial E}{\partial v^t}\frac{\partial v^t}{\partial c}=\delta^{out,t}
\frac{\partial E}{\partial u^{t}}=\frac{\partial E}{\partial v^{t}}\frac{\partial v^t}{\partial u^{t}}=\frac{\partial v^t}{\partial u^{t}}\frac{\partial \{W_{out}f(u^t)+c \}}{\partial u^{t}}=f'(u^t)W_{out}^T\delta^{out,t}=\delta^t\\
\delta^{t-1}=\frac{\partial E}{\partial u^{t-1}}=\frac{\partial E}{\partial u^{t}}\frac{\partial u^t}{\partial u^{t-1}}=\delta^t\bigl\{\frac{\partial u^t}{\partial z^{t-1}}\frac{\partial z^{t-1}}{\partial u^{t-1}} \bigr\}=\delta^t\{Wf'(u^{t-1}) \}\\
\delta^{t-z-1}=\delta^{t-z}\{Wf'(u^{t-z-1}) \}

パラメータの更新式は次のようになる。

W_{in}^{t+1}=W_{in}^{t}-\epsilon\frac{\partial E}{\partial W_{in}}=W^t_{in}-\epsilon\sum_{z=0}^{T_t}\delta^{t-z}[x^{t-z}]^T\\
W_{out}^{t+1}=W_{out}^{t}-\epsilon\frac{\partial E}{\partial W_{out}}=W^t_{out}-\epsilon\delta^{out,t}[z^{t}]^T\\
W^{t+1}=W^{t}-\epsilon\frac{\partial E}{\partial W}=W^t-\epsilon\sum_{z=0}^{T_t}\delta^{t-z}[z^{t-z-1}]^T\\
b^{t+1}=b^t-\epsilon\frac{\partial E}{\partial b}=b^t-\epsilon\sum_{z=0}^{T_t}\delta^{t-z}\\
c^{t+1}=c^t-\epsilon\frac{\partial E}{\partial c}=c^t-\epsilon\delta^{out,t}

1.2 実装

2進数の足し算を計算するモデルの実装を行う。
入力は2つの数値(2進数)を並べて1つの配列としたもの、出力はその和である。
誤差は学習が進むほど小さくなり、最終的には0近くとなる。
実際の結果をみてもうまく計算できていることが確認できる。

import numpy as np
from common import functions
import matplotlib.pyplot as plt


def d_tanh(x):
    return 1/(np.cosh(x) ** 2)

# データを用意
# 2進数の桁数
binary_dim = 8
# 最大値 + 1
largest_number = pow(2, binary_dim)
# largest_numberまで2進数を用意
binary = np.unpackbits(np.array([range(largest_number)],dtype=np.uint8).T,axis=1)

input_layer_size = 2
hidden_layer_size = 16
output_layer_size = 1

weight_init_std = 1
learning_rate = 0.1

iters_num = 10000
plot_interval = 100

# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)
# Xavier
# W_in = np.random.randn(input_layer_size, hidden_layer_size) / (np.sqrt(input_layer_size))
# W_out = np.random.randn(hidden_layer_size, output_layer_size) / (np.sqrt(hidden_layer_size))
# W = np.random.randn(hidden_layer_size, hidden_layer_size) / (np.sqrt(hidden_layer_size))
# He
# W_in = np.random.randn(input_layer_size, hidden_layer_size) / (np.sqrt(input_layer_size)) * np.sqrt(2)
# W_out = np.random.randn(hidden_layer_size, output_layer_size) / (np.sqrt(hidden_layer_size)) * np.sqrt(2)
# W = np.random.randn(hidden_layer_size, hidden_layer_size) / (np.sqrt(hidden_layer_size)) * np.sqrt(2)


# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)

u = np.zeros((hidden_layer_size, binary_dim + 1))
z = np.zeros((hidden_layer_size, binary_dim + 1))
y = np.zeros((output_layer_size, binary_dim))

delta_out = np.zeros((output_layer_size, binary_dim))
delta = np.zeros((hidden_layer_size, binary_dim + 1))

all_losses = []

for i in range(iters_num):
    
    # A, B初期化 (a + b = d)
    a_int = np.random.randint(largest_number/2)
    a_bin = binary[a_int] # binary encoding
    b_int = np.random.randint(largest_number/2)
    b_bin = binary[b_int] # binary encoding
    
    # 正解データ
    d_int = a_int + b_int
    d_bin = binary[d_int]
    
    # 出力バイナリ
    out_bin = np.zeros_like(d_bin)
    
    # 時系列全体の誤差
    all_loss = 0    
    
    # 時系列ループ
    for t in range(binary_dim):
        # 入力値
        X = np.array([a_bin[ - t - 1], b_bin[ - t - 1]]).reshape(1, -1)
        # 時刻tにおける正解データ
        dd = np.array([d_bin[binary_dim - t - 1]])
        
        u[:,t+1] = np.dot(X, W_in) + np.dot(z[:,t].reshape(1, -1), W)
        z[:,t+1] = functions.sigmoid(u[:,t+1])
#         z[:,t+1] = functions.relu(u[:,t+1])
#         z[:,t+1] = np.tanh(u[:,t+1])    
        y[:,t] = functions.sigmoid(np.dot(z[:,t+1].reshape(1, -1), W_out))


        #誤差
        loss = functions.mean_squared_error(dd, y[:,t])
        
        delta_out[:,t] = functions.d_mean_squared_error(dd, y[:,t]) * functions.d_sigmoid(y[:,t])        
        
        all_loss += loss

        out_bin[binary_dim - t - 1] = np.round(y[:,t])
    
    
    for t in range(binary_dim)[::-1]:
        X = np.array([a_bin[-t-1],b_bin[-t-1]]).reshape(1, -1)        

        delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_sigmoid(u[:,t+1])
#         delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_relu(u[:,t+1])
#         delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * d_tanh(u[:,t+1])    

        # 勾配更新
        W_out_grad += np.dot(z[:,t+1].reshape(-1,1), delta_out[:,t].reshape(-1,1))
        W_grad += np.dot(z[:,t].reshape(-1,1), delta[:,t].reshape(1,-1))
        W_in_grad += np.dot(X.T, delta[:,t].reshape(1,-1))
    
    # 勾配適用
    W_in -= learning_rate * W_in_grad
    W_out -= learning_rate * W_out_grad
    W -= learning_rate * W_grad
    
    W_in_grad *= 0
    W_out_grad *= 0
    W_grad *= 0
    

    if(i % plot_interval == 0):
        all_losses.append(all_loss)        
        print("iters:" + str(i))
        print("Loss:" + str(all_loss))
        print("Pred:" + str(out_bin))
        print("True:" + str(d_bin))
        out_int = 0
        for index,x in enumerate(reversed(out_bin)):
            out_int += x * pow(2, index)
        print(str(a_int) + " + " + str(b_int) + " = " + str(out_int))
        print("------------")

lists = range(0, iters_num, plot_interval)
plt.plot(lists, all_losses, label="loss")
plt.show()

image.png
image.png

image.png

時系列データとしてサイン波を使い、予測を行う。

import numpy as np
from common import functions
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

np.random.seed(0)

# sin曲線
round_num = 10
div_num = 500
ts = np.linspace(0, round_num * np.pi, div_num)
f = np.sin(ts)

def d_tanh(x):
    return 1/(np.cosh(x)**2 + 1e-4)

# ひとつの時系列データの長さ
maxlen = 2

# sin波予測の入力データ
test_head = [[f[k]] for k in range(0, maxlen)]

data = []
target = []

for i in range(div_num - maxlen):
    data.append(f[i: i + maxlen])
    target.append(f[i + maxlen])
    
X = np.array(data).reshape(len(data), maxlen, 1)
D = np.array(target).reshape(len(data), 1)

# データ設定
N_train = int(len(data) * 0.8)
N_validation = len(data) - N_train

x_train, x_test, d_train, d_test = train_test_split(X, D, test_size=N_validation)

input_layer_size = 1
hidden_layer_size = 5
output_layer_size = 1

weight_init_std = 0.01
learning_rate = 0.1

iters_num = 500

# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)

# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)

us = []
zs = []

u = np.zeros(hidden_layer_size)
z = np.zeros(hidden_layer_size)
y = np.zeros(output_layer_size)

delta_out = np.zeros(output_layer_size)
delta = np.zeros(hidden_layer_size)

losses = []

# トレーニング
for i in range(iters_num):
    for s in range(x_train.shape[0]):
        us.clear()
        zs.clear()
        z *= 0
        
        # sにおける正解データ
        d = d_train[s]

        xs = x_train[s]        
        
        # 時系列ループ
        for t in range(maxlen):
            
            # 入力値
            x = xs[t]
            u = np.dot(x, W_in) + np.dot(z, W)
            us.append(u)
            z = np.tanh(u)
            zs.append(z)

        y = np.dot(z, W_out)
        
        #誤差
        loss = functions.mean_squared_error(d, y)
        
        delta_out = functions.d_mean_squared_error(d, y)
        
        delta *= 0
        for t in range(maxlen)[::-1]:
            
            delta = (np.dot(delta, W.T) + np.dot(delta_out, W_out.T)) * d_tanh(us[t])
            
            # 勾配更新
            W_grad += np.dot(zs[t].reshape(-1,1), delta.reshape(1,-1))
            W_in_grad += np.dot(xs[t], delta.reshape(1,-1))
        W_out_grad = np.dot(z.reshape(-1,1), delta_out)
        
        # 勾配適用
        W -= learning_rate * W_grad
        W_in -= learning_rate * W_in_grad
        W_out -= learning_rate * W_out_grad.reshape(-1,1)
            
        W_in_grad *= 0
        W_out_grad *= 0
        W_grad *= 0

# テスト        
for s in range(x_test.shape[0]):
    z *= 0

    # sにおける正解データ
    d = d_test[s]

    xs = x_test[s]

    # 時系列ループ
    for t in range(maxlen):

        # 入力値
        x = xs[t]
        u = np.dot(x, W_in) + np.dot(z, W)
        z = np.tanh(u)

    y = np.dot(z, W_out)

    #誤差
    loss = functions.mean_squared_error(d, y)
    print('loss:', loss, '   d:', d, '   y:', y)
        
        
        
original = np.full(maxlen, None)
pred_num = 200

xs = test_head

# sin波予測
for s in range(0, pred_num):
    z *= 0
    for t in range(maxlen):
        
        # 入力値
        x = xs[t]
        u = np.dot(x, W_in) + np.dot(z, W)
        z = np.tanh(u)

    y = np.dot(z, W_out)
    original = np.append(original, y)
    xs = np.delete(xs, 0)
    xs = np.append(xs, y)

plt.figure()
plt.ylim([-1.5, 1.5])
plt.plot(np.sin(np.linspace(0, round_num* pred_num / div_num * np.pi, pred_num)), linestyle='dotted', color='#aaaaaa')
plt.plot(original, linestyle='dashed', color='black')
plt.show()

image.png

条件を変えて結果を確認する。

hidden_layer_size = 10

image.png

1.3 確認テスト

RNNの3つの重み

  • 入力層から中間層 $W_{in}$
  • 中間層から中間層 $W$
  • 中間層から出力層 $W_{out}$

中間層から中間層への重み$W$は、現在の中間層から次の中間層を定義するための重みである。

2. LSTM

2.1 要約

RNNには、時系列を遡るほど、勾配が消失するという問題がある。

image.png

LSTMは記憶セル$\boldsymbol{c}$を持ち、これはLSTM内だけで完結し他のレイヤへは出力されない。
記憶セル$\boldsymbol{c}t$は、$(\boldsymbol{c}{t-1},\boldsymbol{h}{t-1},\boldsymbol{x}t)$から何らかの計算によって求められる。
隠れ状態$\boldsymbol{h}
{t}$は、$\boldsymbol{h}
{t}=\tanh{(\boldsymbol{c}_{t})}$によって計算される。

LSTMでは3つのゲート

  • outputゲート
  • forgetゲート
  • inputゲート

が使用される。

outputゲート

outputゲートは、$\tanh{\boldsymbol{c}t}$の各要素に対して、「それらが次時刻の隠れ状態としてどれだけ重要か」を調整し、
次の隠れ状態$\boldsymbol{h}
{t}$を司るゲートである。
このoutputゲートの開き具合は、入力$\boldsymbol{x}{t}$と前の状態$\boldsymbol{h}{t-1}$から求める。

\boldsymbol{o}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(o)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(o)}+\boldsymbol{b}^{(o)})

ここで、$\sigma()$はsigmoid関数を表し、$\boldsymbol{W}{x}^{(o)}$は入力$\boldsymbol{x}{t}$に対する重み、$\boldsymbol{W}{h}^{(o)}$は前時刻の状態$\boldsymbol{h}{t-1}$に対する重みである。
そして、この$\boldsymbol{o}$と$\tanh{(\boldsymbol{c}{t})}$の要素ごとの積を$\boldsymbol{h}{t}$として出力する。

\boldsymbol{h}_{t}=\boldsymbol{o}\odot\tanh{(\boldsymbol{c}_{t})}

ここで$\odot$はアダマール積と呼ばれ、要素ごとの積を計算を表す記号である。

forgetゲート

$\boldsymbol{c}_{t-1}$の記憶の中から、不要な記憶を忘れるためのゲートをforgetゲートと呼ぶ。
forgetゲートの出力は、

\boldsymbol{f}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(f)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(f)}+\boldsymbol{b}^{(f)})

と求められる。

新しい記憶セル

forgetゲートによって忘れるのみとなるので、新しく覚えるべき情報を追加する。

\boldsymbol{g}=\tanh(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(g)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(g)}+\boldsymbol{b}^{(g)})

この$\boldsymbol{g}$が前時刻の$\boldsymbol{c}_{t-1}$に加算されることで、新しい記憶が生まれる。

inputゲート

inputゲートは$\boldsymbol{g}$の各要素に重みづけするためのゲートである。

\boldsymbol{i}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(i)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(i)}+\boldsymbol{b}^{(i)})

この$\boldsymbol{i}$と$\boldsymbol{g}$の要素ごとの積の結果を記憶セルに追加する。
すなわち新しい記憶$\boldsymbol{c}_t$は、

\boldsymbol{c}_t=\boldsymbol{f}\odot\boldsymbol{c}_{t-1}+\boldsymbol{g}\odot\boldsymbol{i}

となる。
LSTMで行う計算をまとめると、

\boldsymbol{f}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(f)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(f)}+\boldsymbol{b}^{(f)})\\
\boldsymbol{g}=\tanh(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(g)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(g)}+\boldsymbol{b}^{(g)})\\
\boldsymbol{i}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(i)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(i)}+\boldsymbol{b}^{(i)})\\
\boldsymbol{o}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(o)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(o)}+\boldsymbol{b}^{(o)})\\
\boldsymbol{c}_t=\boldsymbol{f}\odot\boldsymbol{c}_{t-1}+\boldsymbol{g}\odot\boldsymbol{i}\\
\boldsymbol{h}_{t}=\boldsymbol{o}\odot\tanh{(\boldsymbol{c}_{t})}

ここで、上から4つのアフィン変換

\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(f)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(f)}+\boldsymbol{b}^{(f)}\\
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(g)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(g)}+\boldsymbol{b}^{(g)}\\
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(i)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(i)}+\boldsymbol{b}^{(i)}\\
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(o)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(o)}+\boldsymbol{b}^{(o)}

について、

\boldsymbol{x}_{t}\bigl[\boldsymbol{W}_{x}^{(f)}\  \boldsymbol{W}_{x}^{(g)}\ \boldsymbol{W}_{x}^{(i)}\ \boldsymbol{W}_{x}^{(o)}\bigl]+
\boldsymbol{h}_{t-1}\bigl[\boldsymbol{W}_{x}^{(f)}\  \boldsymbol{W}_{x}^{(g)}\ \boldsymbol{W}_{x}^{(i)}\ \boldsymbol{W}_{x}^{(o)}\bigl]+
\bigl[\boldsymbol{b}^{(f)}\  \boldsymbol{b}^{(g)}\ \boldsymbol{b}^{(i)}\ \boldsymbol{b}^{(o)}\bigl]

さらに[]内の行列をまとめて書けば、

\boldsymbol{x}_{t}\boldsymbol{W}_{x}+\boldsymbol{h}_{t-1}\boldsymbol{W}+\boldsymbol{b}

となり、1回の計算で済ませることができる。

2.2 実装

class LSTM:
    def __init__(self, Wx, Wh, b):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None
    
    def forward(self, x, h_prev, c_prev):
        Wx, Wh, b = self.params
        N, H = h_prev.shape
        
        A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b
        
        # slice
        f = A[:, :H]
        g = A[:, H:2*H]
        i = A[:, 2*H:3*H]
        o = A[:, 3*H:]
        
        f = sigmomid(f)
        g = np.tanh(g)
        i = sigmomid(i)
        o = sigmomid(o)
        
        c_next = f * c_prev + g * i
        h_next = o * np.tanh(c_next)
        
        self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
        return h_next, c_next
    
    def backward(self, dh_next, dc_next):
        Wx, Wh, b = self.params
        x, h_prev, c_prev, i, f, g, o, c_next = self.cache

        tanh_c_next = np.tanh(c_next)

        ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)

        dc_prev = ds * f

        di = ds * g
        df = ds * c_prev
        do = dh_next * tanh_c_next
        dg = ds * i

        di *= i * (1 - i)
        df *= f * (1 - f)
        do *= o * (1 - o)
        dg *= (1 - g ** 2)

        dA = np.hstack((df, dg, di, do))

        dWh = np.dot(h_prev.T, dA)
        dWx = np.dot(x.T, dA)
        db = dA.sum(axis=0)

        self.grads[0][...] = dWx
        self.grads[1][...] = dWh
        self.grads[2][...] = db

        dx = np.dot(dA, Wx.T)
        dh_prev = np.dot(dA, Wh.T)

        return dx, dh_prev, dc_prev

class TimeLSTM:
    def __init__(self, Wx, Wh, b, stateful=False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None

        self.h, self.c = None, None
        self.dh = None
        self.stateful = stateful

    def forward(self, xs):
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        H = Wh.shape[0]

        self.layers = []
        hs = np.empty((N, T, H), dtype='f')

        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype='f')
        if not self.stateful or self.c is None:
            self.c = np.zeros((N, H), dtype='f')

        for t in range(T):
            layer = LSTM(*self.params)
            self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
            hs[:, t, :] = self.h

            self.layers.append(layer)

        return hs

    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D = Wx.shape[0]

        dxs = np.empty((N, T, D), dtype='f')
        dh, dc = 0, 0

        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]
            dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
            dxs[:, t, :] = dx
            for i, grad in enumerate(layer.grads):
                grads[i] += grad

        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
        self.dh = dh
        return dxs

    def set_state(self, h, c=None):
        self.h, self.c = h, c

    def reset_state(self):
        self.h, self.c = None, None

2.3 演習

image.png

1つ前のセルの状態に入力ゲート、忘却ゲートを掛けて足し合わせたものと表現される。
正解は3となる。

3. GRU

3.1 要約

image.png

GRUはLSTMと違い、隠れ状態のみを使用する。
また、ゲートは、

  • resetゲート
  • updateゲート

の2種類となる。
GRUで行う計算をまとめると、

\boldsymbol{z}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(z)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(z)}+\boldsymbol{b}^{(z)})\\
\boldsymbol{r}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(r)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(r)}+\boldsymbol{b}^{(r)})\\
\hat{\boldsymbol{h}}=\tanh(\boldsymbol{x}_{t}\boldsymbol{W}_{x}+(\boldsymbol{r}\odot\boldsymbol{h}_{t-1})\boldsymbol{W}_h+\boldsymbol{b})\\
\boldsymbol{h}_t=(1-\boldsymbol{z})\odot\boldsymbol{h}_{t-1}+\boldsymbol{z}\odot\hat{\boldsymbol{h}}

resetゲートは、過去の隠れ状態をどれだけ無視するか決定する。
updateゲートは2つの役割(LSTMのforgetゲート、inputゲートに相当するもの)をもっており、
forgetゲートとして機能するのは$(1-\boldsymbol{z})\odot\boldsymbol{h}_{t-1}$の箇所、
inputゲートとして機能するのは$\boldsymbol{z}\odot\hat{\boldsymbol{h}}$の箇所である。

3.2 実装

class GRU:
    def __init__(self, Wx, Wh, b):
        '''

        Parameters
        ----------
        Wx: 入力`x`用の重みパラーメタ(3つ分の重みをまとめる)
        Wh: 隠れ状態`h`用の重みパラメータ(3つ分の重みをまとめる)
        b: バイアス(3つ分のバイアスをまとめる)
        '''
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None

    def forward(self, x, h_prev):
        Wx, Wh, b = self.params
        H = Wh.shape[0]
        Wxz, Wxr, Wxh = Wx[:, :H], Wx[:, H:2 * H], Wx[:, 2 * H:]
        Whz, Whr, Whh = Wh[:, :H], Wh[:, H:2 * H], Wh[:, 2 * H:]
        bz, br, bh = b[:H], b[H:2 * H], b[2 * H:]

        z = sigmoid(np.dot(x, Wxz) + np.dot(h_prev, Whz) + bz)
        r = sigmoid(np.dot(x, Wxr) + np.dot(h_prev, Whr) + br)
        h_hat = np.tanh(np.dot(x, Wxh) + np.dot(r*h_prev, Whh) + bh)
        h_next = (1-z) * h_prev + z * h_hat

        self.cache = (x, h_prev, z, r, h_hat)

        return h_next

    def backward(self, dh_next):
        Wx, Wh, b = self.params
        H = Wh.shape[0]
        Wxz, Wxr, Wxh = Wx[:, :H], Wx[:, H:2 * H], Wx[:, 2 * H:]
        Whz, Whr, Whh = Wh[:, :H], Wh[:, H:2 * H], Wh[:, 2 * H:]
        x, h_prev, z, r, h_hat = self.cache

        dh_hat =dh_next * z
        dh_prev = dh_next * (1-z)

        # tanh
        dt = dh_hat * (1 - h_hat ** 2)
        dbh = np.sum(dt, axis=0)
        dWhh = np.dot((r * h_prev).T, dt)
        dhr = np.dot(dt, Whh.T)
        dWxh = np.dot(x.T, dt)
        dx = np.dot(dt, Wxh.T)
        dh_prev += r * dhr

        # update gate(z)
        dz = dh_next * h_hat - dh_next * h_prev
        dt = dz * z * (1-z)
        dbz = np.sum(dt, axis=0)
        dWhz = np.dot(h_prev.T, dt)
        dh_prev += np.dot(dt, Whz.T)
        dWxz = np.dot(x.T, dt)
        dx += np.dot(dt, Wxz.T)

        # rest gate(r)
        dr = dhr * h_prev
        dt = dr * r * (1-r)
        dbr = np.sum(dt, axis=0)
        dWhr = np.dot(h_prev.T, dt)
        dh_prev += np.dot(dt, Whr.T)
        dWxr = np.dot(x.T, dt)
        dx += np.dot(dt, Wxr.T)

        self.dWx = np.hstack((dWxz, dWxr, dWxh))
        self.dWh = np.hstack((dWhz, dWhr, dWhh))
        self.db = np.hstack((dbz, dbr, dbh))

        self.grads[0][...] = self.dWx
        self.grads[1][...] = self.dWh
        self.grads[2][...] = self.db

        return dx, dh_prev


class TimeGRU:
    def __init__(self, Wx, Wh, b, stateful=False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None
        self.h, self.dh = None, None
        self.stateful = stateful

    def forward(self, xs):
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        H = Wh.shape[0]
        self.layers = []
        hs = np.empty((N, T, H), dtype='f')

        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype='f')

        for t in range(T):
            layer = GRU(*self.params)
            self.h = layer.forward(xs[:, t, :], self.h)
            hs[:, t, :] = self.h
            self.layers.append(layer)
        return hs

    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D = Wx.shape[0]

        dxs = np.empty((N, T, D), dtype='f')

        dh = 0
        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]
            dx, dh = layer.backward(dhs[:, t, :] + dh)
            dxs[:, t, :] = dx

            for i, grad in enumerate(layer.grads):
                grads[i] += grad

        for i, grad in enumerate(grads):
            self.grads[i][...] = grad

        self.dh = dh
        return dxs

    def set_state(self, h):
        self.h = h

    def reset_state(self):
        self.h = None

3.3 演習

image.png

次の時刻の中間状態は、1時刻前の中間表現と計算された中間表現の線形和で表現される。
正解は4となる。

4. 双方向RNN

4.1 要約

過去の情報だけでなく、未来の情報を加味することで精度を向上させるためのモデル。

image.png

4.2 実装

時系列データを反転させたものを、LSTMで順伝搬させる。
そして、その出力を再度反転させることで元の順番に並べ替える。
最後に通常のLSTMの出力と結合(concatenate)させて全体の出力とする。
通常のLSTMと比べて出力の長さは2倍になる。

class TimeBiLSTM:
    def __init__(self, Wx1, Wh1, b1,
                 Wx2, Wh2, b2, stateful=False):
        self.forward_lstm = TimeLSTM(Wx1, Wh1, b1, stateful)
        self.backward_lstm = TimeLSTM(Wx2, Wh2, b2, stateful)
        self.params = self.forward_lstm.params + self.backward_lstm.params
        self.grads = self.forward_lstm.grads + self.backward_lstm.grads

    def forward(self, xs):
        o1 = self.forward_lstm.forward(xs)
        o2 = self.backward_lstm.forward(xs[:, ::-1])
        o2 = o2[:, ::-1]

        out = np.concatenate((o1, o2), axis=2)
        return out

    def backward(self, dhs):
        H = dhs.shape[2] // 2
        do1 = dhs[:, :, :H]
        do2 = dhs[:, :, H:]

        dxs1 = self.forward_lstm.backward(do1)
        do2 = do2[:, ::-1]
        dxs2 = self.backward_lstm.backward(do2)
        dxs2 = dxs2[:, ::-1]
        dxs = dxs1 + dxs2
        return dxs

4.3 演習

image.png

順方向と逆方向に伝播させたときの中間表現を結合したものが出力となるので、正解は4となる。

5. Seq2Seq

5.1 要約

Encoder-Decoderモデルの一種であり、機械対話や機器翻訳に使用される。

image.png

Encoder RNN

Encoderの処理としては、

  • vec1をRNNに入力し、hidden stateを出力
    このhidden stateと次の入力vec2をまたRNNに入力して、hidden stateを出力という流れを繰り返す。

  • 最後のvecを入力したときのhidden stateをfinal stateとする。
    このfinal stateがthought vectorと呼ばれ入力した文の意味を表すベクトルとなる。

Decoder RNN

以下の手順で文章を生成する。

  1. RNNのfinal stateから、各トークンの生成確率を出力
  2. 生成確率にもとづいてトークンをランダムに選ぶ
  3. 2で選ばれたトークンをEmbeddingしてDecoder RNNへの次の入力とする
  4. 1-3を繰り返し、2で得られたトークンを文字列に直す

HRED

過去n-1個の発話から次の発話を生成数る。
Seq2Seqでは、会話の文脈無視で応答がなされたが、HREDでは前の単語の流れに即して応答される。
構造は、Seq2SeqとContext RNNを組み合わせたものとなる。
Context RNNとは、Encoderのまとめた各文章の系列をまとめて、これまでの会話コンテキスト全体を表すベクトルに変換する構造のこと。

HREDでは、

  • 会話の流れのような多様性が無い
  • さらに、短く情報量に乏しい答えをしやすい傾向がある

などの課題がある。

VHRED

VHREDは、HREDにVAEの潜在変数の概念を追加したもの。

VAE

オートエンコーダとは、教師なし学習の1つ。
例えば、入力画像と同じ画像を出力するニューラルネットワークなど。

image.png

VAEでは、潜在変数zに確率分布N(0,1)を仮定したもの。

5.2 実装

EncoderとDecoderの実装を行う。

class Encoder:
    def __init__(self, vocab_size, wprdvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')
        
        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)
        
        self.params = self.embed.params + self.lstm.params
        self.grads = self.embed.grads + self.lstm.grads
        self.hs = None
    
    def forward(self, xs):
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        self.hs = hs
        return hs[:, -1 , :]
    
    def backward(self, dh):
        dhs = np.zeros_like(self.hs)
        dhs[:, -1, :] = dh
        
        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout

class Decoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')
        
        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.affine = TimeAffine(affine_W, affine_b)
        
        self.params, self.grads = [], []
        for layer in (self.embed, self.lstm, self.affine):
            self.params +=  layer.params
            self.grads +=  layer.grads
    
    def forward(self, xs, h):
        self.lstm.set_state(h)
        
        out = self.embed.forward(xs)
        out = self.lstm.forward(out)
        score = self.affine.forward(out)
        return score
    
    def backward(self, dscore):
        dout = self.affine.backward(dscore)
        dout = self.lstm.backward(dout)
        dout = self.embed.backward(dout)
        dh = self.lstm.dh
        return dh
    
    def generate(self, h, start_id, sample_size):
        sampled = []
        sample_id = start_id
        self.lstm.set_state(h)
        
        for _ in range(sample_size):
            x = np.array(sample_id).reshape((1,1))
            out = self.embed.forward(x)
            out = self.lstm.forward(out)
            score = self.affine.forward(out)
            
            sample_id = np.argmax(score.flatten())
            sampled.append(int(sample_id))
        
        return sampled

Seq2Seqの実装を行う。

class Seq2seq(BaseModel):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.encoder = Encoder(V, D, H)
        self.decoder = Decoder(V, D, H)
        self.softmax = TimeSoftmaxWithLoss()
        
        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads
    
    def forward(self, xs, ts):
        decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]
        
        h = self.encoder.forward(xs)
        score = self.decoder.forward(decoder_xs, h)
        loss = self.softmax.forward(score, decoder_ts)
        return loss
    
    def backward(self, dout=1):
        dout = self.softmax.backward(dout)
        dh = self.decoder.backward(dout)
        dout = self.encoder.backward(dh)
        return dout
    
    def generate(self, xs, start_id, sample_size):
        h = self.encoder.forward(xs)
        sampled = self.decoder.generate(h, start_id, sample_size)
        return sampled

VAE

  • 中間層
  • パラメータを出力する層
  • サンプリングする層
  • 出力層

の実装を行う。

class BaseLayer:
    def update(self, eta):
        self.w -= eta*self.grad_w
        self.b -= eta*self.grad_b

class MiddleLayer(BaseLayer):
    def __init__(self, n_upper, n):
        self.w = np.random.randn(n_upper, n) * np.sqrt(2/n_upper)
        se;f.b = np.zeros(n)
    
    def forward(self, x):
        self.x = x
        self.u = np.dot(x, self.w) + self.b
        self.y = np.where(self.u <= 0 ,0, self.u)
    
    def backward(self, grad_y):
        delta = grad_y * np.where(self.u <= 0, 0, 1)
        
        self.grad_w = np.dot(self.x.T, delta)
        self.grad_b = np.sum(delta, axis=0)
        self.grad_x = np.dot(delta, self.w.T)

class ParamsLayer(BaseLayer):
    def __init__(self, n_upper, n):
        self.w = np.random.randn(n_upper, n) / np.sqrt(n_upper)
        self.b = np.zeros(n)
    
    def forward(self, x):
        self.x = x
        u = np.dot(x, self.w) + self.b
        self.y = u
    
    def backward(self, grad_y):
        delta = grad_y
        
        self.grad_w = np.dot(self.x.T, delta)
        self.grad_b = np.sum(delta, axis=0)
        self.grad_x = np.dot(delta, self.w.T)

def OutputLayer(BaseLayer):
    def __init__(self, n_upper, n):
        self.w = np.random.randn(n_upper, n) / np.sqrt(n_upper)
        self.b = np.zeros(n)
    
    def forward(self, x):
        self.x = x
        u = np.dot(x, self.w) + self.b
        self.y = 1/(1+np.exp(-u))
    
    def backward(self, t):
        delta = self.y - t
        
        self.grad_w = np.dot(self.x.T, delta)
        self.grad_b = np.sum(delta, axis=0)
        self.grad_x = np.dot(delta, self.w.T)

class LatentLayer:
    def forward(self, mu, log_var):
        self.mu = mu
        self.log_var = log_var
        
        self.epsilon = np.random.randn(*log_var.shape)
        self.z = mu + self.epsilon*np.exp(log_var/2)
    
    def backward(self, grad_z):
        self.grad_mu = grad_z + selh.mu
        self.grad_log_var = grad_z*self.epsilon/2*np.exp(self.log_var/2) - 0.5*(1-np.exp(self.log_var))

5.3 確認テスト

image.png

wはワンホットベクトルであり、単語埋め込みによりエンベディングを行う。
正解は1となる。

6. word2vec

6.1 要約

ニューラルネットワークで単語の処理を行うため、単語を「固定長のベクトル」に変換する必要がある。
その方法の1つとしてone-hot表現がある。これは、要素の中で1つだけ1で、残りはすべて0であるようなベクトルのことである。
例えば 「you」の単語IDが2のとき、one-hot表現は$(0,0,1,0,0,0,0)$のようになる。

ここで、全結合層を考えると入力$\boldsymbol{c}$(one-hot表現)と重み行列$\boldsymbol{W}$の行列の積の計算となる。
このとき出力は、重み行列から該当する場所の行ベクトルを抜き出しただけの値となる。

CBOW(Continuous Bag-of-Words)

word2vecでは、CBOWモデルとskip-gramモデルの2つのモデルが使用されるニューラルネットワークである。
CBOWモデルは、コンテキストからターゲットを推測することを目的としたニューラルネットワークのことである。

CBOWモデルは、入力層が2つあり(コンテキストの数)、中間層を経て、出力層へたどり着く。
入力層から中間層へは、同じ全結合層($\boldsymbol{W_{in}}$)によって行われ、中間層から出力層へは全結合層($\boldsymbol{W_{out}}$)によって行われる。
全結合層により、1つ目の入力層が$\boldsymbol{h_{1}}$、2つ目の入力層が$\boldsymbol{h_{2}}$に変換されたとすると、
中間層のニューロンは$\frac{1}{2}(\boldsymbol{h_{1}}+\boldsymbol{h_{2}})$となる。
出力層には単語の数だけニューロンが存在するが、出力されたスコアにSoftmax関数を適用することで確率が得られる。

全結合層の重み$\boldsymbol{W_{in}}$は、単語の分散表現となる。
中間層のニューロンの数を入力層よりも減らすことで、単語を予測するために必要な情報をコンパクトに納められる。
これはエンコードと呼ばれ、中間層から目的の結果を得る作業はデコードと呼ばれる。

6.2 実装

def preprocess(text):
    # 前処理
    text = text.lower() # すべての文字を小文字に変換
    text = text.replace('.', ' .') # 「.」も1単語として扱うためスペースを入れる。
    words = text.split(' ')
    
    # 単語辞書、ID辞書の作成
    word_to_id = {} # 単語: ID
    id_to_word = {} # ID: 単語

    for word in words: # 単語のリスト
        if word not in word_to_id: # word_to_idに存在しない場合
            new_id = len(word_to_id) # 新しいIDの発行
            word_to_id[word] = new_id
            id_to_word[new_id] = word
    
    # コーパスの作成(単語リストをIDリストに変換)
    corpus = [word_to_id[w] for w in words]
    
    return corpus, word_to_id, id_to_word

def create_contexts_target(corpus, window_size=1):
    # windowサイズに合わせてtargetの範囲を決める
    target = corpus[window_size:-window_size]
    contexts = []
    
    # targetのID
    for idx in range(window_size, len(corpus)-window_size):
        cs = [] # targetごとにコンテキストのリストを作成
        # 左隣から右隣まで
        for t in range(-window_size, window_size+1):
            # t=0は自分自身
            if t == 0:
                continue
            # IDをcsに追加
            cs.append(corpus[idx+t])
        # IDリストをcontextsに追加
        contexts.append(cs)
    
    return np.array(contexts), np.array(target)

def convert_one_hot(corpus, vocab_size):
    '''one-hot表現への変換

    :param corpus: 単語IDのリスト(1次元もしくは2次元のNumPy配列)
    :param vocab_size: 語彙数
    :return: one-hot表現(2次元もしくは3次元のNumPy配列)
    '''
    N = corpus.shape[0]
    
    # コーパスが1次元配列
    if corpus.ndim == 1:
        # コーパスの各単語(N)に対し,単語数(vocab_size)の列を用意
        one_hot = np.zeros((N, vocab_size), dtype=np.int32)
        # コーパスを順に処理
        for idx, word_id in enumerate(corpus):
            # 該当する単語の位置に1を入れる
            one_hot[idx, word_id] = 1
            
    # コーパスが多次元(C次元)
    elif corpus.ndim == 2:
        C = corpus.shape[1]
        # コーパスの各単語(N)に対し,C次元分(コンテキスト数)の単語数(vocab_size)の列を用意
        one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
        # 1番目の単語から処理していく
        for idx_0, word_ids in enumerate(corpus):
            # 次元ごとに処理していく
            for idx_1, word_id in enumerate(word_ids):
                one_hot[idx_0, idx_1, word_id] = 1

    return one_hot
class SimpleCBOW:
    def __init__(self, vocab_size, hidden_size):
        V, H = vocab_size, hidden_size
        
        # 重みの初期化
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(H, V).astype('f')
        
        # レイヤの生成
        self.in_layer0 = MatMul(W_in)
        self.in_layer1 = MatMul(W_in)
        self.out_layer = MatMul(W_out)
        self.loss_layer = SoftmaxWithLoss()
        
        # すべての重みと勾配をリストにまとめる
        layers = [self.in_layer0, self.in_layer1, self.out_layer]
        self.params, self.grads = [], []
        for layer in layers:
            # 同じ重みを複数のレイヤで共有していることに注意
            self.params += layer.params
            self.grads += layer.grads
        
        # メンバ変数に単語の分散表現を設定
        self.word_vecs = W_in
    
    def forward(self, contexts, target):
        h0 = self.in_layer0.forward(contexts[:, 0]) # 1番目の次元はコンテキストのウィンドウサイズ分
        h1 = self.in_layer1.forward(contexts[:, 1])
        h = (h0 + h1) * 0.5
        score = self.out_layer.forward(h)
        loss = self.loss_layer.forward(score, target)
        return loss
    
    def backward(self, dout=1):
        ds = self.loss_layer.backward(dout)
        da = self.out_layer.backward(ds)
        da *= 0.5 # 「×」の逆伝播
        self.in_layer1.backward(da)
        self.in_layer0.backward(da)
        return None
window_size = 1
hidden_size = 5
batch_size = 3
max_epoch = 1000
eval_interval = 200

text = 'あなた は 私 に さよなら と 言う. 私 は あなた に こんにちは と 言う.'
corpus, word_to_id, id_to_word = preprocess(text)

vocab_size = len(word_to_id)
contexts, target = create_contexts_target(corpus, window_size=1)
target = convert_one_hot(target, vocab_size)
contexts =  convert_one_hot(contexts, vocab_size)

model = SimpleCBOW(vocab_size, hidden_size)
optimizer = Adam()
trainer = Trainer(model, optimizer)

trainer.fit(contexts, target, max_epoch, batch_size, eval_interval=100)
trainer.plot()

image.png

word_vecs = model.word_vecs

for word_id, word in id_to_word.items():
    print(word, word_vecs[word_id])

image.png

7. AttentionMechanism

7.1 要約

Encoderの改良

seq2seqでは、Encoderが時系列データを固定長のベクトルに変換した。
どんな長い文章でも、それを固定長のベクトルに変換してしまうことが問題点として挙げられる。
そこで、各時刻の隠れ状態ベクトルをすべて利用することで、入力された単語列と同じ数のベクトル$\boldsymbol{hs}$を得ることができる。
各時刻の隠れ状態には、直前に入力された単語の情報が多く含まれる。

Decoderの改良

Encoderに出力$\boldsymbol{hs}$全てを活用できるように改良を行う。
「翻訳先の単語」と対応関係にある「翻訳元の単語」の情報を選び出すこと(単語の対応関係を表す情報をアライメントと呼ぶ)、
そしてその情報を利用して翻訳を行うことを考える。

Encoderに出力$\boldsymbol{hs}$を受け取ったとき、最終行はこれまで通りDecoderの最初のLSTMレイヤに渡す。
さらに、各時刻のAffineレイヤの前に、各時刻の隠れ状態と$\boldsymbol{hs}$を受け取り「何らかの計算」を行うレイヤを追加する。
各時刻において、Decoderへの入力単語と対応関係にある単語のベクトルを$\boldsymbol{hs}$から選び出させたいが、選び出すという操作は微分ができないという問題点がある。
そこで単語の重要度を表す重み$\boldsymbol{a}$を計算し、$\boldsymbol{hs}$から、重み付き和「コンテキストベクトル」$\boldsymbol{c}$を求める。

7.2 実装

class WeightSum:
    def __init__(self):
        self.params, self.grads = [], []
        self.cache = None
    
    def forward(self, hs, a):
        N, T, H = hs.shape
        
        ar = a.reshape(N, T, 1).repeat(H, axis=2)
        t = hs * ar
        c = np.sum(t, axis=1)
        
        self.cache = (hs, ar)
        return c
    
    def backward(self, dc):
        hs, ar = self.cache
        N, T, H = hs.shape
        
        dt = dc.reshape(N, 1, H).repeat(T, axis=1)
        dar = dt * hs
        dhs = dt * ar
        da = np.sum(dar, axis=2)
        
        return dhs, da

重み$\boldsymbol{a}$の求め方を考える。
LSTMの隠れ状態ベクトルを$\boldsymbol{h}$表すと、類似度は$\boldsymbol{hs}$と$\boldsymbol{h}$の内積により求まる。
この結果を$\boldsymbol{s}$とし、Softmax関数により正規化を行う。

class AttentionWeight:
    def __init__(self):
        self.params, self.grads = [], []
        self.softmax = Softmax()
        self.cache = None
    
    def forward(self, hs, h):
        N, T, H = hs.shape
        
        hr = h.reshape(N, 1, H).repeat(T, axis=1)
        t = hs * hr
        s = np.sum(t, axis=2)
        a = self.softmax.forward(s)
        
        self.cache = (hs, hr)
        return a
    
    def backward(self, da):
        hs, hr = self.cache
        N, T, H = hs.shape
        
        ds = self.softmax.backward(da)
        dt = ds.reshape(N, T, 1).repeat(H, axis=2)
        dhs = dt * hr
        dhr = dt * hs
        dh = np.sum(dhr, axis=1)
        return dhs, dh

2つのレイヤを組み合わせる。
Encoderが出力する各単語ベクトル$\boldsymbol{hs}$に対し、Attention Weightレイヤが注意を払い、各単語の重み$\boldsymbol{a}$を求める。
それに続き、Weight Sumレイヤが$\boldsymbol{a}$と$\boldsymbol{hs}$の重み付き和を求め、コンテキストベクトル$\boldsymbol{c}$として出力する。
この一連の計算を行うレイヤをAttentionレイヤと呼ぶこととする。

class Attention:
    def __init__(self):
        self.params, self.grads = [], []
        self.attention_weight_layer = AttentionWeight()
        self.weight_sum_layer = WeightSum()
        self.attention_weight = None

    def forward(self, hs, h):
        a = self.attention_weight_layer.forward(hs, h)
        out = self.weight_sum_layer.forward(hs, a)
        self.attention_weight = a
        return out

    def backward(self, dout):
        dhs0, da = self.weight_sum_layer.backward(dout)
        dhs1, dh = self.attention_weight_layer.backward(da)
        dhs = dhs0 + dhs1
        return dhs, dh

複数のAttentionレイヤをまとめてTime Attentionレイヤを実装する

class TimeAttention:
    def __init__(self):
        self.params, self.grads = [], []
        self.layers = None
        self.attention_weights = None
    
    def forward(self, hs_enc, hs_dec):
        N, T, H = hs_dec.shape
        out = np.empty_like(hs_dec)
        self.layers = []
        self.attention_weights = []
        
        for t in range(T):
            layer = Attention()
            out[:, t, :] = layer.forward(hs_enc, hs_dec[:, t, :])
            self.layers.append(layer)
            self.attention_weights.append(layer.attention_weight)
        
        return out
    
    def backward(self, dout):
        N, T, H = dout.shape
        dhs_enc = 0
        dhs_dec = np.empty_like(dout)
        
        for t in range(T):
            layer = self.layers[t]
            dhs, dh = layer.backward(dout[:, t, :])
            dhs_enc += dhs
            dhs_dec[:, t, :] = dh
        
        return dhs_enc, dhs_dec

Attention付きのSeq2Seqの実装を行う。

class AttentionEncoder(Encoder):
    def forward(self, xs):
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        return hs
    
    def backward(self, dhs):
        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout

class AttentionDecoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')
        affine_W = (rn(2*H, V) / np.sqrt(2*H)).astype('f')
        affine_b = np.zeros(V).astype('f')
        
        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.attention = TimeAttention()
        self.affine = TimeAffine(affine_W, affine_b)
        layers = [self.embed, self.lstm, self.attention, self.affine]
        
        self.params, self.grads = [], []
        for layer in layers:
            self.params +=  layer.params
            self.grads +=  layer.grads
    
    def forward(self, xs, enc_hs):
        h = enc_hs[:,-1]
        self.lstm.set_state(h)
        
        out = self.embed.forward(xs)
        dec_hs = self.lstm.forward(out)
        c = self.attention.forward(enc_hs, dec_hs)
        out = np.concatenate((c, dec_hs), axis=2)
        score = self.affine.forward(out)
        return score
    
    def backward(self, dscore):
        dout = self.affine.backward(dscore)
        N, T, H2 = dout.shape
        H = H2 // 2

        dc, ddec_hs0 = dout[:,:,:H], dout[:,:,H:]
        denc_hs, ddec_hs1 = self.attention.backward(dc)
        ddec_hs = ddec_hs0 + ddec_hs1
        dout = self.lstm.backward(ddec_hs)
        dh = self.lstm.dh
        denc_hs[:, -1] += dh
        self.embed.backward(dout)

        return denc_hs

    def generate(self, enc_hs, start_id, sample_size):
        sampled = []
        sample_id = start_id
        h = enc_hs[:, -1]
        self.lstm.set_state(h)

        for _ in range(sample_size):
            x = np.array([sample_id]).reshape((1, 1))

            out = self.embed.forward(x)
            dec_hs = self.lstm.forward(out)
            c = self.attention.forward(enc_hs, dec_hs)
            out = np.concatenate((c, dec_hs), axis=2)
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten())
            sampled.append(sample_id)

        return sampled

class AttentionSeq2seq(Seq2seq):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        args = vocab_size, wordvec_size, hidden_size
        self.encoder = AttentionEncoder(*args)
        self.decoder = AttentionDecoder(*args)
        self.softmax = TimeSoftmaxWithLoss()
        
        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

単語の重要度を可視化することができる。

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?