RNN
時系列解析(特に予測、異常検知、分類など)では、伝統的にRNN(Recurrent Neural Network)とその派生(LSTM、GRU)が主力でした。一方、近年(2023〜2025年)の研究でLLM(Large Language Models、Transformerベースの大規模言語モデル)の適用が急速に進んでいます。
RNN(およびLSTM/GRU)のポジション
- 伝統的な標準アプローチ:時系列データのシーケンシャルな性質に特化して設計されたモデル。
- 強み:
- シーケンシャル処理で時間構造を厳密に尊重。
- 計算効率が良く(特に短いシーケンスやリアルタイム処理で)、パラメータ数が少ない。
- 弱み:
- 並列処理が難しく、長いシーケンスで訓練が遅い。
- 長期依存の学習が限界あり(LSTMで緩和されるが完全ではない)。
- 2025年現在、Transformer系に比べて性能が劣るケースが増加(特に長期間予測)。
GRU
GRUは、RNN(Recurrent Neural Network)の改良版で、時系列データやシーケンシャルデータを扱うためのニューラルネットワークの一種です。2014年に論文で提案され、LSTM(Long Short-Term Memory)と並んで最も広く使われるRNNの派生形です。
GRUの最大の特徴は、LSTMよりも構造がシンプルでパラメータ数が少なく、訓練が速いのに、長期依存関係を効果的に学習できる点です。
GRUの基本構造
通常のRNNは、過去の情報を隠れ状態(hidden state)として次に伝えますが、長いシーケンスになると勾配消失問題(過去の情報が学習されにづらくなる)が発生します。
GRUはこの問題をゲート機構で解決しています。GRUには以下の2つのゲートがあります:
-
Update Gate(更新ゲート)
- 「どれだけ過去の隠れ状態を保持するか」を制御します。
- 値が1に近い → 過去の情報をほぼそのまま保持
- 値が0に近い → 過去の情報をほとんど無視して新しい情報を優先
-
Reset Gate(リセットゲート)
- 「過去の隠れ状態をどれだけリセット(忘れる)するか」を制御します。
- 主に、現在の入力と過去の情報を組み合わせる際に、過去の情報をどれだけ使うかを決めます。
これら2つのゲートを使って、GRUは重要な過去情報を選択的に記憶し、不要な情報は忘れるという賢い仕組みを実現しています。
GRUの計算の流れ(簡略化)
各タイムステップで以下の処理を行います:
- 現在の入力 $ x_t $ と前時刻の隠れ状態 $ h_{t-1} $ を受け取る
- Reset Gate $ r_t $ を計算 → 過去の情報をどれだけ使うか決定
- Update Gate $ z_t $ を計算 → 新しい情報と過去の情報のバランスを決定
- リセットされた過去情報を使って「候補の新しい隠れ状態」 $ \tilde{h}_t $ を計算
- Update Gateを使って、過去の $ h_{t-1} $ と新しい候補 $ \tilde{h}_t $ を混ぜて最終の隠れ状態 $ h_t $ を出力
内部では各時点 $t$ で、入力 $x_t$ と前の状態 $h_{t-1}$ から次を計算しています:
$$
z_t=\sigma(W_z x_t + U_z h_{t-1}+b_z)
$$
$$
r_t=\sigma(W_r x_t + U_r h_{t-1}+b_r)
$$
$$
\tilde h_t=\tanh(W_h x_t + U_h (r_t \odot h_{t-1})+b_h)
$$
$$
h_t=(1-z_t)\odot h_{t-1}+z_t\odot \tilde h_t
$$
ここで
- $z_t$:更新ゲート(どれだけ新情報に置き換えるか)
- $r_t$:リセットゲート(過去をどれだけ無視するか)
- $h_t$ が 16次元(これが “16”)
直観的には、GRU(16) は「過去L点を見て、将来予測に使える特徴を 16個 自動で作る要約器」です。
$$
h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t
$$
(ここで $ \odot $ は要素ごとの積)GRUの本質はこの1行です。$z_t\in (0,1)$はどれだけ更新するかを決める定数です。各時点で平滑化係数が変わるEWMAだと考えればよいのです。
GRUのメリット
- LSTMと同等の性能を出しながら、パラメータが少なく訓練が速い
- 過学習しにくい
- 小〜中規模のデータセットやリソース制限がある環境(ラップトップなど)に最適
- 実務で広く使われている(株価予測、音声認識、自然言語処理など)
GRUのデメリット
- 非常に長い依存関係(数百ステップ以上)では、LSTMにわずかに劣る場合がある
- セル状態(cell state)がなく、記憶の制御がLSTMよりシンプル(=柔軟性がやや低い)
簡単な例
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
# ===== 1) 合成データ(ノイズ付きサイン波)を作る =====
np.random.seed(0)
tf.random.set_seed(0)
T = 4000
t = np.arange(T, dtype=np.float32)
x = np.sin(0.02 * t) + 0.1 * np.random.randn(T).astype(np.float32)
L = 30 # 過去30点を見て次を予測
X, y = [], []
for i in range(T - L - 1):
X.append(x[i:i+L])
y.append(x[i+L])
X = np.array(X, dtype=np.float32)[..., None] # (N, L, 1)
y = np.array(y, dtype=np.float32)[:, None] # (N, 1)
# train/test split(時系列なのでシャッフルしない)
n = len(X)
split = int(n * 0.8)
Xtr, Xte = X[:split], X[split:]
ytr, yte = y[:split], y[split:]
# ===== 2) いちばん小さいGRUモデル =====
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(L, 1)),
tf.keras.layers.GRU(16), # ★これがGRU(過去L点を要約して隠れ状態にする)
tf.keras.layers.Dense(1) # 1ステップ先
])
model.compile(optimizer="adam", loss="mse")
# ===== 3) 学習 =====
model.fit(Xtr, ytr, epochs=5, batch_size=64, verbose=1)
# ===== 4) 評価 =====
mse = model.evaluate(Xte, yte, verbose=0)
pred = model.predict(Xte[:5], verbose=0)
print("test MSE:", float(mse))
print("pred vs true (first 5):")
for p, yt in zip(pred.flatten(), yte[:5].flatten()):
print(f" pred={p:+.4f} true={yt:+.4f}")
plt.plot(pred)
plt.plot(pred)
plt.plot(yte)
日経225とトレーディングシステム
import numpy as np
import pandas as pd
from pandas_datareader import data as pdr
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras import layers, regularizers
import matplotlib.pyplot as plt
# =========================
# 設定
# =========================
START = "1949-01-01"
LAM = 0.94 # EWMA
L = 30 # lookback
COST = 1e-4 # 片道コスト率(例:1bp)
# ★ refit/val を「月単位」で指定(半年=6, 四半期=3)
START_DATE_PRED = "1950-01-01"
START_DATE_CHOOSE = "1951-01-01"
REFIT_MONTHS = 12 # 6=半年, 3=四半期, 12=年次
VAL_MONTHS = 12 # 検証に使う過去期間
K_LIST = (5,7,10) #(1, 2, 3, 5, 8, 10)
TAU_LIST = (0,0.01,0.02,0.05,0.10) # 例:ノイズカット(ロングのみなら意味が出やすい)
TARGET_AVG_ABS_W = 1
TF_CAP = 1.0
LONG_ONLY = True # True: ロングのみ(0..1), False: ロングショート(-1..1)
UNITS = 32
L2 = 1e-4
EPOCHS = 5
BATCH = 512
SEED = 1
# =========================
# 1) FREDからNIKKEI225
# =========================
def load_nikkei225_fred(start="1949-01-01", end=None):
df = pdr.DataReader("NIKKEI225", "fred", start=start, end=end)
df = df.rename(columns={"NIKKEI225": "close"}).dropna()
df = df[df["close"] > 0]
return df
# =========================
# 2) リターンとEWMAボラ(リークしない)
# - 学習特徴:log ret
# - 現金決済損益:simple ret
# - ★境界の1日リーク潰し用に date_next を作る
# =========================
def add_returns_and_ewma_vol(df, lam=0.94):
df = df.copy()
c = df["close"].astype(float)
# 単純リターン(現金決済の口座更新に使う)
df["ret_simple"] = c.pct_change()
df["ret_next_simple"] = df["ret_simple"].shift(-1)
# ログリターン(特徴量・ターゲットに使う)
df["logp"] = np.log(c)
df["ret_log"] = df["logp"].diff()
df["ret_next_log"] = df["ret_log"].shift(-1)
# ★次日の「日付」も保存(検証区間の境界リーク対策)
df["date_next"] = df.index.to_series().shift(-1)
# EWMA: ewma_vol[t] は ret_log[t-1] までで更新(翌日情報を見ない)
r = df["ret_log"].to_numpy(np.float64)
r2 = r * r
s2 = np.full_like(r2, np.nan)
first = np.where(np.isfinite(r2))[0]
if len(first) == 0:
raise ValueError("retが作れませんでした(データを確認)")
i0 = first[0]
s2[i0] = r2[i0]
for i in range(i0 + 1, len(r2)):
if np.isfinite(r2[i - 1]) and np.isfinite(s2[i - 1]):
s2[i] = lam * s2[i - 1] + (1.0 - lam) * r2[i - 1]
else:
s2[i] = s2[i - 1]
df["ewma_vol"] = np.sqrt(s2)
return df
# =========================
# 3) GRU用 X,y,idx(リークなし)
# 目的変数:y = ret_next_log / vol_t
# =========================
def make_xy(df, L=30, eps=1e-12):
feat = pd.DataFrame(index=df.index)
feat["ret"] = df["ret_log"]
feat["vol"] = df["ewma_vol"]
feat["absret"] = df["ret_log"].abs()
feat["ret_over_vol"] = df["ret_log"] / (df["ewma_vol"] + eps)
valid = feat.notna().all(axis=1) & df["ret_next_log"].notna()
idx_all = df.index[valid]
feat = feat.loc[idx_all]
y_all = (df.loc[idx_all, "ret_next_log"] / (df.loc[idx_all, "ewma_vol"] + eps)).astype(float)
F = feat.shape[1]
arr = feat.to_numpy(np.float32)
yv = y_all.to_numpy(np.float32)
X_list, y_list, idx_t = [], [], []
for t in range(L, len(idx_all)):
X_list.append(arr[t - L:t, :])
y_list.append(yv[t])
idx_t.append(idx_all[t])
X = np.stack(X_list).astype(np.float32) # (N,L,F)
y = np.asarray(y_list, np.float32) # (N,)
idx_t = pd.DatetimeIndex(idx_t)
return X, y, idx_t
# =========================
# 4) GRUモデル
# =========================
def build_gru(F, L, units=32, l2=1e-4):
model = tf.keras.Sequential([
layers.Input(shape=(L, F)), # ★ここを (None,F) から固定へ
layers.GRU(units, kernel_regularizer=regularizers.l2(l2)),
layers.Dense(1),
])
model.compile(optimizer="adam", loss="mse")
return model
# =========================
# 補助:月次の区間開始列を作る
# =========================
def _make_starts(start_date, end_date, step_months):
t = pd.Timestamp(start_date)
end = pd.Timestamp(end_date)
out = []
while t < end:
out.append(t)
t = t + pd.DateOffset(months=step_months)
return out
# =========================
# 5) 月次ウォークフォワードで pred を全期間作る(リークなし)
# =========================
def walkforward_pred_gru_monthly(
X, y, idx_t,
start_date="1950-01-01",
refit_months=12,
units=32, l2=1e-4,
epochs=5, batch=512,
seed=1
):
tf.keras.utils.set_random_seed(seed)
F = X.shape[2]
starts = _make_starts(start_date, idx_t.max() + pd.Timedelta(days=1), refit_months)
preds_all, idx_all = [], []
for j, t0 in enumerate(starts):
t1 = t0 + pd.DateOffset(months=refit_months)
m_train = (idx_t < t0)
m_test = (idx_t >= t0) & (idx_t < t1)
if m_train.sum() == 0 or m_test.sum() == 0:
continue
Xtr, ytr = X[m_train], y[m_train]
Xte = X[m_test]
# 標準化は train のみで fit(リーク防止)
sc = StandardScaler()
Xtr2 = Xtr.reshape(-1, F)
sc.fit(Xtr2)
Xtr_s = sc.transform(Xtr2).reshape(Xtr.shape)
Xte_s = sc.transform(Xte.reshape(-1, F)).reshape(Xte.shape)
# (任意)期間ごとに初期乱数を変えたい場合:
# tf.keras.utils.set_random_seed(seed + j)
model = build_gru(F=F, L=L,units=units, l2=l2)
model.fit(Xtr_s, ytr, epochs=epochs, batch_size=batch, verbose=0, shuffle=False)
pred = model.predict(Xte_s, batch_size=2048, verbose=0).ravel().astype(np.float32)
preds_all.append(pred)
idx_all.append(idx_t[m_test])
if len(preds_all) == 0:
raise ValueError("predが作れませんでした(start_date等を確認)")
pred_v = np.concatenate(preds_all)
idx_v = pd.DatetimeIndex(np.concatenate([ix.values for ix in idx_all]))
return pd.Series(pred_v, index=idx_v, name="pred").sort_index()
# =========================
# 6) ポジション関数(ボラ・ゲート無し)
# =========================
def make_w(pred, k=5.0, tf_=1.0, tau=0.0, long_only=True):
if long_only:
w = np.clip(k * pred, 0.0, 1.0) * tf_
else:
w = np.clip(k * pred, -1.0, 1.0) * tf_
if tau > 0:
w = np.where(np.abs(w) < tau, 0.0, w)
return w.astype(np.float64)
def sharpe_252(x):
x = np.asarray(x, float)
mu = np.mean(x)
sd = np.std(x)
return np.nan if sd == 0 else (mu / sd) * np.sqrt(252)
# =========================
# 7) 検証期間で (k,tau,tf) を選んで次期に適用(リークなし)
# ★境界リーク対策:val の末尾で next day が test_start を跨ぐ日を除外
# =========================
def walkforward_select_k_tau_monthly(
pred_s,
ret_next_simple_s,
date_next_s,
cost=1e-4,
start_date="1951-01-01",
refit_months=12,
val_months=12,
k_list=(1, 2, 3, 5, 8, 10),
tau_list=(0.00, 0.0001),
target_avg_abs_w=0.5,
tf_cap=1.0,
long_only=True
):
starts = _make_starts(start_date, pred_s.index.max() + pd.Timedelta(days=1), refit_months)
rows = []
for t0 in starts:
t1 = t0 + pd.DateOffset(months=refit_months)
val0 = t0 - pd.DateOffset(months=val_months)
val1 = t0
pred_val = pred_s.loc[(pred_s.index >= val0) & (pred_s.index < val1)]
pred_tst = pred_s.loc[(pred_s.index >= t0) & (pred_s.index < t1)]
if len(pred_val) == 0 or len(pred_tst) == 0:
continue
# ★ここが肝:ret_next_simple[t] が参照する次日が test_start(t0) 以上になる t を除外
ok = date_next_s.loc[pred_val.index] < val1
pred_val = pred_val.loc[ok]
if len(pred_val) == 0:
continue
ret_val = ret_next_simple_s.loc[pred_val.index]
best = (-np.inf, None) # (sharpe, (k,tau,tf))
for k in k_list:
for tau in tau_list:
w_raw = make_w(pred_val.values, k=k, tf_=1.0, tau=tau, long_only=long_only)
avg_abs = np.mean(np.abs(w_raw))
if avg_abs < 1e-12:
continue
tf_ = min(tf_cap, target_avg_abs_w / avg_abs)
w = make_w(pred_val.values, k=k, tf_=tf_, tau=tau, long_only=long_only)
dw = np.abs(np.diff(np.r_[0.0, w]))
r = (w * ret_val.values.astype(np.float64)) - cost * dw
s = sharpe_252(r)
if np.isfinite(s) and s > best[0]:
best = (s, (k, tau, tf_))
if best[1] is None:
continue
k_, tau_, tf_ = best[1]
rows.append([t0, t1, k_, tau_, tf_, best[0]])
chosen = pd.DataFrame(rows, columns=["test_start", "test_end", "k", "tau", "tf", "val_sharpe"])
return chosen
# =========================
# 8) chosen から全期間の w と「現金決済の口座推移」を構築
# =========================
def build_cash_settled_strategy(
pred_s,
ret_next_simple_s,
chosen,
cost=1e-4,
long_only=True,
E0=1.0
):
w_all = []
idx_all = []
for _, r in chosen.iterrows():
t0, t1 = r["test_start"], r["test_end"]
sl = (pred_s.index >= t0) & (pred_s.index < t1)
p = pred_s.loc[sl]
if len(p) == 0:
continue
rr = ret_next_simple_s.loc[p.index].astype(np.float64)
# 末尾がNaN(最後の日の next return)を除外しておく
ok = np.isfinite(rr.values)
p = p.loc[ok]
rr = rr.loc[p.index]
if len(p) == 0:
continue
w = make_w(
p.values,
k=float(r["k"]),
tf_=float(r["tf"]),
tau=float(r["tau"]),
long_only=long_only
)
w_all.append(w)
idx_all.append(p.index)
if len(w_all) == 0:
raise ValueError("chosenが空です(start_date等を確認)")
idx = pd.DatetimeIndex(np.concatenate([ix.values for ix in idx_all]))
w = np.concatenate(w_all).astype(np.float64)
ret = ret_next_simple_s.loc[idx].to_numpy(np.float64)
dw = np.abs(np.diff(np.r_[0.0, w]))
strat_r = w * ret - cost * dw
# 口座推移(self-financing)
eq_strat = np.empty_like(strat_r, dtype=np.float64)
eq_index = np.empty_like(strat_r, dtype=np.float64)
e_s = float(E0)
e_i = float(E0)
for t in range(len(strat_r)):
e_s *= (1.0 + strat_r[t])
e_i *= (1.0 + ret[t]) # 指数は常にフルエクスポージャ
eq_strat[t] = e_s
eq_index[t] = e_i
out = pd.DataFrame(
{
"w": w,
"ret_next_simple": ret,
"dw_abs": dw,
"strat_ret": strat_r,
"eq_strat": eq_strat,
"eq_index": eq_index,
},
index=idx
).sort_index()
stats = {
"Sharpe_cost": sharpe_252(out["strat_ret"].values),
"avg_abs_w": float(np.mean(np.abs(out["w"].values))),
"active": float(np.mean(np.abs(out["w"].values) > 1e-12)),
"avg_abs_dw": float(np.mean(out["dw_abs"].values)),
"entries_0_to_non0": int(np.sum((np.r_[0.0, out["w"].values[:-1]] == 0.0) & (out["w"].values != 0.0))),
"total_strat": float(out["eq_strat"].iloc[-1] / E0 - 1.0),
"total_index": float(out["eq_index"].iloc[-1] / E0 - 1.0),
}
return out, stats
# =========================
# 9) 実行
# =========================
def main():
df = load_nikkei225_fred(start=START)
df = add_returns_and_ewma_vol(df, lam=LAM)
X, y, idx_t = make_xy(df, L=L)
pred_s = walkforward_pred_gru_monthly(
X, y, idx_t,
start_date=START_DATE_PRED,
refit_months=REFIT_MONTHS,
units=UNITS, l2=L2,
epochs=EPOCHS, batch=BATCH,
seed=SEED
)
# 現金決済に使う「次日単純リターン」と「次日の日付」を pred_s の日付に揃える
ret_s = df.loc[pred_s.index, "ret_next_simple"].astype(float)
date_next_s = df.loc[pred_s.index, "date_next"]
chosen = walkforward_select_k_tau_monthly(
pred_s,
ret_s,
date_next_s,
cost=COST,
start_date=START_DATE_CHOOSE,
refit_months=REFIT_MONTHS,
val_months=VAL_MONTHS,
k_list=K_LIST,
tau_list=TAU_LIST,
target_avg_abs_w=TARGET_AVG_ABS_W,
tf_cap=TF_CAP,
long_only=LONG_ONLY
)
out, stats = build_cash_settled_strategy(
pred_s, ret_s, chosen,
cost=COST, long_only=LONG_ONLY, E0=1.0
)
print("REFIT_MONTHS",REFIT_MONTHS,"VAL_MONTHS", VAL_MONTHS)
print("k_List",K_LIST,"Tau_list",TAU_LIST,"target_average_abs_w",TARGET_AVG_ABS_W)
print("pred range:", pred_s.index.min(), "->", pred_s.index.max(), "N=", len(pred_s))
if len(chosen) > 0:
print("chosen periods:", chosen["test_start"].min(), "->", chosen["test_end"].max(), "rows=", len(chosen))
print("stats:", stats)
# =========================
# 10) プロット(全期間)
# =========================
plt.figure(figsize=(12, 4))
plt.plot(out.index, out["eq_index"], label="Index (cash-settled equity)")
plt.plot(out.index, out["eq_strat"], label="Strategy (cash-settled equity)")
plt.yscale("log")
plt.legend()
plt.title("Index vs Strategy (cash settlement, log y)")
plt.tight_layout()
plt.show()
plt.figure(figsize=(12, 3))
plt.plot(out.index, out["w"], lw=0.7)
plt.axhline(0.0, lw=0.8)
plt.title("Position w(t) (full period)")
plt.tight_layout()
plt.show()
plt.figure(figsize=(12, 3))
plt.plot(out.index, out["strat_ret"], lw=0.6)
plt.axhline(0.0, lw=0.8)
plt.title("Strategy daily return (w*R - cost*|dw|)")
plt.tight_layout()
plt.show()
# (B) 通常軸(右軸/左軸)
fig, ax1 = plt.subplots(figsize=(12, 5))
ax1.plot(out.index, out["eq_index"], label="Index", lw=1.0)
ax1.set_ylabel("Index equity (start=1)")
ax2 = ax1.twinx()
ax2.plot(out.index, out["eq_strat"], label="Strategy", lw=1.0)
ax2.set_ylabel("Strategy equity (start=1)")
ax1.set_title("Index vs Strategy (dual axis)")
fig.tight_layout()
plt.show()
# (C) w(ポジション)
plt.figure(figsize=(12, 4))
plt.plot(out.index, out["w"], lw=0.8, label="w (position weight)")
plt.axhline(0, lw=0.8)
plt.legend()
plt.title("Position w")
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main()





