ゼロから作るDeep Learning⓶のRNNLMをAntigravityを使ってコメント付きで出力させました。
勉強の参考にしてください。
import numpy as np
# --- レイヤの実装 ---
# 深層学習では「レイヤ(層)」という単位で処理をブロック化します。
# これにより、ブロックをレゴのように繋げるだけで複雑なネットワークを構築できるようになります。
class RNN:
"""1ステップ(1つの時間t)だけを処理する単体のRNN層"""
def __init__(self, Wx, Wh, b):
# Wx: 入力xに対する重み, Wh: 1つ前の隠れ状態hに対する重み, b: バイアス
self.params = [Wx, Wh, b] # この層が持つ学習パラメータをリストにまとめる
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)] # 勾配(パラメータの更新量)を入れる箱
self.cache = None # 逆伝播(学習時)に使うための中間データを保存する場所
def forward(self, x, h_prev):
"""順伝播(推論・計算の実行)
x: 現在の入力, h_prev: 1時刻前の隠れ状態(過去の記憶)
"""
Wx, Wh, b = self.params
# RNNのコア計算: 現在の入力と過去の記憶を合わせて、新しい記憶(状態)を作る
t = np.dot(h_prev, Wh) + np.dot(x, Wx) + b
# tanh(ハイパボリックタンジェント)関数を使って、値を -1 〜 1 の間に収め、非線形性を持たせる
h_next = np.tanh(t)
# 逆伝播で計算を遡るために、今回の入力と状態を記録しておく
self.cache = (x, h_prev, h_next)
return h_next
def backward(self, dh_next):
"""逆伝播(学習のための勾配=修正量の計算)
dh_next: 次の層(または未来の時間)から伝わってきた「誤差(どれくらい間違っていたか)」
"""
Wx, Wh, b = self.params
x, h_prev, h_next = self.cache
# tanhの微分(逆算)。y = tanh(x) の微分は 1 - y^2 になるという性質を利用
dt = dh_next * (1 - h_next ** 2)
# バイアス(b)の勾配。バッチ方向に合計する
db = np.sum(dt, axis=0)
# 隠れ状態への重み(Wh)と、入力への重み(Wx)の勾配
dWh = np.dot(h_prev.T, dt)
dWx = np.dot(x.T, dt)
# 1つ前の時間(過去)へ伝える誤差
dh_prev = np.dot(dt, Wh.T)
# 前の層へ伝える誤差
dx = np.dot(dt, Wx.T)
# 計算した勾配を保存しておく(後でまとめて更新するため)
self.grads[0][...] = dWx
self.grads[1][...] = dWh
self.grads[2][...] = db
return dx, dh_prev
class TimeRNN:
"""Tステップ分(複数時刻分)のRNNをまとめて1つの層として扱うためのクラス
言語モデルは一度に複数の単語の並び(ブロック)を処理するため、この層が必要です。
"""
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None # 内部に持つ複数のRNN層を格納するリスト
self.h, self.dh = None, None
# statefulがTrueの場合、前回のバッチ処理の終わりの「記憶」を、次のバッチの最初に引き継ぐ(長い文脈を覚える)
self.stateful = stateful
def set_state(self, h):
"""外部から隠れ状態(記憶)を強制的にセットする"""
self.h = h
def reset_state(self):
"""隠れ状態(記憶)をリセットし、初期状態に戻す"""
self.h = None
def forward(self, xs):
"""順伝播(Tステップ分の入力をまとめて処理)
xs: 入力データ。(データの個数N, 時系列の長さT, 単語ベクトルの次元D) の3次元配列
"""
Wx, Wh, b = self.params
N, T, D = xs.shape
D, H = Wx.shape # Hは隠れ層のサイズ
self.layers = []
hs = np.empty((N, T, H), dtype='f') # Tステップ分のすべての隠れ状態を格納する箱
# 記憶を引き継がない設定、または最初の処理の場合は、記憶(h)をゼロで初期化
if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype='f')
# 時系列(T)の数だけループして、内部のRNN層に1ステップずつ渡していく
for t in range(T):
layer = RNN(*self.params) # 1ステップ分のRNN層を作成
self.h = layer.forward(xs[:, t, :], self.h) # 今の単語と過去の記憶を渡して、新しい記憶を得る
hs[:, t, :] = self.h
self.layers.append(layer) # 逆伝播で使うためにレイヤを取っておく
return hs
def backward(self, dhs):
"""RNN全体での誤差逆伝播(BPTT: Backpropagation Through Time)"""
Wx, Wh, b = self.params
N, T, H = dhs.shape
D, H = Wx.shape
dxs = np.empty((N, T, D), dtype='f')
dh = 0 # 未来から過去へ伝わってくる勾配(一番最初は未来がないので0)
grads = [0, 0, 0]
# Tステップを「過去に向かって(逆順に)」誤差を伝えていく
for t in reversed(range(T)):
layer = self.layers[t]
# 上の層から来た誤差(dhs[:, t, :])と、1つ未来から来た誤差(dh)を足して逆伝播
dx, dh = layer.backward(dhs[:, t, :] + dh)
dxs[:, t, :] = dx
# 各ステップでの重みの更新量をすべて足し合わせる
for i, grad in enumerate(layer.grads):
grads[i] += grad
# 足し合わせた更新量をこの層の勾配として保存
for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh # 最終的にTステップ前までたどり着いた誤差を保存しておく
return dxs
class TimeEmbedding:
"""単語ID(数値)を、意味を持つ密なベクトル(分散表現)に変換する層"""
def __init__(self, W):
self.params = [W] # Wは単語の辞書(全ての単語のベクトルが並んだ行列)
self.grads = [np.zeros_like(W)]
self.layers = None
self.W = W
def forward(self, xs):
"""入ってきた単語ID(xs)に対応するベクトルを、重みWから「引き抜く」だけ"""
N, T = xs.shape
V, D = self.W.shape
out = np.empty((N, T, D), dtype='f')
self.layers = []
for t in range(T):
idx = xs[:, t] # ある時刻tの単語ID
out[:, t, :] = self.W[idx] # 対応するベクトルを抽出
self.layers.append(idx)
return out
def backward(self, dout):
"""Embedding層の逆伝播は、使われた単語ベクトルの部分だけに誤差を反映する"""
N, T, D = dout.shape
dW = np.zeros_like(self.W)
for t in range(T):
idx = self.layers[t]
for i, word_id in enumerate(idx):
dW[word_id] += dout[i, t, :] # 同じ単語が複数回使われた場合は勾配を加算
self.grads[0][...] = dW
return None
class TimeAffine:
"""RNNの出力を、語彙数と同じサイズに変換する全結合層(予測スコアを出すため)"""
def __init__(self, W, b):
self.params = [W, b]
self.grads = [np.zeros_like(W), np.zeros_like(b)]
self.x = None
def forward(self, x):
N, T, D = x.shape
W, b = self.params
# 行列の掛け算をしやすくするため、NとTの次元を一度平坦化(reshape)する
rx = x.reshape(N*T, -1)
# 内積(dot)による掛け算とバイアスの加算。これが全結合層のメイン処理
out = np.dot(rx, W) + b
self.x = x
# 形を(N, T, 出力次元)に戻す
return out.reshape(N, T, -1)
def backward(self, dout):
"""通常の行列の逆算"""
x = self.x
N, T, D = x.shape
W, b = self.params
dout = dout.reshape(N*T, -1)
rx = x.reshape(N*T, -1)
db = np.sum(dout, axis=0)
dW = np.dot(rx.T, dout)
dx = np.dot(dout, W.T)
dx = dx.reshape(N, T, -1)
self.grads[0][...] = dW
self.grads[1][...] = db
return dx
class TimeSoftmaxWithLoss:
"""最終的な予測スコアを確率に変換(Softmax)し、正解との誤差(Loss)を計算する層"""
def __init__(self):
self.params, self.grads = [], []
self.cache = None
def forward(self, xs, ts):
"""
xs: モデルが出した各単語の予測スコア
ts: 正解の単語ID
"""
N, T, V = xs.shape
if ts.ndim == 3:
ts = ts.argmax(axis=2)
ts = ts.reshape(N * T)
xs = xs.reshape(N * T, V)
# --- Softmaxの計算 ---
# 指数関数(exp)を使う際、値が大きすぎると無限大(オーバーフロー)になるため、最大値を引いてから計算
xs = xs - xs.max(axis=1, keepdims=True)
xs = np.exp(xs)
ys = xs / xs.sum(axis=1, keepdims=True) # 各単語の確率(和が1になる)が計算される
# --- Cross Entropy Error (交差エントロピー誤差)の計算 ---
# 正解だった単語の「予測確率だけ」を取り出して対数(log)をとる
# 1e-7 を足しているのは、log(0)がマイナス無限大になって計算が壊れるのを防ぐため
ls = np.log(ys[np.arange(N * T), ts] + 1e-7)
loss = -np.sum(ls) / (N * T) # 平均誤差
self.cache = (ys, ts, (N, T, V))
return loss
def backward(self, dout=1):
"""Softmax-with-Lossレイヤの逆伝播は非常に美しく、手抜き計算ができる特徴があります。
(モデルの予測確率 - 正解ラベル(1.0)) がそのまま誤差として前の層に渡ります。
"""
ys, ts, (N, T, V) = self.cache
dx = ys.copy()
# 正解ラベルの場所だけ確率から1を引き、残りは予測確率そのものを誤差とする
dx[np.arange(N * T), ts] -= 1
dx *= dout
dx /= (N * T)
dx = dx.reshape(N, T, V)
return dx
# --- 単純なRNNLMの構築 ---
# ここまで作った「ブロック」を組み合わせて、1つの言語モデル(AI)を作ります。
class SimpleRnnlm:
def __init__(self, vocab_size, wordvec_size, hidden_size):
# V: 扱う単語の総数, D: 単語ベクトルの次元数, H: 隠れ層の細胞数
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
# AIが学習を開始する前の、最初のデタラメな重み(パラメータ)を生成します
# 割る数(100やsqrt)は、学習がスムーズに進むための「初期値の工夫(Xavierの初期値など)」です
embed_W = (rn(V, D) / 100).astype('f')
rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f')
rnn_Wh = (rn(H, H) / np.sqrt(H)).astype('f')
rnn_b = np.zeros(H).astype('f')
affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
affine_b = np.zeros(V).astype('f')
# 用意したレイヤを、データの流れ順に繋げてリスト(self.layers)に入れます
# 単語入力 -> 分散表現(ベクトル) -> RNNの時系列処理 -> スコア予測
self.layers = [
TimeEmbedding(embed_W),
TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful=True), # 文脈を繋ぐためにstateful=True
TimeAffine(affine_W, affine_b)
]
self.loss_layer = TimeSoftmaxWithLoss() # 最後の誤差計算層だけは特別扱い
self.rnn_layer = self.layers[1]
# 学習時に一気に重みを更新できるよう、全レイヤのパラメータと勾配を一つのリストに集約します
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads
def forward(self, xs, ts):
"""全体を貫通する順伝播。データxsを入れてロス(誤差)を計算します"""
# 順番にレイヤを通して変換していく
for layer in self.layers:
xs = layer.forward(xs)
# 最後に誤差レイヤを通して、正解とのズレを算出
loss = self.loss_layer.forward(xs, ts)
return loss
def backward(self, dout=1):
"""全体を逆走する逆伝播。誤差から、各重みの修正すべき量(勾配)を求めます"""
dout = self.loss_layer.backward(dout)
# 層を後ろから前へ、逆順に遡る
for layer in reversed(self.layers):
dout = layer.backward(dout)
return dout
def reset_state(self):
"""新しい文章を読み始める時にメモリ(文脈)をリセットする"""
self.rnn_layer.reset_state()
# --- 学習スクリプト ---
def preprocess(text):
"""テキストを前処理し、単語IDのリストに変換する関数"""
text = text.lower() # 全て小文字に
text = text.replace('.', ' .') # ピリオドも一つの単語として扱うためスペースを開ける
words = text.split(' ') # スペース区切りで単語のリストにする
word_to_id = {} # 単語 -> IDの辞書
id_to_word = {} # ID -> 単語の辞書
# 未知の単語が出てくるたびに、新しいIDを付与する
for word in words:
if word not in word_to_id:
new_id = len(word_to_id)
word_to_id[word] = new_id
id_to_word[new_id] = word
# 元のテキストを、単語IDの数値の並びに変換
corpus = np.array([word_to_id[w] for w in words])
return corpus, word_to_id, id_to_word
if __name__ == '__main__':
# 1. 学習するコーパス(データ)の準備
text = "You say goodbye and I say hello."
corpus, word_to_id, id_to_word = preprocess(text)
# 2. AIの設計図(ハイパーパラメータ)の設定
batch_size = 1 # 1回に学習する文の数
wordvec_size = 8 # 1つの単語をいくつの数字(次元)のベクトルで表現するか
hidden_size = 8 # RNNの記憶容量の大きさ
time_size = 2 # Truncated BPTT: 過去何単語分まで遡って学習するか
lr = 0.5 # 学習率 (1歩でどれくらい大きく重みを修正するか)
max_epoch = 300 # 全データセットを何周学習させるか
vocab_size = len(word_to_id) # 存在する単語の総数
# 3. モデルの実体化
model = SimpleRnnlm(vocab_size, wordvec_size, hidden_size)
# 4. 入力(xs)と正解(ts)の作成。RNNLMでは「次の単語」が正解になります。
# 例: xs = ["You", "say", "goodbye"]
# ts = ["say", "goodbye", "and"]
xs = corpus[:-1]
ts = corpus[1:]
data_size = len(xs)
print("=== RNNLM 学習開始 ===")
print(f"コーパス: '{text}'")
# 5. 学習ループ
for epoch in range(max_epoch):
model.reset_state() # 1周ごとに文脈をリセット
loss_sum = 0
iters = 0
# ブロック(time_size)ごとに少しずつ進めながら学習
for t in range(0, data_size - time_size + 1, time_size):
# バッチの切り出し
batch_x = xs[t:t+time_size].reshape(1, time_size)
batch_t = ts[t:t+time_size].reshape(1, time_size)
# ① 順伝播(推論)してロス(どれくらい間違えたか)を計算
loss = model.forward(batch_x, batch_t)
# ② 逆伝播を実行し、各レイヤのパラメータの「修正すべき量(誤差)」を求める
model.backward()
# ③ パラメータ更新 (SGD: 確率的勾配降下法)
# 現在のパラメータから、(学習率 × 更新量) を引き算して微修正する
for param, grad in zip(model.params, model.grads):
param -= lr * grad
loss_sum += loss
iters += 1
avg_loss = loss_sum / iters
# 20 Epochごとに、学習がどれくらい進んだかを出力
if (epoch+1) % 20 == 0:
# パプレキシティ(Perplexity)は e^(Loss) で計算できる指標。
# 「次に来る単語の候補を、平均して何個まで絞り込めているか」を表します。1.0なら完璧。
perplexity = np.exp(avg_loss)
print(f"Epoch {epoch+1:3d} | Loss: {avg_loss:.4f} | Perplexity: {perplexity:.4f}")
# --- 生成(推論)テスト ---
# 学習したAIに、最初の単語だけを与えて続きを予想させます。
print("\n=== 単語生成ジェネレート ===")
# スタート単語 "you" をAIに与える
word_id = word_to_id['you']
print(id_to_word[word_id], end=' ')
for _ in range(6): # 次の6単語を生成
x = np.array([[word_id]]) # 現在の単語ID
# AIの中(各レイヤ)を順番に通過させて予測スコアを計算
for layer in model.layers:
x = layer.forward(x)
p = x[0, 0, :] # Softmaxに通す前のスコアの羅列
# スコアを確率(0.0〜1.0)に変換。合計が1になる
p = np.exp(p - np.max(p)) / np.sum(np.exp(p - np.max(p)))
# 最も確率が一番高い(=AIが一番自信のある)単語IDを取得する (ここでは単純な貪欲法を使用)
word_id = np.argmax(p)
print(id_to_word[word_id], end=' ') # 生成された単語を表示し、これが次のループの入力になる
print("\n")