0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LSTM

0
Posted at

LSTM(numpyベース)をAntigravityに出力させました。
ご参考までに。

"""
【LSTM (Long Short-Term Memory) の背景と意図】

作られた背景:
1997年にSepp HochreiterとJürgen Schmidhuberによって提案されました。
通常のRNN(Recurrent Neural Network)は、過去の情報を現在の処理に活かせる画期的なモデルでしたが、
「長期依存性(Long-term dependencies)の学習が難しい」という致命的な欠点がありました。
入力となる時系列データが長くなると、逆伝播(BPTT)で過去へ勾配を戻す際、同じ重み行列を何度も掛け合わせるため、
勾配がゼロに近づいて消えてしまう「勾配消失(Vanishing Gradient)」問題が避けられなかったからです。

どこが肝(核心)なのか:
1. 「記憶セル(Cell State: c)」の導入
   LSTMの最大の革新は、短期記憶(h)とは別に「長期記憶コンベアベルト」である「記憶セル(c)」を作ったことです。
   記憶セルは「足し算」と「要素ごとの掛け算」だけで一直線に更新されます。
   これにより、逆伝播時に微分(勾配)が減衰することなく過去へとストレートに伝わり、「勾配消失」を防ぎます。

2. 情報をコントロールする「ゲート(Gates)」仕組み
   ・忘却ゲート (Forget Gate): 過去の不要になった記憶を捨てる
   ・入力ゲート (Input Gate): 現在の新しい入力のうち、何を記憶に追加するかを決める
   ・出力ゲート (Output Gate): 最新の記憶から、今のステップでの出力(短期記憶)をどれにするか決める
   このゲート機構により、「重要な情報はそのまま保持し続ける」というインテリジェントな記憶の取捨選択が可能になりました。

本コードの意図:
本実装は、PyTorchなどのフレームワークではブラックボックスとなっているLSTMの内部構造を、
NumPyのみでゼロから構築することで、数式の意味や勾配の伝わり方を明示的に可視化・理解することを意図しています。
"""

import numpy as np
def sigmoid(x):
    """シグモイド関数:出力を0.0〜1.0の「割合」に変換します"""
    return 1 / (1 + np.exp(-x))

class LSTM:
    """
    1ステップ(ある時刻t)を処理するLSTM層。
    E資格の試験では「4つのゲートの数式」と「勾配消失を防ぐ仕組み(c_tの実装)」が頻出です!
    """
    def __init__(self, Wx, Wh, b):
        self.params = [Wx, Wh, b]
        # パラメータに対する勾配(修正量)の箱を用意します
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None

    def forward(self, x, h_prev, c_prev):
        """
        順伝播計算(E資格ではこの数式の組み立てが問われます)
        x: 現在の入力
        h_prev: 前の時刻の隠れ状態(短期記憶)
        c_prev: 前の時刻の記憶セル(LSTM特有の長期記憶コンベアベルト)
        """
        Wx, Wh, b = self.params
        N, H = h_prev.shape

        # LSTMの計算量は多いですが、4つのゲートの行列計算を「一度にまとめて(アフィン変換を1回)」行い、
        # 後で4つに分割(スライス)するのが標準的な実装テクニックです。
        A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b

        # 4つのゲート分に均等にスライスします(スライスの順番は実装のお作法によります)
        f = A[:, :H]        # forget(忘却)用のスコア
        g = A[:, H:2*H]     # g または c_candidate(新しい記憶の候補)用のスコア
        i = A[:, 2*H:3*H]   # input(入力)用のスコア
        o = A[:, 3*H:]      # output(出力)用のスコア

        # ----------------------------------------------------
        # 1. 忘却ゲート (Forget Gate): 過去の記憶(c)をどれくらい忘れるか(0に近いほど忘れる)
        f = sigmoid(f)     
        
        # 2. 新しい記憶の候補 (Candidate): 普通のRNNと同じく tanh を使う
        g = np.tanh(g)     
        
        # 3. 入力ゲート (Input Gate): 新しい記憶候補(g)をどれくらい「書き込む」か
        i = sigmoid(i)     
        
        # 4. 出力ゲート (Output Gate): 今回の記憶(c)をどれくらい外の「h」として出力するか
        o = sigmoid(o)     
        # ----------------------------------------------------

        # --- LSTMの核心部(ここが「勾配消失」を防ぐ理由!) ---
        # 記憶セル c_next は行列の掛け算(dot)ではなく「要素ごとの掛け算( * または アダマール積)」と「足し算」だけで更新されます。
        # 単純な足し算のルート(コンベアベルト)があるため、逆伝播時に勾配が途切れず、過去へとロスが伝わりやすくなります。
        c_next = f * c_prev + g * i

        # 次の時刻の隠れ状態(h_next)を作る
        # 更新された記憶セルを tanh で -1〜1 に整え、どれくらい外へ出すか(o)のゲートを通します
        h_next = o * np.tanh(c_next)

        # 逆伝播のために必要な値を全てキャッシュ(保存)しておきます
        self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
        
        return h_next, c_next

    def backward(self, dh_next, dc_next):
        """
        逆伝播計算(BPTT)。
        ここでは微分の連鎖律(CHAIN RULE)を使って、出力で起きた誤差(ズレ)を入力やパラメータに還元していきます。
        """
        Wx, Wh, b = self.params
        x, h_prev, c_prev, i, f, g, o, c_next = self.cache

        tanh_c_next = np.tanh(c_next)

        # 記憶セル c への勾配(ds と置きます)
        # c には「hからの勾配」と「直接の記憶セルからの勾配(dc_next)」の2ルートから誤差が集まってきます
        ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)

        # 記憶セルの「過去 (c_prev)」への勾配
        # 要素ごとの掛け算だったため、f(忘却ゲート)を掛けるだけで過去へ伝わります
        dc_prev = ds * f

        # 各ゲートごとの影響度(勾配)の逆算
        di = ds * g
        df = ds * c_prev
        do = dh_next * tanh_c_next
        dg = ds * i

        # 活性化関数の微分
        # sigmoid(x) の微分は x * (1 - x)
        # tanh(x) の微分は (1 - x^2) になるのがE資格頻出の数学ポイントです
        di *= i * (1 - i)
        df *= f * (1 - f)
        do *= o * (1 - o)
        dg *= (1 - g ** 2)

        # 4つの勾配を、横(水平)方向につなぎ合わせて1つの大きな行列(dA)に戻します
        dA = np.hstack((df, dg, di, do))

        # アフィン変換( Y = X * W + b ) の逆伝播。これは全結合層と全く同じです。
        dWh = np.dot(h_prev.T, dA)
        dWx = np.dot(x.T, dA)
        db = dA.sum(axis=0)

        # このLSTM層としてのパラメータ更新量を保存します
        self.grads[0][...] = dWx
        self.grads[1][...] = dWh
        self.grads[2][...] = db

        # 前の層(下)と前の時間(過去)へ流す勾配
        dx = np.dot(dA, Wx.T)
        dh_prev = np.dot(dA, Wh.T)

        return dx, dh_prev, dc_prev


class TimeLSTM:
    """複数ステップ(T個の入力)をまとめて処理するためのLSTM層
    言語モデル等では、上の `LSTM` を何個も数珠つなぎにした内部構造を持ちます。
    """
    def __init__(self, Wx, Wh, b, stateful=False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None # T個のLSTMセルを格納するリスト

        # LSTMは、h(短期記憶)と、c(長期記憶セル)の両方を伝搬させる必要があります
        self.h, self.c = None, None
        self.dh = None
        self.stateful = stateful # バッチ間で記憶をリセットせず持ち越すか

    def forward(self, xs):
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        H = Wh.shape[0]

        self.layers = []
        hs = np.empty((N, T, H), dtype='f')

        # statefulがFalse(文脈が途切れる)なら、真っさらのゼロ記憶で始めます
        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype='f')
        if not self.stateful or self.c is None:
            self.c = np.zeros((N, H), dtype='f')

        # T 個の時間ステップを順へ進みます
        for t in range(T):
            layer = LSTM(*self.params) # 1ステップ分のLSTMを用意
            
            # 前のステップの h と c を今のステップに渡し、新しい h と c を受け取ります
            self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
            
            hs[:, t, :] = self.h # 全時間のhのログを記録(次のレイヤへ渡すため)
            self.layers.append(layer)

        return hs

    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D = Wx.shape[0]

        dxs = np.empty((N, T, D), dtype='f')
        
        # 逆伝播なので、未来から過去へ伝わる勾配(最初はゼロ)を用意
        dh, dc = 0, 0 
        grads = [0, 0, 0]

        # Tステップを逆順へ遡ります
        for t in reversed(range(T)):
            layer = self.layers[t]
            
            # dhs[:, t, :]:上のレイヤ(Softmax等)から流れてくる勾配
            # dh, dc:自分たちのタイムラインの「未来の時間ステップ」から流れてきた勾配
            # これらを足して、該当ステップのLSTMに流し込みます。
            dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
            dxs[:, t, :] = dx

            # 各ステップのパラメータ勾配を加算
            for i, grad in enumerate(layer.grads):
                grads[i] += grad

        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
            
        self.dh = dh
        return dxs

# リセット機能:別の時系列データを最初から処理する際に、過去の記憶を断ち切るために使います。
    def set_state(self, h, c=None):
        self.h, self.c = h, c

    def reset_state(self):
        self.h, self.c = None, None

if __name__ == "__main__":
    # -------------------------------------------------------------
    # 推論(Inference / Forward)の実行とログ表示のデモ
    # -------------------------------------------------------------
    np.random.seed(42) # 実験の再現性のためシードを固定

    # 1. ハイパーパラメータの設定
    N = 2   # バッチサイズ(一度に処理する系列の数)
    T = 5   # 系列長(タイムステップ数:例:5単語など)
    D = 3   # 入力データの次元数(例:単語の埋め込みベクトルの次元)
    H = 4   # 隠れ状態(LSTMの記憶セル/短期記憶)の次元数

    print(f"=== LSTM 推論(Forward)デモ ===")
    print(f"バッチサイズ={N}, タイムステップ={T}, 入力次元={D}, 隠れ層次元={H}\n")

    # 2. パラメータの初期化 (学習済みモデルの重みを擬似的に用意)
    # Wx: 入力に対する重み (D -> 4*H のアフィン変換用)
    # Wh: 前回の隠れ状態に対する重み (H -> 4*H のアフィン変換用)
    # b: バイアス (4*H 次元)
    # ※ 4*H なのは、f, g, i, o の4つのゲート分の計算を一度にまとめて行うため
    Wx = np.random.randn(D, 4 * H) / np.sqrt(D)
    Wh = np.random.randn(H, 4 * H) / np.sqrt(H)
    b = np.zeros(4 * H)

    # 3. 再帰的につなぐための TimeLSTM インスタンスを作成
    # stateful=True にすると、バッチが変わっても記憶(h, c)を保持し続けます
    time_lstm = TimeLSTM(Wx, Wh, b, stateful=True)

    # 4. ダミーの入力データ (例えば、[2バッチ x 5時間 x 3次元] のランダムデータ)
    xs = np.random.randn(N, T, D)
    print("【入力データ xs】")
    print(f"  Shape: {xs.shape}")
    print(f"  最初のバッチの時刻 t=0 の入力ベクトル: {xs[0, 0, :].round(3)}\n")

    # 5. 推論の実行(順伝播)
    print("--- 順伝播(Forward)を実行 ---")
    # xsを与えると、すべての時間ステップの隠れ状態 hs が計算されて返ってくる
    hs = time_lstm.forward(xs)

    # 6. 推論結果(ログ)の確認
    print("\n【推論結果(全ステップの隠れ状態) hs】")
    print(f"  Shape: {hs.shape} -> (バッチサイズ, タイムステップ, 隠れ層次元)")
    print(f"  バッチ0 の全ステップ出力:\n{np.round(hs[0], 3)}\n")

    # LSTMの内部に保持されている「最新の記憶」を見てみる
    print("【現在保持されている最新の記憶(t=5 終了時点)】")
    print(f"  短期記憶 (h): shape={time_lstm.h.shape}\n{np.round(time_lstm.h, 3)}")
    print(f"  長期記憶 (c): shape={time_lstm.c.shape}\n{np.round(time_lstm.c, 3)}\n")

    # 追加推論テスト(連続した文脈として次のステップを入力した場合)
    xs_next = np.random.randn(N, 1, D) # 次の1ステップ分
    hs_next = time_lstm.forward(xs_next)
    print("【追加推論テスト:次の1ステップを入力】")
    print(f"  追加後の最新の短期記憶(h):\n{np.round(hs_next[:, 0, :], 3)}")
    print("  ※前の記憶(c, h)が引き継がれているため、文脈を維持して推論が行われます。")
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?