概要
昨年末、岡山大の数学の先生が機械学習を使った気象の長期予測に関するレクチャをされているのを聴講させた頂いた。使っていたモデルはReservoir Computingだった。恥ずかしながら初耳だったので出版したての下記を購入し早速勉強した。
和書なのに中々の良書で、スクラッチで書かれたコードも多数あり興味があれば手に取っていただきたい。コードはgithubではなく書店のサイトに置かれている。
ここではそのコードは使わず、とことんエコな神パケージを使用させていただく。
- 実施期間: 2025年2月
- 環境:Ubuntu22.04LTS
- Python: condaのPython3.10
1. Dataset
Multivariateにしたいので次のようにsine wave, cosine waveの2featuresとし、1000 time step作成しておく。雰囲気を出すために前者を"Temperature"、後者を"humidity"とする。forecastの対象"Target"は1ts先の"Temperature"とする。また本パケージはsklearnのモデルもreadoutに使用することができるので、classificationさせるときのlabel targetも併せて"quantile"として作成しておく。
元データはDataFrameに入れておくことが多いので、DataFrame化から始める。
実際のdatasetを使うときは時系列共通前処理として、階差を取ってtrend成分を消したりstandardizationしたりすること。
import pandas as pd
import numpy as np
from time import time
import matplotlib.pyplot as plt
from reservoirpy import nodes
from sklearn.metrics import root_mean_squared_error
from sklearn.metrics import multilabel_confusion_matrix
from sklearn.linear_model import RidgeClassifier, LogisticRegression, Perceptron
from reservoirpy.utils import verbosity
verbosity(0)
def make_dataset(start=0, end=100, n_steps=1000):
# ここでは例としてダミーのデータフレームを作成しています。
np.random.seed(42)
df = pd.DataFrame({
'temperature': np.sin(np.linspace(start, end, n_steps)), #+ 0.1 * np.random.randn(n_steps),
'humidity': np.cos(np.linspace(start, end, n_steps)) #+ 0.1 * np.random.randn(n_steps),
})
# classification用に温度を四分位数で区切る
num_quantiles = 4
df['quantile'] = pd.qcut(df['temperature'], q=num_quantiles, labels=False) + 1
# 1step先の気温を予測するため、target列として1行シフトさせる
df['target'] = df['temperature'].shift(-1)
df = df.dropna().reset_index(drop=True) # targetがNaNになった最終行を削除
return df
def split_regression_dataset(df, split_index, target='target'):
# 説明変数はtarget列以外の全列とする
feature_columns = df.columns.drop(['target', 'quantile'])
# NumPy配列に変換
data_features = df[feature_columns].values
data_target = df[target].values.reshape(-1, 1)
# --- データの分割 ---
# データを前半と後半に分割(前半:一括学習、後半:オンライン学習&バックテスト)
X_train = data_features[:split_index]
y_train = data_target[:split_index]
X_test = data_features[split_index:]
y_test = data_target[split_index:]
return X_train, y_train, X_test, y_test
def draw_fig(split_index, X_test, y_test, predictions, png_name):
# 時間軸
t_online = np.arange(split_index, split_index + len(X_test))
_, ax = plt.subplots(figsize=(12, 4))
ax.plot(t_online, y_test, label='True Temperature')
ax.plot(t_online, predictions, label='Predicted Temperature', linestyle='--')
ax.legend()
# plt.show()
plt.savefig(png_name)
arrayのshapeは次のようになっていることは確認する。
data_features.shape
> (999, 2)
data_target.shape
> (999, 1)
2. ReservoirPyの基本
nodeオブジェクト
Reservoir Computingの基本だが、reservoirは高次元ベクトルを発生させるだけのレイヤで、そのベクトルから時系列の特徴を見つけるのがreadoutである。従い、ベクトルを発生させるrun()はreservoir nodeのメソッドであり、そのベクトル(コード中のstatus)からパラメータ最適化を行うfit()はreadout nodeに対して行うメソッドである。これが直感的にわかるよう実装されている。
states = reservoir.run(X_train) # Reservoir nodeの出力を取得
readout.fit(states, y_train) # Readout nodeのtraining
modelオブジェクト
上記ではややコードが煩雑になるため、モデルの構造がある程度決まっているのであればモデルを定義しfit()で各nodeのrunとfitを一気に内部で行う実装が次のコードである。チュートリアルのほとんどがこの手法で書かれている。
model = reservoir >> readout # モデル(ESN)の定義
model.fit(X_train, y_train) # modelのtraining
result = model.run(X_test) # reservoir nodeのrun出力でreadoutがforecast実行
3. Strategy for offline regression problem
1000tsの前半500tsでtrainingを行い、残りはonline運用中を想定し1tsずつ使用する。online運用中は1ts先のforecastのみを実行しreadoutの追加trainingは行わない。
ReservoirPyでは、fit()でreservoir状態を変えながらreadoutのパラメータを最適化し、run()ではreadoutは固定してreservoir状態を変化しforecastを行う。
周期性がある場合、warmupは1周期分を慣らしで使用することが一般的なようなので、ここでもそうしている。
def main_1step_pred():
# --- データ準備 ---
df = make_dataset()
# --- データの分割 ---
split_index = len(df) // 2
X_train, y_train, X_test, y_test = split_regression_dataset(df, split_index)
# --- ReservoirPyモデルの構築 ---
# reservoirとFORCE readoutを連結したモデル
start_time = time()
reservoir = nodes.Reservoir(units=200, sr=0.9)
readout = nodes.Ridge(ridge=1e-3)
model = reservoir >> readout
# --- オフライン学習(前半データで一括学習) ---
# warmup期間を設定(例: 10 time step)
model.fit(X_train, y_train, warmup=63)
# --- オンライン学習&バックテスト(後半データを1stepずつ更新) ---
y_pred = []
back_test_iter_num = len(df) - split_index
for i in range(back_test_iter_num):
x = X_test[i].reshape(1, -1)
pred = model(x)
y_pred.append(pred.item())
print('RMSE: {0}'.format(round(root_mean_squared_error(y_test.ravel(), y_pred), 5)))
png_name = 'offline_training_result1.png'
draw_fig(split_index, X_test, y_test, y_pred, png_name)
stop_time1 = time()
ここまでが通常のoffline運用で、下記ではこのモデルを時系列的に不連続なdatasetで使いまわす例を示す。
# --- Training datasetと非連続な新データ準備 ---
df = make_dataset(20,70,500)
split_index = 0
_, _, X_test_new, y_test_new = split_regression_dataset(df, split_index)
# readoutはfit()でtraining済みなので、reservoirの状態をリセットして、新しいデータで状態を更新する。
model.run(X_test_new[:63], reset=True)
y_pred_new = []
for i in range(63, 499):
x = X_test_new[i].reshape(1, -1)
pred = model(x)
y_pred_new.append(pred.item())
print('RMSE: {0}'.format(round(root_mean_squared_error(y_test_new[63:].ravel(), y_pred_new), 5)))
png_name = 'offline_training_result2.png'
draw_fig(split_index, X_test_new[63:], y_test_new[63:], y_pred_new, png_name)
stop_time2 = time()
print(f'start-stop1: {round((stop_time1 - start_time), 3)}, start-stop2: {round((stop_time2 - start_time) ,3)}')
4. Strategy for online regression problem
1000tsの前半500tsでtrainingを行い、残りはonline運用中を想定しfor loop内で1tsずつ使用する。この中でforecast後に再trainingする。readoutにはonline training用のFORCEアルゴリズムを使用しているが、次バージョンでは削除されるらしく、LMS(Least Mean Squares) かRLS(Recursive Least Squares)を使うことが推奨されている。
def main_online():
# --- データ準備 ---
df = make_dataset()
# --- データの分割 ---
split_index = len(df) // 2
X_train, y_train, X_test, y_test = split_regression_dataset(df, split_index)
# --- ReservoirPyモデルの構築 ---
# reservoirとFORCE readoutを連結したモデル
start_time = time()
reservoir = nodes.Reservoir(units=200, sr=0.9)
force_readout = nodes.FORCE(alpha=1e-4)
model = reservoir >> force_readout
# --- オフライン学習(前半データで一括学習) ---
model.train(X_train, y_train)
stop_time1 = time()
# --- オンライン学習&バックテスト(後半データを1stepずつ更新) ---
y_pred = []
# 後半データの各観測値についてループ
for i in range(len(X_test)):
# 現在の観測値
x_sample = X_test[i].reshape(1, -1)
# 現在のモデルで1step先の気温を予測
pred = model.run(x_sample)
y_pred.append(pred.item())
## この間に1step先の実測値が得られるまでtimer処理等必要
# オンライン更新:現時点の観測値とその1step先の実測値でモデルを更新
y_true = y_test[i].reshape(1, -1)
model.train(x_sample, y_true)
# --- 結果のプロット ---
print('RMSE: {0}'.format(round(root_mean_squared_error(y_test.ravel(), y_pred), 5)))
png_name = 'online_training_result1.png'
draw_fig(split_index, X_test, y_test, y_pred, png_name)
stop_time2 = time()
print(f'start-stop1: {round((stop_time1 - start_time), 3)}, start-stop2: {round((stop_time2 - start_time) ,3)}')
4. strategy for offline classification problem
ReservoirPyの標準readoutにはclassificationモデルはないが、ScikitLearnNodeが用意されており、scikit learnのモデルをoffline用途だがreadoutとして使用することができる。
Targetは"Temperature"を四分位にレベル分けしたquantileを使用する。1000tsの前半500tsでtrainingを行い、残りを1tsずつ使用し評価する。基本はサンプルコードを流用しただけだが、サンプルは日本語の母音波形分類なのでstateful=Falseが指定されていた。
def main_classification():
# --- データ準備 ---
df = make_dataset()
# --- データの分割 ---
split_index = len(df) // 2
X_train, y_train, X_test, y_test = split_regression_dataset(df, split_index, 'quantile')
reservoir = nodes.Reservoir(units=200, sr=0.9, lr=0.1)
sk_ridge = nodes.ScikitLearnNode(RidgeClassifier, name="RidgeClassifier")
sk_logistic = nodes.ScikitLearnNode(LogisticRegression, name="LogisticRegression")
sk_perceptron = nodes.ScikitLearnNode(Perceptron, name="Perceptron")
# model = reservoir >> [sk_ridge, sk_logistic, sk_perceptron]
model = reservoir >> sk_logistic
model.fit(X_train, y_train, warmup=63) # stateful=False,
y_pred = model.run(X_test, stateful=False)
result = multilabel_confusion_matrix(y_test, y_pred)
print(result)
評価にはmultilabel_confusion_matrixを使用している。パラメータ調整を全く行っていないのでこんな感じ。
Optunaを使ったハイパラ調整コードもあるので要参照。
[[[373 1]
[ 0 126]]
[[376 0]
[ 2 122]]
[[373 1]
[ 4 122]]
[[372 4]
[ 0 124]]]
次回は、信頼区間も出力する回帰問題で投稿予定。
5. DeepESN
ReservoirPyが神がかっているのはDeepESNを直感的に構築することができる点であろう。
例えばreservoirを直列接続したDeepESNで回帰と識別を同時に行うモデルは下記のように定義することができる。TensorFlowなどを使用するとデータの形を厳密に記述しなければならないが、ReservorPyはユーザが指定しなくともdatasetの形からそれを良きに推測してくれる。
神。
def deepesn(n_layers, n_units_per_layer, spectral_radii, leak_rates):
# DeepESNの構築
reservoir_layers = []
for i in range(n_layers):
reservoir = nodes.Reservoir(
units=n_units_per_layer,
sr=spectral_radii[i],
lr=leak_rates[i],
input_scaling=0.5,
bias_scaling=0.2
)
reservoir_layers.append(reservoir)
# 層を順に積み重ねる
deep_esn = reservoir_layers[0]
for layer in reservoir_layers[1:]:
deep_esn = deep_esn >> layer # レイヤを直列に重ねる
# リードアウト層の設定
# Ridge 回帰モデル
ridge_readout = nodes.Ridge(ridge=1e-6)
# ロジスティック回帰モデル(分類用)
logistic_readout = nodes.ScikitLearnNode(
LogisticRegression(max_iter=1000, solver='lbfgs')
)
# 並列に回帰と分類の出力を得る
parallel_readout = [ridge_readout, logistic_readout]
# 全体のDeepESNモデル
deep_esn = deep_esn >> parallel_readout
return deep_esn
# DeepESNの設定
# 各層のユニット数
n_layers = 3
n_units_per_layer = 100
# layerごとのハイパーパラメータ設定(例: spectral半径とleaky係数)
spectral_radii = [0.9, 0.8, 0.7] # spectral半径
leak_rates = [0.5, 0.3, 0.1] # leaky係数
deep_esn_model = deepesn(n_layers, n_units_per_layer, spectral_radii, leak_rates)
DeepESNの利点
https://arxiv.org/pdf/1711.05255
Abstructより
効率的な再帰型ニューラルネットワーク(RNN)モデルとして、エコー・ステート・ネットワーク(Echo State Network)などのリザーバー・コンピューティング(RC)モデルが、ここ10年で広く注目を集めています。しかし、時系列データでは大きな成果を上げていますが [1][2]、多くの時系列はマルチスケール構造を持っており、単一の隠れ層を持つRCモデルでは、これを捉えることが難しい場合があります。本論文では、Deep Echo State Networks(Deep-ESN)と呼ぶ新しい階層型リザーバー・コンピューティングの枠組みを提案する。Deep-ESNの最も特徴的な機能は、階層型投影による時系列の処理能力である。具体的には、入力時系列がリザーバーの高次元エコー状態空間に投影されると、その後のエンコーディング層(例えば、PCA、オートエンコーダー、ランダムプロジェクション)がエコー状態表現を低次元空間に投影します。これらの低次元表現は、別のESNによって処理することができます。
階層的枠組みの中で投影層とエンコーディング層を交互に使用することで、Deep-ESNはESNにおける共線性の問題の影響を低減するだけでなく、ESNの時間カーネル特性を最大限に活用して時系列のマルチスケールダイナミクスを探索することができます。 各リザーバーによって得られたマルチスケール表現を融合するために、最後の出力層に各エンコーディング層からの接続を追加します。
理論的分析により、Deep-ESNの安定性はエコー状態性質(ESP)により保証され、時間複雑性は従来のESNと同等であることが証明されています。いくつかの人工および実世界の時系列に関する実験結果から、Deep-ESNはマルチスケールダイナミクスを捉えることができ、標準的なESNおよび従来の階層型ESNベースのモデルの両方を上回ることが実証されています。
以上