LSTM(numpyベース)をAntigravityに出力させました。
ご参考までに。
"""
【LSTM (Long Short-Term Memory) の背景と意図】
作られた背景:
1997年にSepp HochreiterとJürgen Schmidhuberによって提案されました。
通常のRNN(Recurrent Neural Network)は、過去の情報を現在の処理に活かせる画期的なモデルでしたが、
「長期依存性(Long-term dependencies)の学習が難しい」という致命的な欠点がありました。
入力となる時系列データが長くなると、逆伝播(BPTT)で過去へ勾配を戻す際、同じ重み行列を何度も掛け合わせるため、
勾配がゼロに近づいて消えてしまう「勾配消失(Vanishing Gradient)」問題が避けられなかったからです。
どこが肝(核心)なのか:
1. 「記憶セル(Cell State: c)」の導入
LSTMの最大の革新は、短期記憶(h)とは別に「長期記憶コンベアベルト」である「記憶セル(c)」を作ったことです。
記憶セルは「足し算」と「要素ごとの掛け算」だけで一直線に更新されます。
これにより、逆伝播時に微分(勾配)が減衰することなく過去へとストレートに伝わり、「勾配消失」を防ぎます。
2. 情報をコントロールする「ゲート(Gates)」仕組み
・忘却ゲート (Forget Gate): 過去の不要になった記憶を捨てる
・入力ゲート (Input Gate): 現在の新しい入力のうち、何を記憶に追加するかを決める
・出力ゲート (Output Gate): 最新の記憶から、今のステップでの出力(短期記憶)をどれにするか決める
このゲート機構により、「重要な情報はそのまま保持し続ける」というインテリジェントな記憶の取捨選択が可能になりました。
本コードの意図:
本実装は、PyTorchなどのフレームワークではブラックボックスとなっているLSTMの内部構造を、
NumPyのみでゼロから構築することで、数式の意味や勾配の伝わり方を明示的に可視化・理解することを意図しています。
"""
import numpy as np
def sigmoid(x):
"""シグモイド関数:出力を0.0〜1.0の「割合」に変換します"""
return 1 / (1 + np.exp(-x))
class LSTM:
"""
1ステップ(ある時刻t)を処理するLSTM層。
E資格の試験では「4つのゲートの数式」と「勾配消失を防ぐ仕組み(c_tの実装)」が頻出です!
"""
def __init__(self, Wx, Wh, b):
self.params = [Wx, Wh, b]
# パラメータに対する勾配(修正量)の箱を用意します
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cache = None
def forward(self, x, h_prev, c_prev):
"""
順伝播計算(E資格ではこの数式の組み立てが問われます)
x: 現在の入力
h_prev: 前の時刻の隠れ状態(短期記憶)
c_prev: 前の時刻の記憶セル(LSTM特有の長期記憶コンベアベルト)
"""
Wx, Wh, b = self.params
N, H = h_prev.shape
# LSTMの計算量は多いですが、4つのゲートの行列計算を「一度にまとめて(アフィン変換を1回)」行い、
# 後で4つに分割(スライス)するのが標準的な実装テクニックです。
A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b
# 4つのゲート分に均等にスライスします(スライスの順番は実装のお作法によります)
f = A[:, :H] # forget(忘却)用のスコア
g = A[:, H:2*H] # g または c_candidate(新しい記憶の候補)用のスコア
i = A[:, 2*H:3*H] # input(入力)用のスコア
o = A[:, 3*H:] # output(出力)用のスコア
# ----------------------------------------------------
# 1. 忘却ゲート (Forget Gate): 過去の記憶(c)をどれくらい忘れるか(0に近いほど忘れる)
f = sigmoid(f)
# 2. 新しい記憶の候補 (Candidate): 普通のRNNと同じく tanh を使う
g = np.tanh(g)
# 3. 入力ゲート (Input Gate): 新しい記憶候補(g)をどれくらい「書き込む」か
i = sigmoid(i)
# 4. 出力ゲート (Output Gate): 今回の記憶(c)をどれくらい外の「h」として出力するか
o = sigmoid(o)
# ----------------------------------------------------
# --- LSTMの核心部(ここが「勾配消失」を防ぐ理由!) ---
# 記憶セル c_next は行列の掛け算(dot)ではなく「要素ごとの掛け算( * または アダマール積)」と「足し算」だけで更新されます。
# 単純な足し算のルート(コンベアベルト)があるため、逆伝播時に勾配が途切れず、過去へとロスが伝わりやすくなります。
c_next = f * c_prev + g * i
# 次の時刻の隠れ状態(h_next)を作る
# 更新された記憶セルを tanh で -1〜1 に整え、どれくらい外へ出すか(o)のゲートを通します
h_next = o * np.tanh(c_next)
# 逆伝播のために必要な値を全てキャッシュ(保存)しておきます
self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
return h_next, c_next
def backward(self, dh_next, dc_next):
"""
逆伝播計算(BPTT)。
ここでは微分の連鎖律(CHAIN RULE)を使って、出力で起きた誤差(ズレ)を入力やパラメータに還元していきます。
"""
Wx, Wh, b = self.params
x, h_prev, c_prev, i, f, g, o, c_next = self.cache
tanh_c_next = np.tanh(c_next)
# 記憶セル c への勾配(ds と置きます)
# c には「hからの勾配」と「直接の記憶セルからの勾配(dc_next)」の2ルートから誤差が集まってきます
ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)
# 記憶セルの「過去 (c_prev)」への勾配
# 要素ごとの掛け算だったため、f(忘却ゲート)を掛けるだけで過去へ伝わります
dc_prev = ds * f
# 各ゲートごとの影響度(勾配)の逆算
di = ds * g
df = ds * c_prev
do = dh_next * tanh_c_next
dg = ds * i
# 活性化関数の微分
# sigmoid(x) の微分は x * (1 - x)
# tanh(x) の微分は (1 - x^2) になるのがE資格頻出の数学ポイントです
di *= i * (1 - i)
df *= f * (1 - f)
do *= o * (1 - o)
dg *= (1 - g ** 2)
# 4つの勾配を、横(水平)方向につなぎ合わせて1つの大きな行列(dA)に戻します
dA = np.hstack((df, dg, di, do))
# アフィン変換( Y = X * W + b ) の逆伝播。これは全結合層と全く同じです。
dWh = np.dot(h_prev.T, dA)
dWx = np.dot(x.T, dA)
db = dA.sum(axis=0)
# このLSTM層としてのパラメータ更新量を保存します
self.grads[0][...] = dWx
self.grads[1][...] = dWh
self.grads[2][...] = db
# 前の層(下)と前の時間(過去)へ流す勾配
dx = np.dot(dA, Wx.T)
dh_prev = np.dot(dA, Wh.T)
return dx, dh_prev, dc_prev
class TimeLSTM:
"""複数ステップ(T個の入力)をまとめて処理するためのLSTM層
言語モデル等では、上の `LSTM` を何個も数珠つなぎにした内部構造を持ちます。
"""
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None # T個のLSTMセルを格納するリスト
# LSTMは、h(短期記憶)と、c(長期記憶セル)の両方を伝搬させる必要があります
self.h, self.c = None, None
self.dh = None
self.stateful = stateful # バッチ間で記憶をリセットせず持ち越すか
def forward(self, xs):
Wx, Wh, b = self.params
N, T, D = xs.shape
H = Wh.shape[0]
self.layers = []
hs = np.empty((N, T, H), dtype='f')
# statefulがFalse(文脈が途切れる)なら、真っさらのゼロ記憶で始めます
if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype='f')
if not self.stateful or self.c is None:
self.c = np.zeros((N, H), dtype='f')
# T 個の時間ステップを順へ進みます
for t in range(T):
layer = LSTM(*self.params) # 1ステップ分のLSTMを用意
# 前のステップの h と c を今のステップに渡し、新しい h と c を受け取ります
self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
hs[:, t, :] = self.h # 全時間のhのログを記録(次のレイヤへ渡すため)
self.layers.append(layer)
return hs
def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D = Wx.shape[0]
dxs = np.empty((N, T, D), dtype='f')
# 逆伝播なので、未来から過去へ伝わる勾配(最初はゼロ)を用意
dh, dc = 0, 0
grads = [0, 0, 0]
# Tステップを逆順へ遡ります
for t in reversed(range(T)):
layer = self.layers[t]
# dhs[:, t, :]:上のレイヤ(Softmax等)から流れてくる勾配
# dh, dc:自分たちのタイムラインの「未来の時間ステップ」から流れてきた勾配
# これらを足して、該当ステップのLSTMに流し込みます。
dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
dxs[:, t, :] = dx
# 各ステップのパラメータ勾配を加算
for i, grad in enumerate(layer.grads):
grads[i] += grad
for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh
return dxs
# リセット機能:別の時系列データを最初から処理する際に、過去の記憶を断ち切るために使います。
def set_state(self, h, c=None):
self.h, self.c = h, c
def reset_state(self):
self.h, self.c = None, None
if __name__ == "__main__":
# -------------------------------------------------------------
# 推論(Inference / Forward)の実行とログ表示のデモ
# -------------------------------------------------------------
np.random.seed(42) # 実験の再現性のためシードを固定
# 1. ハイパーパラメータの設定
N = 2 # バッチサイズ(一度に処理する系列の数)
T = 5 # 系列長(タイムステップ数:例:5単語など)
D = 3 # 入力データの次元数(例:単語の埋め込みベクトルの次元)
H = 4 # 隠れ状態(LSTMの記憶セル/短期記憶)の次元数
print(f"=== LSTM 推論(Forward)デモ ===")
print(f"バッチサイズ={N}, タイムステップ={T}, 入力次元={D}, 隠れ層次元={H}\n")
# 2. パラメータの初期化 (学習済みモデルの重みを擬似的に用意)
# Wx: 入力に対する重み (D -> 4*H のアフィン変換用)
# Wh: 前回の隠れ状態に対する重み (H -> 4*H のアフィン変換用)
# b: バイアス (4*H 次元)
# ※ 4*H なのは、f, g, i, o の4つのゲート分の計算を一度にまとめて行うため
Wx = np.random.randn(D, 4 * H) / np.sqrt(D)
Wh = np.random.randn(H, 4 * H) / np.sqrt(H)
b = np.zeros(4 * H)
# 3. 再帰的につなぐための TimeLSTM インスタンスを作成
# stateful=True にすると、バッチが変わっても記憶(h, c)を保持し続けます
time_lstm = TimeLSTM(Wx, Wh, b, stateful=True)
# 4. ダミーの入力データ (例えば、[2バッチ x 5時間 x 3次元] のランダムデータ)
xs = np.random.randn(N, T, D)
print("【入力データ xs】")
print(f" Shape: {xs.shape}")
print(f" 最初のバッチの時刻 t=0 の入力ベクトル: {xs[0, 0, :].round(3)}\n")
# 5. 推論の実行(順伝播)
print("--- 順伝播(Forward)を実行 ---")
# xsを与えると、すべての時間ステップの隠れ状態 hs が計算されて返ってくる
hs = time_lstm.forward(xs)
# 6. 推論結果(ログ)の確認
print("\n【推論結果(全ステップの隠れ状態) hs】")
print(f" Shape: {hs.shape} -> (バッチサイズ, タイムステップ, 隠れ層次元)")
print(f" バッチ0 の全ステップ出力:\n{np.round(hs[0], 3)}\n")
# LSTMの内部に保持されている「最新の記憶」を見てみる
print("【現在保持されている最新の記憶(t=5 終了時点)】")
print(f" 短期記憶 (h): shape={time_lstm.h.shape}\n{np.round(time_lstm.h, 3)}")
print(f" 長期記憶 (c): shape={time_lstm.c.shape}\n{np.round(time_lstm.c, 3)}\n")
# 追加推論テスト(連続した文脈として次のステップを入力した場合)
xs_next = np.random.randn(N, 1, D) # 次の1ステップ分
hs_next = time_lstm.forward(xs_next)
print("【追加推論テスト:次の1ステップを入力】")
print(f" 追加後の最新の短期記憶(h):\n{np.round(hs_next[:, 0, :], 3)}")
print(" ※前の記憶(c, h)が引き継がれているため、文脈を維持して推論が行われます。")