記事の概要:
本記事では、リザバーコンピューティング(RC)を用いてサイン波時系列を学習し、未来の波形を自己生成的に予測する方法をPythonで実装しました。
モチベーション:
一度、記事を掲載しましたが、理解が足りておらず取り下げました。より基本的な部分から勉強しなおしたいと思った次第です。
全体の流れ:
今回はEcho State Network (ESN)でサイン波の時系列予測をします。
1.サイン波データを生成
2.データを標準化(平均0・分散1)
3.入力層・リザバー層の重みをランダム生成
4.リザバーを入力時系列で駆動し状態を記録
5.出力層の重みをリッジ回帰で学習
6.学習済みモデルでサイン波の続きを自己生成的に予測
7.結果の可視化とステップ先予測の応用例
必要なライブラリのインポート
import numpy as np
from scipy import linalg
import matplotlib.pyplot as plt
np.set_printoptions(precision=4, suppress=True)
1. サイン波データを生成
- サイン波を生成する関数を定義
- サンプル数・振幅・周波数・位相を自由に指定可能
- 時系列データ data と時間配列 t を取得
# 1. サイン波生成関数
def generate_sin(start_time=0, end_time=10, num_time_steps=100,
amplitude=1.0, frequency=1.0, phase=0.0):
"""
サイン波を生成する関数。
- start_time, end_time : 時間軸の開始と終了
- num_time_steps : サンプル数
- amplitude : 振幅
- frequency : 周波数 [Hz]
- phase : 位相 [rad]
戻り値:
t : (num_time_steps,) 時間配列
y : (num_time_steps, 1) の列ベクトル形式のサイン波
"""
t = np.linspace(start_time, end_time, num_time_steps)
y = amplitude * np.sin(2 * np.pi * frequency * t + phase)
return t, y.reshape(-1, 1)
# サイン波データを生成
t, data = generate_sin(0, 10, 200, 1.0, 0.5)
print("t:", t.shape)
print("data:", data.shape)
plt.figure(figsize=(8, 3))
plt.plot(data)
plt.title("Original Sine Wave (Unstandardized)")
plt.xlabel("Time Step") # ← 実際の時間ではなくサンプル番号
plt.ylabel("Amplitude")
plt.grid(True)
plt.show()
2. データを標準化(平均0・分散1)
- 平均0・分散1に正規化
- tanh活性化関数と相性が良く学習安定化
- リザバーの飽和を防ぎ数値安定性を向上
# 2. 標準化(平均0・分散1にスケーリング)
data_mean = np.mean(data) # 平均値
data_std = np.std(data) # 標準偏差
data_stdized = (data - data_mean) / data_std # 標準化データ
print(f"Data mean={data_mean:.3f}, std={data_std:.3f}")
# 標準化後データの確認(時間を無視してインデックスで描画)
plt.figure(figsize=(8, 3))
plt.plot(data_stdized)
plt.title("Standardized Sine Wave")
plt.xlabel("Time Step") # ← 時間ではなく単なるステップ番号
plt.ylabel("Normalized Amplitude")
plt.grid(True)
plt.show()
3. 入力層・リザバー層の重みをランダム生成
- 値は ±scale(デフォルト0.1)でランダムに決定されます。
- この重みは学習されず、固定のままです。
(リザバーコンピューティングでは出力層のみを学習します) - 入力信号をランダムに変換して高次元特徴空間に埋め込む。
# 3. 入力重み(W_in)を生成
def generate_variational_weights(num_pre_nodes, num_post_nodes, scale=0.1):
"""
入力からリザバーへの重み行列を生成。
- 0/1をランダムに作って±scaleに変換
- scale : 重みの大きさを調整するスケール係数
"""
weights = np.random.randint(0, 2, num_pre_nodes * num_post_nodes)
weights = (weights.reshape(num_pre_nodes, num_post_nodes) * 2 - 1) * scale
return weights
# 入力1次元 -> リザバー50ノード
W_in = generate_variational_weights(1, 50)
print("W_in shape:", W_in.shape)
4. リザバーを入力時系列で駆動し状態を記録
- リザーバ層はランダムな再帰的ネットワーク。
- ノード間の結合行列 W_res を正規乱数から作り、スペクトル半径(固有値の最大絶対値)を 0.95 にスケーリングする。スペクトル半径が1未満であることで、ネットワークが安定して過去情報を保持できるようになる。
# 4. リザバー内部重み(W_res)を生成
def generate_reservoir_weights(num_nodes, desired_spectral_radius=0.95):
"""
リザバー内部の重み行列 W_res を生成。
スペクトル半径(最大固有値の絶対値)を desired_spectral_radius に調整。
"""
weights = np.random.normal(0, 1, (num_nodes, num_nodes)) # 正規分布乱数で初期化
eigs = linalg.eigvals(weights) # 固有値を計算
spectral_radius = max(abs(eigs)) # 最大固有値の絶対値
return weights * (desired_spectral_radius / spectral_radius) # スケーリング
W_res = generate_reservoir_weights(50)
print("W_res shape:", W_res.shape)
5. リザバー状態更新
- 各時刻におけるリザバー状態の更新式は以下です:
xt+1 = (1 - α)xt + αtanh(utWin + xt*Wres) - α : リーク率(leak rate).値が小さいほど、第1項のxtの寄与が大きくなる=直前の値utの寄与が小さい。
- Win : 入力重み
- Wres : リザバー内部の重み
- 入力時系列を非線形に変換し、リザバーの高次元状態を更新。
# 5. リザバー状態更新
def get_next_reservoir_nodes(input_value, current_state, W_in, W_res,
leak_rate, activator=np.tanh):
"""
リザバー状態を1ステップ更新する関数。
方程式:
x_{t+1} = (1 - α)x_t + α * f( u_t W_in + x_t W_res )
- input_value : 入力値 (スカラーでもOK)
- current_state : 現在のリザバー状態ベクトル
- leak_rate : α, 0 < α <= 1(leaky integrator)
- activator : 活性化関数(既定は tanh)
"""
u = np.array(input_value).reshape(1, -1) # 入力を1×D行列に
next_state = (1 - leak_rate) * current_state # 前状態の影響
next_state += leak_rate * (u @ W_in + current_state @ W_res).ravel()
return activator(next_state)
6. 出力重み(W_out)をリッジ回帰で学習
- リザバー状態(出力値)から目標出力を予測する線形モデルを構築。
- リザバーの状態ログを教師信号(次の時刻の値)と照らし合わせ、リッジ回帰(正則化付き最小二乗法)で出力重みを学習します。ここで学習されるのは 出力層のみ です。入力層やリザバー層は固定のまま(これがリザバーコンピューティングの特徴)。
# 6. 出力重み(W_out)をリッジ回帰で学習
def update_weights_output(log_reservoir_nodes, targets, ridge_lambda):
"""
リッジ回帰によって出力重み W_out を学習する。
W = (X^T X + λI)^(-1) X^T Y
- log_reservoir_nodes : (T, N) 各時刻のリザーバ状態
- targets : (T, 1) 教師信号
- ridge_lambda : 正則化パラメータ(λ)
"""
X = log_reservoir_nodes
Y = targets
N = X.shape[1]
reg = np.eye(N) * ridge_lambda
inv = np.linalg.inv(X.T @ X + reg)
W_out = inv @ X.T @ Y
return W_out
7. 学習フェーズ
- サイン波の各データを順にリザバーに入力し、各時刻の状態ベクトルを記録します。これが学習データ X になる。
- 教師信号 Y には、1ステップ先の出力値を使用します。(時系列予測タスクの典型的な設定)
- 記録した状態と教師信号で出力重みを学習。
# 7. 学習フェーズ(リザーバの状態記録)
leak_rate = 0.2
num_nodes = W_res.shape[0]
current_state = np.zeros(num_nodes) # 初期状態をゼロで開始
log_reservoir_nodes = [] # 全時刻の状態を記録するリスト
# サイン波入力を1ステップずつ入力し、リザーバ状態を更新
for u in data_stdized:
current_state = get_next_reservoir_nodes(u, current_state, W_in, W_res, leak_rate)
log_reservoir_nodes.append(current_state)
'''
このループでは:
1. サイン波の各点 u を入力する
2. 現在のリザーバ状態 current_state と組み合わせて → 新しい状態 next_state を計算
3. それを記録して次に進む
これを全時刻分繰り返して、最終的に X = [x1, x2, x3, … ,xT]という
リザーバ状態系列(状態ベクトルの時系列)を得ます。
これが「リザーバの出力ログ」であり、
次のステップ(線形回帰)で教師信号と結びつけて学習に使います。
'''
# numpy配列に変換 (T, N)
log_reservoir_nodes = np.array(log_reservoir_nodes)
print("log_reservoir_nodes shape:", log_reservoir_nodes.shape)
# 教師信号として1ステップ先の出力を使用(時系列予測)
X_train = log_reservoir_nodes[:-1] # 入力側(過去)
Y_train = data_stdized[1:] # 教師側(次の値)
# 出力重みを学習
W_out = update_weights_output(X_train, Y_train, ridge_lambda=0.01)
print("W_out shape:", W_out.shape)
8. 予測フェーズ
学習した出力重み Wout を使って、新しい時系列を自己回帰的に生成します。
- 最後の出力を次の入力として使う
- リザーバを更新し出力を生成
- それをまた次の入力に使う
# 8. 予測フェーズ(生成モード)
def predict(initial_input, initial_state, W_in, W_res, W_out,
leak_rate, activator=np.tanh, length_of_sequence=50):
"""
学習済みモデルを使って、自己生成的に系列を予測する。
手順:
1) 直前の出力を次の入力とみなす
2) リザバー状態を更新
3) 出力を生成して次入力に渡す
4) これを length_of_sequence 回繰り返す
"""
predicted_outputs = [initial_input] # 最初の入力を記録
reservoir_state = initial_state.copy() # 現在の状態をコピー
for _ in range(length_of_sequence):
# リザーバ状態更新
reservoir_state = get_next_reservoir_nodes(predicted_outputs[-1],
reservoir_state, W_in, W_res, leak_rate, activator)
# 出力計算
next_output = reservoir_state @ W_out
predicted_outputs.append(next_output)
# 初期入力を除いた出力系列を返す (T, 1)
return np.array(predicted_outputs[1:]).reshape(-1, 1)
# 予測開始条件(最後の値を入力、最後の状態を初期状態に)
initial_input = data_stdized[-1]
initial_state = log_reservoir_nodes[-1]
# 200ステップ先まで予測
predicted_std = predict(
initial_input=initial_input,
initial_state=initial_state,
W_in=W_in,
W_res=W_res,
W_out=W_out,
leak_rate=leak_rate,
length_of_sequence=200
)
print(predicted_std.shape)
9. 標準化を元に戻して、可視化。
# 標準化を逆変換して元のスケールに戻す
predicted = predicted_std * data_std + data_mean
# 10. 結果を可視化
plt.figure(figsize=(10, 4))
plt.plot(range(len(data)), data, label="Training Data")
plt.plot(range(len(data), len(data) + len(predicted)), predicted, label="Predicted", linestyle='--')
plt.legend()
plt.title("Reservoir Computing with Standardization")
plt.xlabel("Time Step")
plt.ylabel("Amplitude")
plt.grid(True)
plt.show()
print("Predicted shape:", predicted.shape)
print("First 5 predicted values:\n", predicted[:5])
