1. 再帰的ニューラルネットワークの概念
1.1 要約
RNN
再帰的ニューラルネットワーク(RNN)は時系列データに対応可能なニューラルネットワークである。
時系列データとは、時間的順序を追って一定間隔ごとに観察され、相互に統計的依存関係が認められるようなデータの系列のこと。音声データやテキストデータなど。
RNNの概要は次の通りである。
u^t=W_{in}x^t+Wz^{t-1}+b\\
z^t=f(W_{in}x^t+Wz^{t-1}+b)\\
v^t=W_{out}z^t+c\\
y^t=g(W_{out}z^t+c)
時系列データを扱うために、初期の状態と過去の時間$t-1$の状態を保持し、そこから次の時間$t$の状態を再帰的に求める再帰構造を持っている。
BPTT
誤差逆伝播法の一種でRNNにおけるパラメータ調整方法である。
微分の計算を行う。
\frac{\partial E}{\partial W_{in}}=\frac{\partial E}{\partial u^{t}}\biggl[\frac{\partial u^t}{\partial W_{in}} \biggr]^T=\delta^t[x^t]^T\\
\frac{\partial E}{\partial W_{out}}=\frac{\partial E}{\partial v^{t}}\biggl[\frac{\partial v^t}{\partial W_{out}} \biggr]^T=\delta^{out,t}[z^t]^T\\
\frac{\partial E}{\partial W}=\frac{\partial E}{\partial u^{t}}\biggl[\frac{\partial u^t}{\partial W} \biggr]^T=\delta^{t}[z^{t-1}]^T\\
\frac{\partial E}{\partial b}=\frac{\partial E}{\partial u^t}\frac{\partial u^t}{\partial b}=\delta^t\\
\frac{\partial E}{\partial c}=\frac{\partial E}{\partial v^t}\frac{\partial v^t}{\partial c}=\delta^{out,t}
\frac{\partial E}{\partial u^{t}}=\frac{\partial E}{\partial v^{t}}\frac{\partial v^t}{\partial u^{t}}=\frac{\partial v^t}{\partial u^{t}}\frac{\partial \{W_{out}f(u^t)+c \}}{\partial u^{t}}=f'(u^t)W_{out}^T\delta^{out,t}=\delta^t\\
\delta^{t-1}=\frac{\partial E}{\partial u^{t-1}}=\frac{\partial E}{\partial u^{t}}\frac{\partial u^t}{\partial u^{t-1}}=\delta^t\bigl\{\frac{\partial u^t}{\partial z^{t-1}}\frac{\partial z^{t-1}}{\partial u^{t-1}} \bigr\}=\delta^t\{Wf'(u^{t-1}) \}\\
\delta^{t-z-1}=\delta^{t-z}\{Wf'(u^{t-z-1}) \}
パラメータの更新式は次のようになる。
W_{in}^{t+1}=W_{in}^{t}-\epsilon\frac{\partial E}{\partial W_{in}}=W^t_{in}-\epsilon\sum_{z=0}^{T_t}\delta^{t-z}[x^{t-z}]^T\\
W_{out}^{t+1}=W_{out}^{t}-\epsilon\frac{\partial E}{\partial W_{out}}=W^t_{out}-\epsilon\delta^{out,t}[z^{t}]^T\\
W^{t+1}=W^{t}-\epsilon\frac{\partial E}{\partial W}=W^t-\epsilon\sum_{z=0}^{T_t}\delta^{t-z}[z^{t-z-1}]^T\\
b^{t+1}=b^t-\epsilon\frac{\partial E}{\partial b}=b^t-\epsilon\sum_{z=0}^{T_t}\delta^{t-z}\\
c^{t+1}=c^t-\epsilon\frac{\partial E}{\partial c}=c^t-\epsilon\delta^{out,t}
1.2 実装
2進数の足し算を計算するモデルの実装を行う。
入力は2つの数値(2進数)を並べて1つの配列としたもの、出力はその和である。
誤差は学習が進むほど小さくなり、最終的には0近くとなる。
実際の結果をみてもうまく計算できていることが確認できる。
import numpy as np
from common import functions
import matplotlib.pyplot as plt
def d_tanh(x):
return 1/(np.cosh(x) ** 2)
# データを用意
# 2進数の桁数
binary_dim = 8
# 最大値 + 1
largest_number = pow(2, binary_dim)
# largest_numberまで2進数を用意
binary = np.unpackbits(np.array([range(largest_number)],dtype=np.uint8).T,axis=1)
input_layer_size = 2
hidden_layer_size = 16
output_layer_size = 1
weight_init_std = 1
learning_rate = 0.1
iters_num = 10000
plot_interval = 100
# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)
# Xavier
# W_in = np.random.randn(input_layer_size, hidden_layer_size) / (np.sqrt(input_layer_size))
# W_out = np.random.randn(hidden_layer_size, output_layer_size) / (np.sqrt(hidden_layer_size))
# W = np.random.randn(hidden_layer_size, hidden_layer_size) / (np.sqrt(hidden_layer_size))
# He
# W_in = np.random.randn(input_layer_size, hidden_layer_size) / (np.sqrt(input_layer_size)) * np.sqrt(2)
# W_out = np.random.randn(hidden_layer_size, output_layer_size) / (np.sqrt(hidden_layer_size)) * np.sqrt(2)
# W = np.random.randn(hidden_layer_size, hidden_layer_size) / (np.sqrt(hidden_layer_size)) * np.sqrt(2)
# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)
u = np.zeros((hidden_layer_size, binary_dim + 1))
z = np.zeros((hidden_layer_size, binary_dim + 1))
y = np.zeros((output_layer_size, binary_dim))
delta_out = np.zeros((output_layer_size, binary_dim))
delta = np.zeros((hidden_layer_size, binary_dim + 1))
all_losses = []
for i in range(iters_num):
# A, B初期化 (a + b = d)
a_int = np.random.randint(largest_number/2)
a_bin = binary[a_int] # binary encoding
b_int = np.random.randint(largest_number/2)
b_bin = binary[b_int] # binary encoding
# 正解データ
d_int = a_int + b_int
d_bin = binary[d_int]
# 出力バイナリ
out_bin = np.zeros_like(d_bin)
# 時系列全体の誤差
all_loss = 0
# 時系列ループ
for t in range(binary_dim):
# 入力値
X = np.array([a_bin[ - t - 1], b_bin[ - t - 1]]).reshape(1, -1)
# 時刻tにおける正解データ
dd = np.array([d_bin[binary_dim - t - 1]])
u[:,t+1] = np.dot(X, W_in) + np.dot(z[:,t].reshape(1, -1), W)
z[:,t+1] = functions.sigmoid(u[:,t+1])
# z[:,t+1] = functions.relu(u[:,t+1])
# z[:,t+1] = np.tanh(u[:,t+1])
y[:,t] = functions.sigmoid(np.dot(z[:,t+1].reshape(1, -1), W_out))
#誤差
loss = functions.mean_squared_error(dd, y[:,t])
delta_out[:,t] = functions.d_mean_squared_error(dd, y[:,t]) * functions.d_sigmoid(y[:,t])
all_loss += loss
out_bin[binary_dim - t - 1] = np.round(y[:,t])
for t in range(binary_dim)[::-1]:
X = np.array([a_bin[-t-1],b_bin[-t-1]]).reshape(1, -1)
delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_sigmoid(u[:,t+1])
# delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_relu(u[:,t+1])
# delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * d_tanh(u[:,t+1])
# 勾配更新
W_out_grad += np.dot(z[:,t+1].reshape(-1,1), delta_out[:,t].reshape(-1,1))
W_grad += np.dot(z[:,t].reshape(-1,1), delta[:,t].reshape(1,-1))
W_in_grad += np.dot(X.T, delta[:,t].reshape(1,-1))
# 勾配適用
W_in -= learning_rate * W_in_grad
W_out -= learning_rate * W_out_grad
W -= learning_rate * W_grad
W_in_grad *= 0
W_out_grad *= 0
W_grad *= 0
if(i % plot_interval == 0):
all_losses.append(all_loss)
print("iters:" + str(i))
print("Loss:" + str(all_loss))
print("Pred:" + str(out_bin))
print("True:" + str(d_bin))
out_int = 0
for index,x in enumerate(reversed(out_bin)):
out_int += x * pow(2, index)
print(str(a_int) + " + " + str(b_int) + " = " + str(out_int))
print("------------")
lists = range(0, iters_num, plot_interval)
plt.plot(lists, all_losses, label="loss")
plt.show()
時系列データとしてサイン波を使い、予測を行う。
import numpy as np
from common import functions
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
np.random.seed(0)
# sin曲線
round_num = 10
div_num = 500
ts = np.linspace(0, round_num * np.pi, div_num)
f = np.sin(ts)
def d_tanh(x):
return 1/(np.cosh(x)**2 + 1e-4)
# ひとつの時系列データの長さ
maxlen = 2
# sin波予測の入力データ
test_head = [[f[k]] for k in range(0, maxlen)]
data = []
target = []
for i in range(div_num - maxlen):
data.append(f[i: i + maxlen])
target.append(f[i + maxlen])
X = np.array(data).reshape(len(data), maxlen, 1)
D = np.array(target).reshape(len(data), 1)
# データ設定
N_train = int(len(data) * 0.8)
N_validation = len(data) - N_train
x_train, x_test, d_train, d_test = train_test_split(X, D, test_size=N_validation)
input_layer_size = 1
hidden_layer_size = 5
output_layer_size = 1
weight_init_std = 0.01
learning_rate = 0.1
iters_num = 500
# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)
# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)
us = []
zs = []
u = np.zeros(hidden_layer_size)
z = np.zeros(hidden_layer_size)
y = np.zeros(output_layer_size)
delta_out = np.zeros(output_layer_size)
delta = np.zeros(hidden_layer_size)
losses = []
# トレーニング
for i in range(iters_num):
for s in range(x_train.shape[0]):
us.clear()
zs.clear()
z *= 0
# sにおける正解データ
d = d_train[s]
xs = x_train[s]
# 時系列ループ
for t in range(maxlen):
# 入力値
x = xs[t]
u = np.dot(x, W_in) + np.dot(z, W)
us.append(u)
z = np.tanh(u)
zs.append(z)
y = np.dot(z, W_out)
#誤差
loss = functions.mean_squared_error(d, y)
delta_out = functions.d_mean_squared_error(d, y)
delta *= 0
for t in range(maxlen)[::-1]:
delta = (np.dot(delta, W.T) + np.dot(delta_out, W_out.T)) * d_tanh(us[t])
# 勾配更新
W_grad += np.dot(zs[t].reshape(-1,1), delta.reshape(1,-1))
W_in_grad += np.dot(xs[t], delta.reshape(1,-1))
W_out_grad = np.dot(z.reshape(-1,1), delta_out)
# 勾配適用
W -= learning_rate * W_grad
W_in -= learning_rate * W_in_grad
W_out -= learning_rate * W_out_grad.reshape(-1,1)
W_in_grad *= 0
W_out_grad *= 0
W_grad *= 0
# テスト
for s in range(x_test.shape[0]):
z *= 0
# sにおける正解データ
d = d_test[s]
xs = x_test[s]
# 時系列ループ
for t in range(maxlen):
# 入力値
x = xs[t]
u = np.dot(x, W_in) + np.dot(z, W)
z = np.tanh(u)
y = np.dot(z, W_out)
#誤差
loss = functions.mean_squared_error(d, y)
print('loss:', loss, ' d:', d, ' y:', y)
original = np.full(maxlen, None)
pred_num = 200
xs = test_head
# sin波予測
for s in range(0, pred_num):
z *= 0
for t in range(maxlen):
# 入力値
x = xs[t]
u = np.dot(x, W_in) + np.dot(z, W)
z = np.tanh(u)
y = np.dot(z, W_out)
original = np.append(original, y)
xs = np.delete(xs, 0)
xs = np.append(xs, y)
plt.figure()
plt.ylim([-1.5, 1.5])
plt.plot(np.sin(np.linspace(0, round_num* pred_num / div_num * np.pi, pred_num)), linestyle='dotted', color='#aaaaaa')
plt.plot(original, linestyle='dashed', color='black')
plt.show()
条件を変えて結果を確認する。
hidden_layer_size = 10
1.3 確認テスト
RNNの3つの重み
- 入力層から中間層 $W_{in}$
- 中間層から中間層 $W$
- 中間層から出力層 $W_{out}$
中間層から中間層への重み$W$は、現在の中間層から次の中間層を定義するための重みである。
2. LSTM
2.1 要約
RNNには、時系列を遡るほど、勾配が消失するという問題がある。
LSTMは記憶セル$\boldsymbol{c}$を持ち、これはLSTM内だけで完結し他のレイヤへは出力されない。
記憶セル$\boldsymbol{c}t$は、$(\boldsymbol{c}{t-1},\boldsymbol{h}{t-1},\boldsymbol{x}t)$から何らかの計算によって求められる。
隠れ状態$\boldsymbol{h}{t}$は、$\boldsymbol{h}{t}=\tanh{(\boldsymbol{c}_{t})}$によって計算される。
LSTMでは3つのゲート
- outputゲート
- forgetゲート
- inputゲート
が使用される。
outputゲート
outputゲートは、$\tanh{\boldsymbol{c}t}$の各要素に対して、「それらが次時刻の隠れ状態としてどれだけ重要か」を調整し、
次の隠れ状態$\boldsymbol{h}{t}$を司るゲートである。
このoutputゲートの開き具合は、入力$\boldsymbol{x}{t}$と前の状態$\boldsymbol{h}{t-1}$から求める。
\boldsymbol{o}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(o)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(o)}+\boldsymbol{b}^{(o)})
ここで、$\sigma()$はsigmoid関数を表し、$\boldsymbol{W}{x}^{(o)}$は入力$\boldsymbol{x}{t}$に対する重み、$\boldsymbol{W}{h}^{(o)}$は前時刻の状態$\boldsymbol{h}{t-1}$に対する重みである。
そして、この$\boldsymbol{o}$と$\tanh{(\boldsymbol{c}{t})}$の要素ごとの積を$\boldsymbol{h}{t}$として出力する。
\boldsymbol{h}_{t}=\boldsymbol{o}\odot\tanh{(\boldsymbol{c}_{t})}
ここで$\odot$はアダマール積と呼ばれ、要素ごとの積を計算を表す記号である。
forgetゲート
$\boldsymbol{c}_{t-1}$の記憶の中から、不要な記憶を忘れるためのゲートをforgetゲートと呼ぶ。
forgetゲートの出力は、
\boldsymbol{f}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(f)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(f)}+\boldsymbol{b}^{(f)})
と求められる。
新しい記憶セル
forgetゲートによって忘れるのみとなるので、新しく覚えるべき情報を追加する。
\boldsymbol{g}=\tanh(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(g)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(g)}+\boldsymbol{b}^{(g)})
この$\boldsymbol{g}$が前時刻の$\boldsymbol{c}_{t-1}$に加算されることで、新しい記憶が生まれる。
inputゲート
inputゲートは$\boldsymbol{g}$の各要素に重みづけするためのゲートである。
\boldsymbol{i}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(i)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(i)}+\boldsymbol{b}^{(i)})
この$\boldsymbol{i}$と$\boldsymbol{g}$の要素ごとの積の結果を記憶セルに追加する。
すなわち新しい記憶$\boldsymbol{c}_t$は、
\boldsymbol{c}_t=\boldsymbol{f}\odot\boldsymbol{c}_{t-1}+\boldsymbol{g}\odot\boldsymbol{i}
となる。
LSTMで行う計算をまとめると、
\boldsymbol{f}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(f)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(f)}+\boldsymbol{b}^{(f)})\\
\boldsymbol{g}=\tanh(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(g)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(g)}+\boldsymbol{b}^{(g)})\\
\boldsymbol{i}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(i)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(i)}+\boldsymbol{b}^{(i)})\\
\boldsymbol{o}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(o)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(o)}+\boldsymbol{b}^{(o)})\\
\boldsymbol{c}_t=\boldsymbol{f}\odot\boldsymbol{c}_{t-1}+\boldsymbol{g}\odot\boldsymbol{i}\\
\boldsymbol{h}_{t}=\boldsymbol{o}\odot\tanh{(\boldsymbol{c}_{t})}
ここで、上から4つのアフィン変換
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(f)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(f)}+\boldsymbol{b}^{(f)}\\
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(g)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(g)}+\boldsymbol{b}^{(g)}\\
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(i)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(i)}+\boldsymbol{b}^{(i)}\\
\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(o)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(o)}+\boldsymbol{b}^{(o)}
について、
\boldsymbol{x}_{t}\bigl[\boldsymbol{W}_{x}^{(f)}\ \boldsymbol{W}_{x}^{(g)}\ \boldsymbol{W}_{x}^{(i)}\ \boldsymbol{W}_{x}^{(o)}\bigl]+
\boldsymbol{h}_{t-1}\bigl[\boldsymbol{W}_{x}^{(f)}\ \boldsymbol{W}_{x}^{(g)}\ \boldsymbol{W}_{x}^{(i)}\ \boldsymbol{W}_{x}^{(o)}\bigl]+
\bigl[\boldsymbol{b}^{(f)}\ \boldsymbol{b}^{(g)}\ \boldsymbol{b}^{(i)}\ \boldsymbol{b}^{(o)}\bigl]
さらに[]内の行列をまとめて書けば、
\boldsymbol{x}_{t}\boldsymbol{W}_{x}+\boldsymbol{h}_{t-1}\boldsymbol{W}+\boldsymbol{b}
となり、1回の計算で済ませることができる。
2.2 実装
class LSTM:
def __init__(self, Wx, Wh, b):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cache = None
def forward(self, x, h_prev, c_prev):
Wx, Wh, b = self.params
N, H = h_prev.shape
A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b
# slice
f = A[:, :H]
g = A[:, H:2*H]
i = A[:, 2*H:3*H]
o = A[:, 3*H:]
f = sigmomid(f)
g = np.tanh(g)
i = sigmomid(i)
o = sigmomid(o)
c_next = f * c_prev + g * i
h_next = o * np.tanh(c_next)
self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
return h_next, c_next
def backward(self, dh_next, dc_next):
Wx, Wh, b = self.params
x, h_prev, c_prev, i, f, g, o, c_next = self.cache
tanh_c_next = np.tanh(c_next)
ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)
dc_prev = ds * f
di = ds * g
df = ds * c_prev
do = dh_next * tanh_c_next
dg = ds * i
di *= i * (1 - i)
df *= f * (1 - f)
do *= o * (1 - o)
dg *= (1 - g ** 2)
dA = np.hstack((df, dg, di, do))
dWh = np.dot(h_prev.T, dA)
dWx = np.dot(x.T, dA)
db = dA.sum(axis=0)
self.grads[0][...] = dWx
self.grads[1][...] = dWh
self.grads[2][...] = db
dx = np.dot(dA, Wx.T)
dh_prev = np.dot(dA, Wh.T)
return dx, dh_prev, dc_prev
class TimeLSTM:
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None
self.h, self.c = None, None
self.dh = None
self.stateful = stateful
def forward(self, xs):
Wx, Wh, b = self.params
N, T, D = xs.shape
H = Wh.shape[0]
self.layers = []
hs = np.empty((N, T, H), dtype='f')
if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype='f')
if not self.stateful or self.c is None:
self.c = np.zeros((N, H), dtype='f')
for t in range(T):
layer = LSTM(*self.params)
self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
hs[:, t, :] = self.h
self.layers.append(layer)
return hs
def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D = Wx.shape[0]
dxs = np.empty((N, T, D), dtype='f')
dh, dc = 0, 0
grads = [0, 0, 0]
for t in reversed(range(T)):
layer = self.layers[t]
dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
dxs[:, t, :] = dx
for i, grad in enumerate(layer.grads):
grads[i] += grad
for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh
return dxs
def set_state(self, h, c=None):
self.h, self.c = h, c
def reset_state(self):
self.h, self.c = None, None
2.3 演習
1つ前のセルの状態に入力ゲート、忘却ゲートを掛けて足し合わせたものと表現される。
正解は3となる。
3. GRU
3.1 要約
GRUはLSTMと違い、隠れ状態のみを使用する。
また、ゲートは、
- resetゲート
- updateゲート
の2種類となる。
GRUで行う計算をまとめると、
\boldsymbol{z}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(z)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(z)}+\boldsymbol{b}^{(z)})\\
\boldsymbol{r}=\sigma(\boldsymbol{x}_{t}\boldsymbol{W}_{x}^{(r)}+\boldsymbol{h}_{t-1}\boldsymbol{W}_h^{(r)}+\boldsymbol{b}^{(r)})\\
\hat{\boldsymbol{h}}=\tanh(\boldsymbol{x}_{t}\boldsymbol{W}_{x}+(\boldsymbol{r}\odot\boldsymbol{h}_{t-1})\boldsymbol{W}_h+\boldsymbol{b})\\
\boldsymbol{h}_t=(1-\boldsymbol{z})\odot\boldsymbol{h}_{t-1}+\boldsymbol{z}\odot\hat{\boldsymbol{h}}
resetゲートは、過去の隠れ状態をどれだけ無視するか決定する。
updateゲートは2つの役割(LSTMのforgetゲート、inputゲートに相当するもの)をもっており、
forgetゲートとして機能するのは$(1-\boldsymbol{z})\odot\boldsymbol{h}_{t-1}$の箇所、
inputゲートとして機能するのは$\boldsymbol{z}\odot\hat{\boldsymbol{h}}$の箇所である。
3.2 実装
class GRU:
def __init__(self, Wx, Wh, b):
'''
Parameters
----------
Wx: 入力`x`用の重みパラーメタ(3つ分の重みをまとめる)
Wh: 隠れ状態`h`用の重みパラメータ(3つ分の重みをまとめる)
b: バイアス(3つ分のバイアスをまとめる)
'''
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cache = None
def forward(self, x, h_prev):
Wx, Wh, b = self.params
H = Wh.shape[0]
Wxz, Wxr, Wxh = Wx[:, :H], Wx[:, H:2 * H], Wx[:, 2 * H:]
Whz, Whr, Whh = Wh[:, :H], Wh[:, H:2 * H], Wh[:, 2 * H:]
bz, br, bh = b[:H], b[H:2 * H], b[2 * H:]
z = sigmoid(np.dot(x, Wxz) + np.dot(h_prev, Whz) + bz)
r = sigmoid(np.dot(x, Wxr) + np.dot(h_prev, Whr) + br)
h_hat = np.tanh(np.dot(x, Wxh) + np.dot(r*h_prev, Whh) + bh)
h_next = (1-z) * h_prev + z * h_hat
self.cache = (x, h_prev, z, r, h_hat)
return h_next
def backward(self, dh_next):
Wx, Wh, b = self.params
H = Wh.shape[0]
Wxz, Wxr, Wxh = Wx[:, :H], Wx[:, H:2 * H], Wx[:, 2 * H:]
Whz, Whr, Whh = Wh[:, :H], Wh[:, H:2 * H], Wh[:, 2 * H:]
x, h_prev, z, r, h_hat = self.cache
dh_hat =dh_next * z
dh_prev = dh_next * (1-z)
# tanh
dt = dh_hat * (1 - h_hat ** 2)
dbh = np.sum(dt, axis=0)
dWhh = np.dot((r * h_prev).T, dt)
dhr = np.dot(dt, Whh.T)
dWxh = np.dot(x.T, dt)
dx = np.dot(dt, Wxh.T)
dh_prev += r * dhr
# update gate(z)
dz = dh_next * h_hat - dh_next * h_prev
dt = dz * z * (1-z)
dbz = np.sum(dt, axis=0)
dWhz = np.dot(h_prev.T, dt)
dh_prev += np.dot(dt, Whz.T)
dWxz = np.dot(x.T, dt)
dx += np.dot(dt, Wxz.T)
# rest gate(r)
dr = dhr * h_prev
dt = dr * r * (1-r)
dbr = np.sum(dt, axis=0)
dWhr = np.dot(h_prev.T, dt)
dh_prev += np.dot(dt, Whr.T)
dWxr = np.dot(x.T, dt)
dx += np.dot(dt, Wxr.T)
self.dWx = np.hstack((dWxz, dWxr, dWxh))
self.dWh = np.hstack((dWhz, dWhr, dWhh))
self.db = np.hstack((dbz, dbr, dbh))
self.grads[0][...] = self.dWx
self.grads[1][...] = self.dWh
self.grads[2][...] = self.db
return dx, dh_prev
class TimeGRU:
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None
self.h, self.dh = None, None
self.stateful = stateful
def forward(self, xs):
Wx, Wh, b = self.params
N, T, D = xs.shape
H = Wh.shape[0]
self.layers = []
hs = np.empty((N, T, H), dtype='f')
if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype='f')
for t in range(T):
layer = GRU(*self.params)
self.h = layer.forward(xs[:, t, :], self.h)
hs[:, t, :] = self.h
self.layers.append(layer)
return hs
def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D = Wx.shape[0]
dxs = np.empty((N, T, D), dtype='f')
dh = 0
grads = [0, 0, 0]
for t in reversed(range(T)):
layer = self.layers[t]
dx, dh = layer.backward(dhs[:, t, :] + dh)
dxs[:, t, :] = dx
for i, grad in enumerate(layer.grads):
grads[i] += grad
for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh
return dxs
def set_state(self, h):
self.h = h
def reset_state(self):
self.h = None
3.3 演習
次の時刻の中間状態は、1時刻前の中間表現と計算された中間表現の線形和で表現される。
正解は4となる。
4. 双方向RNN
4.1 要約
過去の情報だけでなく、未来の情報を加味することで精度を向上させるためのモデル。
4.2 実装
時系列データを反転させたものを、LSTMで順伝搬させる。
そして、その出力を再度反転させることで元の順番に並べ替える。
最後に通常のLSTMの出力と結合(concatenate)させて全体の出力とする。
通常のLSTMと比べて出力の長さは2倍になる。
class TimeBiLSTM:
def __init__(self, Wx1, Wh1, b1,
Wx2, Wh2, b2, stateful=False):
self.forward_lstm = TimeLSTM(Wx1, Wh1, b1, stateful)
self.backward_lstm = TimeLSTM(Wx2, Wh2, b2, stateful)
self.params = self.forward_lstm.params + self.backward_lstm.params
self.grads = self.forward_lstm.grads + self.backward_lstm.grads
def forward(self, xs):
o1 = self.forward_lstm.forward(xs)
o2 = self.backward_lstm.forward(xs[:, ::-1])
o2 = o2[:, ::-1]
out = np.concatenate((o1, o2), axis=2)
return out
def backward(self, dhs):
H = dhs.shape[2] // 2
do1 = dhs[:, :, :H]
do2 = dhs[:, :, H:]
dxs1 = self.forward_lstm.backward(do1)
do2 = do2[:, ::-1]
dxs2 = self.backward_lstm.backward(do2)
dxs2 = dxs2[:, ::-1]
dxs = dxs1 + dxs2
return dxs
4.3 演習
順方向と逆方向に伝播させたときの中間表現を結合したものが出力となるので、正解は4となる。
5. Seq2Seq
5.1 要約
Encoder-Decoderモデルの一種であり、機械対話や機器翻訳に使用される。
Encoder RNN
Encoderの処理としては、
-
vec1をRNNに入力し、hidden stateを出力
このhidden stateと次の入力vec2をまたRNNに入力して、hidden stateを出力という流れを繰り返す。 -
最後のvecを入力したときのhidden stateをfinal stateとする。
このfinal stateがthought vectorと呼ばれ入力した文の意味を表すベクトルとなる。
Decoder RNN
以下の手順で文章を生成する。
- RNNのfinal stateから、各トークンの生成確率を出力
- 生成確率にもとづいてトークンをランダムに選ぶ
- 2で選ばれたトークンをEmbeddingしてDecoder RNNへの次の入力とする
- 1-3を繰り返し、2で得られたトークンを文字列に直す
HRED
過去n-1個の発話から次の発話を生成数る。
Seq2Seqでは、会話の文脈無視で応答がなされたが、HREDでは前の単語の流れに即して応答される。
構造は、Seq2SeqとContext RNNを組み合わせたものとなる。
Context RNNとは、Encoderのまとめた各文章の系列をまとめて、これまでの会話コンテキスト全体を表すベクトルに変換する構造のこと。
HREDでは、
- 会話の流れのような多様性が無い
- さらに、短く情報量に乏しい答えをしやすい傾向がある
などの課題がある。
VHRED
VHREDは、HREDにVAEの潜在変数の概念を追加したもの。
VAE
オートエンコーダとは、教師なし学習の1つ。
例えば、入力画像と同じ画像を出力するニューラルネットワークなど。
VAEでは、潜在変数zに確率分布N(0,1)を仮定したもの。
5.2 実装
EncoderとDecoderの実装を行う。
class Encoder:
def __init__(self, vocab_size, wprdvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4*H).astype('f')
self.embed = TimeEmbedding(embed_W)
self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)
self.params = self.embed.params + self.lstm.params
self.grads = self.embed.grads + self.lstm.grads
self.hs = None
def forward(self, xs):
xs = self.embed.forward(xs)
hs = self.lstm.forward(xs)
self.hs = hs
return hs[:, -1 , :]
def backward(self, dh):
dhs = np.zeros_like(self.hs)
dhs[:, -1, :] = dh
dout = self.lstm.backward(dhs)
dout = self.embed.backward(dout)
return dout
class Decoder:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4*H).astype('f')
affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
affine_b = np.zeros(V).astype('f')
self.embed = TimeEmbedding(embed_W)
self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
self.affine = TimeAffine(affine_W, affine_b)
self.params, self.grads = [], []
for layer in (self.embed, self.lstm, self.affine):
self.params += layer.params
self.grads += layer.grads
def forward(self, xs, h):
self.lstm.set_state(h)
out = self.embed.forward(xs)
out = self.lstm.forward(out)
score = self.affine.forward(out)
return score
def backward(self, dscore):
dout = self.affine.backward(dscore)
dout = self.lstm.backward(dout)
dout = self.embed.backward(dout)
dh = self.lstm.dh
return dh
def generate(self, h, start_id, sample_size):
sampled = []
sample_id = start_id
self.lstm.set_state(h)
for _ in range(sample_size):
x = np.array(sample_id).reshape((1,1))
out = self.embed.forward(x)
out = self.lstm.forward(out)
score = self.affine.forward(out)
sample_id = np.argmax(score.flatten())
sampled.append(int(sample_id))
return sampled
Seq2Seqの実装を行う。
class Seq2seq(BaseModel):
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
self.encoder = Encoder(V, D, H)
self.decoder = Decoder(V, D, H)
self.softmax = TimeSoftmaxWithLoss()
self.params = self.encoder.params + self.decoder.params
self.grads = self.encoder.grads + self.decoder.grads
def forward(self, xs, ts):
decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]
h = self.encoder.forward(xs)
score = self.decoder.forward(decoder_xs, h)
loss = self.softmax.forward(score, decoder_ts)
return loss
def backward(self, dout=1):
dout = self.softmax.backward(dout)
dh = self.decoder.backward(dout)
dout = self.encoder.backward(dh)
return dout
def generate(self, xs, start_id, sample_size):
h = self.encoder.forward(xs)
sampled = self.decoder.generate(h, start_id, sample_size)
return sampled
VAE
- 中間層
- パラメータを出力する層
- サンプリングする層
- 出力層
の実装を行う。
class BaseLayer:
def update(self, eta):
self.w -= eta*self.grad_w
self.b -= eta*self.grad_b
class MiddleLayer(BaseLayer):
def __init__(self, n_upper, n):
self.w = np.random.randn(n_upper, n) * np.sqrt(2/n_upper)
se;f.b = np.zeros(n)
def forward(self, x):
self.x = x
self.u = np.dot(x, self.w) + self.b
self.y = np.where(self.u <= 0 ,0, self.u)
def backward(self, grad_y):
delta = grad_y * np.where(self.u <= 0, 0, 1)
self.grad_w = np.dot(self.x.T, delta)
self.grad_b = np.sum(delta, axis=0)
self.grad_x = np.dot(delta, self.w.T)
class ParamsLayer(BaseLayer):
def __init__(self, n_upper, n):
self.w = np.random.randn(n_upper, n) / np.sqrt(n_upper)
self.b = np.zeros(n)
def forward(self, x):
self.x = x
u = np.dot(x, self.w) + self.b
self.y = u
def backward(self, grad_y):
delta = grad_y
self.grad_w = np.dot(self.x.T, delta)
self.grad_b = np.sum(delta, axis=0)
self.grad_x = np.dot(delta, self.w.T)
def OutputLayer(BaseLayer):
def __init__(self, n_upper, n):
self.w = np.random.randn(n_upper, n) / np.sqrt(n_upper)
self.b = np.zeros(n)
def forward(self, x):
self.x = x
u = np.dot(x, self.w) + self.b
self.y = 1/(1+np.exp(-u))
def backward(self, t):
delta = self.y - t
self.grad_w = np.dot(self.x.T, delta)
self.grad_b = np.sum(delta, axis=0)
self.grad_x = np.dot(delta, self.w.T)
class LatentLayer:
def forward(self, mu, log_var):
self.mu = mu
self.log_var = log_var
self.epsilon = np.random.randn(*log_var.shape)
self.z = mu + self.epsilon*np.exp(log_var/2)
def backward(self, grad_z):
self.grad_mu = grad_z + selh.mu
self.grad_log_var = grad_z*self.epsilon/2*np.exp(self.log_var/2) - 0.5*(1-np.exp(self.log_var))
5.3 確認テスト
wはワンホットベクトルであり、単語埋め込みによりエンベディングを行う。
正解は1となる。
6. word2vec
6.1 要約
ニューラルネットワークで単語の処理を行うため、単語を「固定長のベクトル」に変換する必要がある。
その方法の1つとしてone-hot表現がある。これは、要素の中で1つだけ1で、残りはすべて0であるようなベクトルのことである。
例えば 「you」の単語IDが2のとき、one-hot表現は$(0,0,1,0,0,0,0)$のようになる。
ここで、全結合層を考えると入力$\boldsymbol{c}$(one-hot表現)と重み行列$\boldsymbol{W}$の行列の積の計算となる。
このとき出力は、重み行列から該当する場所の行ベクトルを抜き出しただけの値となる。
CBOW(Continuous Bag-of-Words)
word2vecでは、CBOWモデルとskip-gramモデルの2つのモデルが使用されるニューラルネットワークである。
CBOWモデルは、コンテキストからターゲットを推測することを目的としたニューラルネットワークのことである。
CBOWモデルは、入力層が2つあり(コンテキストの数)、中間層を経て、出力層へたどり着く。
入力層から中間層へは、同じ全結合層($\boldsymbol{W_{in}}$)によって行われ、中間層から出力層へは全結合層($\boldsymbol{W_{out}}$)によって行われる。
全結合層により、1つ目の入力層が$\boldsymbol{h_{1}}$、2つ目の入力層が$\boldsymbol{h_{2}}$に変換されたとすると、
中間層のニューロンは$\frac{1}{2}(\boldsymbol{h_{1}}+\boldsymbol{h_{2}})$となる。
出力層には単語の数だけニューロンが存在するが、出力されたスコアにSoftmax関数を適用することで確率が得られる。
全結合層の重み$\boldsymbol{W_{in}}$は、単語の分散表現となる。
中間層のニューロンの数を入力層よりも減らすことで、単語を予測するために必要な情報をコンパクトに納められる。
これはエンコードと呼ばれ、中間層から目的の結果を得る作業はデコードと呼ばれる。
6.2 実装
def preprocess(text):
# 前処理
text = text.lower() # すべての文字を小文字に変換
text = text.replace('.', ' .') # 「.」も1単語として扱うためスペースを入れる。
words = text.split(' ')
# 単語辞書、ID辞書の作成
word_to_id = {} # 単語: ID
id_to_word = {} # ID: 単語
for word in words: # 単語のリスト
if word not in word_to_id: # word_to_idに存在しない場合
new_id = len(word_to_id) # 新しいIDの発行
word_to_id[word] = new_id
id_to_word[new_id] = word
# コーパスの作成(単語リストをIDリストに変換)
corpus = [word_to_id[w] for w in words]
return corpus, word_to_id, id_to_word
def create_contexts_target(corpus, window_size=1):
# windowサイズに合わせてtargetの範囲を決める
target = corpus[window_size:-window_size]
contexts = []
# targetのID
for idx in range(window_size, len(corpus)-window_size):
cs = [] # targetごとにコンテキストのリストを作成
# 左隣から右隣まで
for t in range(-window_size, window_size+1):
# t=0は自分自身
if t == 0:
continue
# IDをcsに追加
cs.append(corpus[idx+t])
# IDリストをcontextsに追加
contexts.append(cs)
return np.array(contexts), np.array(target)
def convert_one_hot(corpus, vocab_size):
'''one-hot表現への変換
:param corpus: 単語IDのリスト(1次元もしくは2次元のNumPy配列)
:param vocab_size: 語彙数
:return: one-hot表現(2次元もしくは3次元のNumPy配列)
'''
N = corpus.shape[0]
# コーパスが1次元配列
if corpus.ndim == 1:
# コーパスの各単語(N)に対し,単語数(vocab_size)の列を用意
one_hot = np.zeros((N, vocab_size), dtype=np.int32)
# コーパスを順に処理
for idx, word_id in enumerate(corpus):
# 該当する単語の位置に1を入れる
one_hot[idx, word_id] = 1
# コーパスが多次元(C次元)
elif corpus.ndim == 2:
C = corpus.shape[1]
# コーパスの各単語(N)に対し,C次元分(コンテキスト数)の単語数(vocab_size)の列を用意
one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
# 1番目の単語から処理していく
for idx_0, word_ids in enumerate(corpus):
# 次元ごとに処理していく
for idx_1, word_id in enumerate(word_ids):
one_hot[idx_0, idx_1, word_id] = 1
return one_hot
class SimpleCBOW:
def __init__(self, vocab_size, hidden_size):
V, H = vocab_size, hidden_size
# 重みの初期化
W_in = 0.01 * np.random.randn(V, H).astype('f')
W_out = 0.01 * np.random.randn(H, V).astype('f')
# レイヤの生成
self.in_layer0 = MatMul(W_in)
self.in_layer1 = MatMul(W_in)
self.out_layer = MatMul(W_out)
self.loss_layer = SoftmaxWithLoss()
# すべての重みと勾配をリストにまとめる
layers = [self.in_layer0, self.in_layer1, self.out_layer]
self.params, self.grads = [], []
for layer in layers:
# 同じ重みを複数のレイヤで共有していることに注意
self.params += layer.params
self.grads += layer.grads
# メンバ変数に単語の分散表現を設定
self.word_vecs = W_in
def forward(self, contexts, target):
h0 = self.in_layer0.forward(contexts[:, 0]) # 1番目の次元はコンテキストのウィンドウサイズ分
h1 = self.in_layer1.forward(contexts[:, 1])
h = (h0 + h1) * 0.5
score = self.out_layer.forward(h)
loss = self.loss_layer.forward(score, target)
return loss
def backward(self, dout=1):
ds = self.loss_layer.backward(dout)
da = self.out_layer.backward(ds)
da *= 0.5 # 「×」の逆伝播
self.in_layer1.backward(da)
self.in_layer0.backward(da)
return None
window_size = 1
hidden_size = 5
batch_size = 3
max_epoch = 1000
eval_interval = 200
text = 'あなた は 私 に さよなら と 言う. 私 は あなた に こんにちは と 言う.'
corpus, word_to_id, id_to_word = preprocess(text)
vocab_size = len(word_to_id)
contexts, target = create_contexts_target(corpus, window_size=1)
target = convert_one_hot(target, vocab_size)
contexts = convert_one_hot(contexts, vocab_size)
model = SimpleCBOW(vocab_size, hidden_size)
optimizer = Adam()
trainer = Trainer(model, optimizer)
trainer.fit(contexts, target, max_epoch, batch_size, eval_interval=100)
trainer.plot()
word_vecs = model.word_vecs
for word_id, word in id_to_word.items():
print(word, word_vecs[word_id])
7. AttentionMechanism
7.1 要約
Encoderの改良
seq2seqでは、Encoderが時系列データを固定長のベクトルに変換した。
どんな長い文章でも、それを固定長のベクトルに変換してしまうことが問題点として挙げられる。
そこで、各時刻の隠れ状態ベクトルをすべて利用することで、入力された単語列と同じ数のベクトル$\boldsymbol{hs}$を得ることができる。
各時刻の隠れ状態には、直前に入力された単語の情報が多く含まれる。
Decoderの改良
Encoderに出力$\boldsymbol{hs}$全てを活用できるように改良を行う。
「翻訳先の単語」と対応関係にある「翻訳元の単語」の情報を選び出すこと(単語の対応関係を表す情報をアライメントと呼ぶ)、
そしてその情報を利用して翻訳を行うことを考える。
Encoderに出力$\boldsymbol{hs}$を受け取ったとき、最終行はこれまで通りDecoderの最初のLSTMレイヤに渡す。
さらに、各時刻のAffineレイヤの前に、各時刻の隠れ状態と$\boldsymbol{hs}$を受け取り「何らかの計算」を行うレイヤを追加する。
各時刻において、Decoderへの入力単語と対応関係にある単語のベクトルを$\boldsymbol{hs}$から選び出させたいが、選び出すという操作は微分ができないという問題点がある。
そこで単語の重要度を表す重み$\boldsymbol{a}$を計算し、$\boldsymbol{hs}$から、重み付き和「コンテキストベクトル」$\boldsymbol{c}$を求める。
7.2 実装
class WeightSum:
def __init__(self):
self.params, self.grads = [], []
self.cache = None
def forward(self, hs, a):
N, T, H = hs.shape
ar = a.reshape(N, T, 1).repeat(H, axis=2)
t = hs * ar
c = np.sum(t, axis=1)
self.cache = (hs, ar)
return c
def backward(self, dc):
hs, ar = self.cache
N, T, H = hs.shape
dt = dc.reshape(N, 1, H).repeat(T, axis=1)
dar = dt * hs
dhs = dt * ar
da = np.sum(dar, axis=2)
return dhs, da
重み$\boldsymbol{a}$の求め方を考える。
LSTMの隠れ状態ベクトルを$\boldsymbol{h}$表すと、類似度は$\boldsymbol{hs}$と$\boldsymbol{h}$の内積により求まる。
この結果を$\boldsymbol{s}$とし、Softmax関数により正規化を行う。
class AttentionWeight:
def __init__(self):
self.params, self.grads = [], []
self.softmax = Softmax()
self.cache = None
def forward(self, hs, h):
N, T, H = hs.shape
hr = h.reshape(N, 1, H).repeat(T, axis=1)
t = hs * hr
s = np.sum(t, axis=2)
a = self.softmax.forward(s)
self.cache = (hs, hr)
return a
def backward(self, da):
hs, hr = self.cache
N, T, H = hs.shape
ds = self.softmax.backward(da)
dt = ds.reshape(N, T, 1).repeat(H, axis=2)
dhs = dt * hr
dhr = dt * hs
dh = np.sum(dhr, axis=1)
return dhs, dh
2つのレイヤを組み合わせる。
Encoderが出力する各単語ベクトル$\boldsymbol{hs}$に対し、Attention Weightレイヤが注意を払い、各単語の重み$\boldsymbol{a}$を求める。
それに続き、Weight Sumレイヤが$\boldsymbol{a}$と$\boldsymbol{hs}$の重み付き和を求め、コンテキストベクトル$\boldsymbol{c}$として出力する。
この一連の計算を行うレイヤをAttentionレイヤと呼ぶこととする。
class Attention:
def __init__(self):
self.params, self.grads = [], []
self.attention_weight_layer = AttentionWeight()
self.weight_sum_layer = WeightSum()
self.attention_weight = None
def forward(self, hs, h):
a = self.attention_weight_layer.forward(hs, h)
out = self.weight_sum_layer.forward(hs, a)
self.attention_weight = a
return out
def backward(self, dout):
dhs0, da = self.weight_sum_layer.backward(dout)
dhs1, dh = self.attention_weight_layer.backward(da)
dhs = dhs0 + dhs1
return dhs, dh
複数のAttentionレイヤをまとめてTime Attentionレイヤを実装する
class TimeAttention:
def __init__(self):
self.params, self.grads = [], []
self.layers = None
self.attention_weights = None
def forward(self, hs_enc, hs_dec):
N, T, H = hs_dec.shape
out = np.empty_like(hs_dec)
self.layers = []
self.attention_weights = []
for t in range(T):
layer = Attention()
out[:, t, :] = layer.forward(hs_enc, hs_dec[:, t, :])
self.layers.append(layer)
self.attention_weights.append(layer.attention_weight)
return out
def backward(self, dout):
N, T, H = dout.shape
dhs_enc = 0
dhs_dec = np.empty_like(dout)
for t in range(T):
layer = self.layers[t]
dhs, dh = layer.backward(dout[:, t, :])
dhs_enc += dhs
dhs_dec[:, t, :] = dh
return dhs_enc, dhs_dec
Attention付きのSeq2Seqの実装を行う。
class AttentionEncoder(Encoder):
def forward(self, xs):
xs = self.embed.forward(xs)
hs = self.lstm.forward(xs)
return hs
def backward(self, dhs):
dout = self.lstm.backward(dhs)
dout = self.embed.backward(dout)
return dout
class AttentionDecoder:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4*H).astype('f')
affine_W = (rn(2*H, V) / np.sqrt(2*H)).astype('f')
affine_b = np.zeros(V).astype('f')
self.embed = TimeEmbedding(embed_W)
self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
self.attention = TimeAttention()
self.affine = TimeAffine(affine_W, affine_b)
layers = [self.embed, self.lstm, self.attention, self.affine]
self.params, self.grads = [], []
for layer in layers:
self.params += layer.params
self.grads += layer.grads
def forward(self, xs, enc_hs):
h = enc_hs[:,-1]
self.lstm.set_state(h)
out = self.embed.forward(xs)
dec_hs = self.lstm.forward(out)
c = self.attention.forward(enc_hs, dec_hs)
out = np.concatenate((c, dec_hs), axis=2)
score = self.affine.forward(out)
return score
def backward(self, dscore):
dout = self.affine.backward(dscore)
N, T, H2 = dout.shape
H = H2 // 2
dc, ddec_hs0 = dout[:,:,:H], dout[:,:,H:]
denc_hs, ddec_hs1 = self.attention.backward(dc)
ddec_hs = ddec_hs0 + ddec_hs1
dout = self.lstm.backward(ddec_hs)
dh = self.lstm.dh
denc_hs[:, -1] += dh
self.embed.backward(dout)
return denc_hs
def generate(self, enc_hs, start_id, sample_size):
sampled = []
sample_id = start_id
h = enc_hs[:, -1]
self.lstm.set_state(h)
for _ in range(sample_size):
x = np.array([sample_id]).reshape((1, 1))
out = self.embed.forward(x)
dec_hs = self.lstm.forward(out)
c = self.attention.forward(enc_hs, dec_hs)
out = np.concatenate((c, dec_hs), axis=2)
score = self.affine.forward(out)
sample_id = np.argmax(score.flatten())
sampled.append(sample_id)
return sampled
class AttentionSeq2seq(Seq2seq):
def __init__(self, vocab_size, wordvec_size, hidden_size):
args = vocab_size, wordvec_size, hidden_size
self.encoder = AttentionEncoder(*args)
self.decoder = AttentionDecoder(*args)
self.softmax = TimeSoftmaxWithLoss()
self.params = self.encoder.params + self.decoder.params
self.grads = self.encoder.grads + self.decoder.grads
単語の重要度を可視化することができる。