0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RNNLM の勉強

0
Posted at

ゼロから作るDeep Learning⓶のRNNLMをAntigravityを使ってコメント付きで出力させました。
勉強の参考にしてください。

import numpy as np

# --- レイヤの実装 ---
# 深層学習では「レイヤ(層)」という単位で処理をブロック化します。
# これにより、ブロックをレゴのように繋げるだけで複雑なネットワークを構築できるようになります。

class RNN:
    """1ステップ(1つの時間t)だけを処理する単体のRNN層"""
    def __init__(self, Wx, Wh, b):
        # Wx: 入力xに対する重み, Wh: 1つ前の隠れ状態hに対する重み, b: バイアス
        self.params = [Wx, Wh, b] # この層が持つ学習パラメータをリストにまとめる
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)] # 勾配(パラメータの更新量)を入れる箱
        self.cache = None # 逆伝播(学習時)に使うための中間データを保存する場所

    def forward(self, x, h_prev):
        """順伝播(推論・計算の実行)
        x: 現在の入力, h_prev: 1時刻前の隠れ状態(過去の記憶)
        """
        Wx, Wh, b = self.params
        
        # RNNのコア計算: 現在の入力と過去の記憶を合わせて、新しい記憶(状態)を作る
        t = np.dot(h_prev, Wh) + np.dot(x, Wx) + b
        
        # tanh(ハイパボリックタンジェント)関数を使って、値を -1 〜 1 の間に収め、非線形性を持たせる
        h_next = np.tanh(t)

        # 逆伝播で計算を遡るために、今回の入力と状態を記録しておく
        self.cache = (x, h_prev, h_next)
        return h_next

    def backward(self, dh_next):
        """逆伝播(学習のための勾配=修正量の計算)
        dh_next: 次の層(または未来の時間)から伝わってきた「誤差(どれくらい間違っていたか)」
        """
        Wx, Wh, b = self.params
        x, h_prev, h_next = self.cache

        # tanhの微分(逆算)。y = tanh(x) の微分は 1 - y^2 になるという性質を利用
        dt = dh_next * (1 - h_next ** 2)
        
        # バイアス(b)の勾配。バッチ方向に合計する
        db = np.sum(dt, axis=0)
        
        # 隠れ状態への重み(Wh)と、入力への重み(Wx)の勾配
        dWh = np.dot(h_prev.T, dt)
        dWx = np.dot(x.T, dt)
        
        # 1つ前の時間(過去)へ伝える誤差
        dh_prev = np.dot(dt, Wh.T)
        
        # 前の層へ伝える誤差
        dx = np.dot(dt, Wx.T)

        # 計算した勾配を保存しておく(後でまとめて更新するため)
        self.grads[0][...] = dWx
        self.grads[1][...] = dWh
        self.grads[2][...] = db

        return dx, dh_prev


class TimeRNN:
    """Tステップ分(複数時刻分)のRNNをまとめて1つの層として扱うためのクラス
    言語モデルは一度に複数の単語の並び(ブロック)を処理するため、この層が必要です。
    """
    def __init__(self, Wx, Wh, b, stateful=False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None # 内部に持つ複数のRNN層を格納するリスト
        
        self.h, self.dh = None, None
        
        # statefulがTrueの場合、前回のバッチ処理の終わりの「記憶」を、次のバッチの最初に引き継ぐ(長い文脈を覚える)
        self.stateful = stateful 

    def set_state(self, h):
        """外部から隠れ状態(記憶)を強制的にセットする"""
        self.h = h

    def reset_state(self):
        """隠れ状態(記憶)をリセットし、初期状態に戻す"""
        self.h = None

    def forward(self, xs):
        """順伝播(Tステップ分の入力をまとめて処理)
        xs: 入力データ。(データの個数N, 時系列の長さT, 単語ベクトルの次元D) の3次元配列
        """
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        D, H = Wx.shape # Hは隠れ層のサイズ

        self.layers = []
        hs = np.empty((N, T, H), dtype='f') # Tステップ分のすべての隠れ状態を格納する箱

        # 記憶を引き継がない設定、または最初の処理の場合は、記憶(h)をゼロで初期化
        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype='f')

        # 時系列(T)の数だけループして、内部のRNN層に1ステップずつ渡していく
        for t in range(T):
            layer = RNN(*self.params) # 1ステップ分のRNN層を作成
            self.h = layer.forward(xs[:, t, :], self.h) # 今の単語と過去の記憶を渡して、新しい記憶を得る
            hs[:, t, :] = self.h
            self.layers.append(layer) # 逆伝播で使うためにレイヤを取っておく

        return hs

    def backward(self, dhs):
        """RNN全体での誤差逆伝播(BPTT: Backpropagation Through Time)"""
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D, H = Wx.shape

        dxs = np.empty((N, T, D), dtype='f')
        dh = 0 # 未来から過去へ伝わってくる勾配(一番最初は未来がないので0)
        grads = [0, 0, 0]
        
        # Tステップを「過去に向かって(逆順に)」誤差を伝えていく
        for t in reversed(range(T)):
            layer = self.layers[t]
            
            # 上の層から来た誤差(dhs[:, t, :])と、1つ未来から来た誤差(dh)を足して逆伝播
            dx, dh = layer.backward(dhs[:, t, :] + dh) 
            dxs[:, t, :] = dx

            # 各ステップでの重みの更新量をすべて足し合わせる
            for i, grad in enumerate(layer.grads):
                grads[i] += grad

        # 足し合わせた更新量をこの層の勾配として保存
        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
            
        self.dh = dh # 最終的にTステップ前までたどり着いた誤差を保存しておく

        return dxs


class TimeEmbedding:
    """単語ID(数値)を、意味を持つ密なベクトル(分散表現)に変換する層"""
    def __init__(self, W):
        self.params = [W] # Wは単語の辞書(全ての単語のベクトルが並んだ行列)
        self.grads = [np.zeros_like(W)]
        self.layers = None
        self.W = W

    def forward(self, xs):
        """入ってきた単語ID(xs)に対応するベクトルを、重みWから「引き抜く」だけ"""
        N, T = xs.shape
        V, D = self.W.shape

        out = np.empty((N, T, D), dtype='f')
        self.layers = []

        for t in range(T):
            idx = xs[:, t] # ある時刻tの単語ID
            out[:, t, :] = self.W[idx] # 対応するベクトルを抽出
            self.layers.append(idx)

        return out

    def backward(self, dout):
        """Embedding層の逆伝播は、使われた単語ベクトルの部分だけに誤差を反映する"""
        N, T, D = dout.shape
        dW = np.zeros_like(self.W)

        for t in range(T):
            idx = self.layers[t]
            for i, word_id in enumerate(idx):
                dW[word_id] += dout[i, t, :] # 同じ単語が複数回使われた場合は勾配を加算

        self.grads[0][...] = dW
        return None


class TimeAffine:
    """RNNの出力を、語彙数と同じサイズに変換する全結合層(予測スコアを出すため)"""
    def __init__(self, W, b):
        self.params = [W, b]
        self.grads = [np.zeros_like(W), np.zeros_like(b)]
        self.x = None

    def forward(self, x):
        N, T, D = x.shape
        W, b = self.params

        # 行列の掛け算をしやすくするため、NとTの次元を一度平坦化(reshape)する
        rx = x.reshape(N*T, -1)
        
        # 内積(dot)による掛け算とバイアスの加算。これが全結合層のメイン処理
        out = np.dot(rx, W) + b
        
        self.x = x
        
        # 形を(N, T, 出力次元)に戻す
        return out.reshape(N, T, -1)

    def backward(self, dout):
        """通常の行列の逆算"""
        x = self.x
        N, T, D = x.shape
        W, b = self.params

        dout = dout.reshape(N*T, -1)
        rx = x.reshape(N*T, -1)

        db = np.sum(dout, axis=0)
        dW = np.dot(rx.T, dout)
        dx = np.dot(dout, W.T)
        dx = dx.reshape(N, T, -1)

        self.grads[0][...] = dW
        self.grads[1][...] = db

        return dx


class TimeSoftmaxWithLoss:
    """最終的な予測スコアを確率に変換(Softmax)し、正解との誤差(Loss)を計算する層"""
    def __init__(self):
        self.params, self.grads = [], []
        self.cache = None

    def forward(self, xs, ts):
        """
        xs: モデルが出した各単語の予測スコア
        ts: 正解の単語ID
        """
        N, T, V = xs.shape
        if ts.ndim == 3:
            ts = ts.argmax(axis=2)

        ts = ts.reshape(N * T)
        xs = xs.reshape(N * T, V)

        # --- Softmaxの計算 ---
        # 指数関数(exp)を使う際、値が大きすぎると無限大(オーバーフロー)になるため、最大値を引いてから計算
        xs = xs - xs.max(axis=1, keepdims=True)
        xs = np.exp(xs)
        ys = xs / xs.sum(axis=1, keepdims=True) # 各単語の確率(和が1になる)が計算される

        # --- Cross Entropy Error (交差エントロピー誤差)の計算 ---
        # 正解だった単語の「予測確率だけ」を取り出して対数(log)をとる
        # 1e-7 を足しているのは、log(0)がマイナス無限大になって計算が壊れるのを防ぐため
        ls = np.log(ys[np.arange(N * T), ts] + 1e-7)
        loss = -np.sum(ls) / (N * T) # 平均誤差

        self.cache = (ys, ts, (N, T, V))
        return loss

    def backward(self, dout=1):
        """Softmax-with-Lossレイヤの逆伝播は非常に美しく、手抜き計算ができる特徴があります。
        (モデルの予測確率 - 正解ラベル(1.0)) がそのまま誤差として前の層に渡ります。
        """
        ys, ts, (N, T, V) = self.cache

        dx = ys.copy()
        
        # 正解ラベルの場所だけ確率から1を引き、残りは予測確率そのものを誤差とする
        dx[np.arange(N * T), ts] -= 1
        dx *= dout
        dx /= (N * T)

        dx = dx.reshape(N, T, V)
        return dx


# --- 単純なRNNLMの構築 ---
# ここまで作った「ブロック」を組み合わせて、1つの言語モデル(AI)を作ります。

class SimpleRnnlm:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        # V: 扱う単語の総数, D: 単語ベクトルの次元数, H: 隠れ層の細胞数
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # AIが学習を開始する前の、最初のデタラメな重み(パラメータ)を生成します
        # 割る数(100やsqrt)は、学習がスムーズに進むための「初期値の工夫(Xavierの初期値など)」です
        embed_W = (rn(V, D) / 100).astype('f')
        rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f')
        rnn_Wh = (rn(H, H) / np.sqrt(H)).astype('f')
        rnn_b = np.zeros(H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        # 用意したレイヤを、データの流れ順に繋げてリスト(self.layers)に入れます
        # 単語入力 -> 分散表現(ベクトル) -> RNNの時系列処理 -> スコア予測
        self.layers = [
            TimeEmbedding(embed_W),
            TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful=True), # 文脈を繋ぐためにstateful=True
            TimeAffine(affine_W, affine_b)
        ]
        self.loss_layer = TimeSoftmaxWithLoss() # 最後の誤差計算層だけは特別扱い
        self.rnn_layer = self.layers[1]

        # 学習時に一気に重みを更新できるよう、全レイヤのパラメータと勾配を一つのリストに集約します
        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, xs, ts):
        """全体を貫通する順伝播。データxsを入れてロス(誤差)を計算します"""
        # 順番にレイヤを通して変換していく
        for layer in self.layers:
            xs = layer.forward(xs)
            
        # 最後に誤差レイヤを通して、正解とのズレを算出
        loss = self.loss_layer.forward(xs, ts)
        return loss

    def backward(self, dout=1):
        """全体を逆走する逆伝播。誤差から、各重みの修正すべき量(勾配)を求めます"""
        dout = self.loss_layer.backward(dout)
        
        # 層を後ろから前へ、逆順に遡る
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        """新しい文章を読み始める時にメモリ(文脈)をリセットする"""
        self.rnn_layer.reset_state()


# --- 学習スクリプト ---

def preprocess(text):
    """テキストを前処理し、単語IDのリストに変換する関数"""
    text = text.lower() # 全て小文字に
    text = text.replace('.', ' .') # ピリオドも一つの単語として扱うためスペースを開ける
    words = text.split(' ') # スペース区切りで単語のリストにする

    word_to_id = {} # 単語 -> IDの辞書
    id_to_word = {} # ID -> 単語の辞書
    
    # 未知の単語が出てくるたびに、新しいIDを付与する
    for word in words:
        if word not in word_to_id:
            new_id = len(word_to_id)
            word_to_id[word] = new_id
            id_to_word[new_id] = word

    # 元のテキストを、単語IDの数値の並びに変換
    corpus = np.array([word_to_id[w] for w in words])
    return corpus, word_to_id, id_to_word


if __name__ == '__main__':
    # 1. 学習するコーパス(データ)の準備
    text = "You say goodbye and I say hello."
    corpus, word_to_id, id_to_word = preprocess(text)
    
    # 2. AIの設計図(ハイパーパラメータ)の設定
    batch_size = 1       # 1回に学習する文の数
    wordvec_size = 8     # 1つの単語をいくつの数字(次元)のベクトルで表現するか
    hidden_size = 8      # RNNの記憶容量の大きさ
    time_size = 2        # Truncated BPTT: 過去何単語分まで遡って学習するか
    lr = 0.5             # 学習率 (1歩でどれくらい大きく重みを修正するか)
    max_epoch = 300      # 全データセットを何周学習させるか
    
    vocab_size = len(word_to_id) # 存在する単語の総数
    
    # 3. モデルの実体化
    model = SimpleRnnlm(vocab_size, wordvec_size, hidden_size)

    # 4. 入力(xs)と正解(ts)の作成。RNNLMでは「次の単語」が正解になります。
    # 例: xs = ["You", "say", "goodbye"]
    #     ts = ["say", "goodbye", "and"]
    xs = corpus[:-1]
    ts = corpus[1:]
    data_size = len(xs)

    print("=== RNNLM 学習開始 ===")
    print(f"コーパス: '{text}'")
    
    # 5. 学習ループ
    for epoch in range(max_epoch):
        model.reset_state() # 1周ごとに文脈をリセット
        
        loss_sum = 0
        iters = 0
        
        # ブロック(time_size)ごとに少しずつ進めながら学習
        for t in range(0, data_size - time_size + 1, time_size):
            # バッチの切り出し
            batch_x = xs[t:t+time_size].reshape(1, time_size)
            batch_t = ts[t:t+time_size].reshape(1, time_size)

            # ① 順伝播(推論)してロス(どれくらい間違えたか)を計算
            loss = model.forward(batch_x, batch_t)
            
            # ② 逆伝播を実行し、各レイヤのパラメータの「修正すべき量(誤差)」を求める
            model.backward()
            
            # ③ パラメータ更新 (SGD: 確率的勾配降下法)
            # 現在のパラメータから、(学習率 × 更新量) を引き算して微修正する
            for param, grad in zip(model.params, model.grads):
                param -= lr * grad
                
            loss_sum += loss
            iters += 1
            
        avg_loss = loss_sum / iters
        
        # 20 Epochごとに、学習がどれくらい進んだかを出力
        if (epoch+1) % 20 == 0:
            # パプレキシティ(Perplexity)は e^(Loss) で計算できる指標。
            # 「次に来る単語の候補を、平均して何個まで絞り込めているか」を表します。1.0なら完璧。
            perplexity = np.exp(avg_loss)
            print(f"Epoch {epoch+1:3d} | Loss: {avg_loss:.4f} | Perplexity: {perplexity:.4f}")

    # --- 生成(推論)テスト ---
    # 学習したAIに、最初の単語だけを与えて続きを予想させます。
    print("\n=== 単語生成ジェネレート ===")
    
    # スタート単語 "you" をAIに与える
    word_id = word_to_id['you']
    print(id_to_word[word_id], end=' ')
    
    for _ in range(6): # 次の6単語を生成
        x = np.array([[word_id]]) # 現在の単語ID
        
        # AIの中(各レイヤ)を順番に通過させて予測スコアを計算
        for layer in model.layers:
            x = layer.forward(x)
            
        p = x[0, 0, :]   # Softmaxに通す前のスコアの羅列
        
        # スコアを確率(0.0〜1.0)に変換。合計が1になる
        p = np.exp(p - np.max(p)) / np.sum(np.exp(p - np.max(p))) 
        
        # 最も確率が一番高い(=AIが一番自信のある)単語IDを取得する (ここでは単純な貪欲法を使用)
        word_id = np.argmax(p)
        print(id_to_word[word_id], end=' ') # 生成された単語を表示し、これが次のループの入力になる
        
    print("\n")
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?