2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Python3ではじめるシステムトレード:GRUでトレーディングしてみる

Posted at

RNN

時系列解析(特に予測、異常検知、分類など)では、伝統的にRNN(Recurrent Neural Network)とその派生(LSTM、GRU)が主力でした。一方、近年(2023〜2025年)の研究でLLM(Large Language Models、Transformerベースの大規模言語モデル)の適用が急速に進んでいます。

RNN(およびLSTM/GRU)のポジション

  • 伝統的な標準アプローチ:時系列データのシーケンシャルな性質に特化して設計されたモデル。
  • 強み:
    • シーケンシャル処理で時間構造を厳密に尊重。
    • 計算効率が良く(特に短いシーケンスやリアルタイム処理で)、パラメータ数が少ない。
  • 弱み:
    • 並列処理が難しく、長いシーケンスで訓練が遅い。
    • 長期依存の学習が限界あり(LSTMで緩和されるが完全ではない)。
    • 2025年現在、Transformer系に比べて性能が劣るケースが増加(特に長期間予測)。

GRU

GRUは、RNN(Recurrent Neural Network)の改良版で、時系列データやシーケンシャルデータを扱うためのニューラルネットワークの一種です。2014年に論文で提案され、LSTM(Long Short-Term Memory)と並んで最も広く使われるRNNの派生形です。

GRUの最大の特徴は、LSTMよりも構造がシンプルでパラメータ数が少なく、訓練が速いのに、長期依存関係を効果的に学習できる点です。

GRUの基本構造
通常のRNNは、過去の情報を隠れ状態(hidden state)として次に伝えますが、長いシーケンスになると勾配消失問題(過去の情報が学習されにづらくなる)が発生します。

GRUはこの問題をゲート機構で解決しています。GRUには以下の2つのゲートがあります:

  1. Update Gate(更新ゲート)

    • 「どれだけ過去の隠れ状態を保持するか」を制御します。
    • 値が1に近い → 過去の情報をほぼそのまま保持
    • 値が0に近い → 過去の情報をほとんど無視して新しい情報を優先
  2. Reset Gate(リセットゲート)

    • 「過去の隠れ状態をどれだけリセット(忘れる)するか」を制御します。
    • 主に、現在の入力と過去の情報を組み合わせる際に、過去の情報をどれだけ使うかを決めます。

これら2つのゲートを使って、GRUは重要な過去情報を選択的に記憶し、不要な情報は忘れるという賢い仕組みを実現しています。

GRUの計算の流れ(簡略化)
各タイムステップで以下の処理を行います:

  1. 現在の入力 $ x_t $ と前時刻の隠れ状態 $ h_{t-1} $ を受け取る
  2. Reset Gate $ r_t $ を計算 → 過去の情報をどれだけ使うか決定
  3. Update Gate $ z_t $ を計算 → 新しい情報と過去の情報のバランスを決定
  4. リセットされた過去情報を使って「候補の新しい隠れ状態」 $ \tilde{h}_t $ を計算
  5. Update Gateを使って、過去の $ h_{t-1} $ と新しい候補 $ \tilde{h}_t $ を混ぜて最終の隠れ状態 $ h_t $ を出力

内部では各時点 $t$ で、入力 $x_t$ と前の状態 $h_{t-1}$ から次を計算しています:

$$
z_t=\sigma(W_z x_t + U_z h_{t-1}+b_z)
$$
$$
r_t=\sigma(W_r x_t + U_r h_{t-1}+b_r)
$$
$$
\tilde h_t=\tanh(W_h x_t + U_h (r_t \odot h_{t-1})+b_h)
$$
$$
h_t=(1-z_t)\odot h_{t-1}+z_t\odot \tilde h_t
$$

ここで

  • $z_t$:更新ゲート(どれだけ新情報に置き換えるか)
  • $r_t$:リセットゲート(過去をどれだけ無視するか)
  • $h_t$ が 16次元(これが “16”)

直観的には、GRU(16) は「過去L点を見て、将来予測に使える特徴を 16個 自動で作る要約器」です。

$$
h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t
$$
(ここで $ \odot $ は要素ごとの積)GRUの本質はこの1行です。$z_t\in (0,1)$はどれだけ更新するかを決める定数です。各時点で平滑化係数が変わるEWMAだと考えればよいのです。

GRUのメリット

  • LSTMと同等の性能を出しながら、パラメータが少なく訓練が速い
  • 過学習しにくい
  • 小〜中規模のデータセットやリソース制限がある環境(ラップトップなど)に最適
  • 実務で広く使われている(株価予測、音声認識、自然言語処理など)

GRUのデメリット

  • 非常に長い依存関係(数百ステップ以上)では、LSTMにわずかに劣る場合がある
  • セル状態(cell state)がなく、記憶の制御がLSTMよりシンプル(=柔軟性がやや低い)

簡単な例

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# ===== 1) 合成データ(ノイズ付きサイン波)を作る =====
np.random.seed(0)
tf.random.set_seed(0)

T = 4000
t = np.arange(T, dtype=np.float32)
x = np.sin(0.02 * t) + 0.1 * np.random.randn(T).astype(np.float32)

L = 30  # 過去30点を見て次を予測
X, y = [], []
for i in range(T - L - 1):
    X.append(x[i:i+L])
    y.append(x[i+L])
X = np.array(X, dtype=np.float32)[..., None]  # (N, L, 1)
y = np.array(y, dtype=np.float32)[:, None]    # (N, 1)

# train/test split(時系列なのでシャッフルしない)
n = len(X)
split = int(n * 0.8)
Xtr, Xte = X[:split], X[split:]
ytr, yte = y[:split], y[split:]

# ===== 2) いちばん小さいGRUモデル =====
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(L, 1)),
    tf.keras.layers.GRU(16),      # ★これがGRU(過去L点を要約して隠れ状態にする)
    tf.keras.layers.Dense(1)      # 1ステップ先
])
model.compile(optimizer="adam", loss="mse")

# ===== 3) 学習 =====
model.fit(Xtr, ytr, epochs=5, batch_size=64, verbose=1)

# ===== 4) 評価 =====
mse = model.evaluate(Xte, yte, verbose=0)
pred = model.predict(Xte[:5], verbose=0)
print("test MSE:", float(mse))
print("pred vs true (first 5):")
for p, yt in zip(pred.flatten(), yte[:5].flatten()):
    print(f"  pred={p:+.4f}  true={yt:+.4f}")

plt.plot(pred)

image.png

plt.plot(pred)
plt.plot(yte)

image.png

日経225とトレーディングシステム

import numpy as np
import pandas as pd
from pandas_datareader import data as pdr
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras import layers, regularizers
import matplotlib.pyplot as plt


# =========================
# 設定
# =========================
START = "1949-01-01"

LAM = 0.94          # EWMA
L = 30              # lookback
COST = 1e-4         # 片道コスト率(例:1bp)

# ★ refit/val を「月単位」で指定(半年=6, 四半期=3)
START_DATE_PRED = "1950-01-01"
START_DATE_CHOOSE = "1951-01-01"
REFIT_MONTHS = 12     # 6=半年, 3=四半期, 12=年次
VAL_MONTHS = 12       # 検証に使う過去期間

K_LIST = (5,7,10) #(1, 2, 3, 5, 8, 10)
TAU_LIST = (0,0.01,0.02,0.05,0.10)  # 例:ノイズカット(ロングのみなら意味が出やすい)
TARGET_AVG_ABS_W = 1
TF_CAP = 1.0

LONG_ONLY = True     # True: ロングのみ(0..1), False: ロングショート(-1..1)

UNITS = 32
L2 = 1e-4
EPOCHS = 5
BATCH = 512
SEED = 1


# =========================
# 1) FREDからNIKKEI225
# =========================
def load_nikkei225_fred(start="1949-01-01", end=None):
    df = pdr.DataReader("NIKKEI225", "fred", start=start, end=end)
    df = df.rename(columns={"NIKKEI225": "close"}).dropna()
    df = df[df["close"] > 0]
    return df


# =========================
# 2) リターンとEWMAボラ(リークしない)
#    - 学習特徴:log ret
#    - 現金決済損益:simple ret
#    - ★境界の1日リーク潰し用に date_next を作る
# =========================
def add_returns_and_ewma_vol(df, lam=0.94):
    df = df.copy()
    c = df["close"].astype(float)

    # 単純リターン(現金決済の口座更新に使う)
    df["ret_simple"] = c.pct_change()
    df["ret_next_simple"] = df["ret_simple"].shift(-1)

    # ログリターン(特徴量・ターゲットに使う)
    df["logp"] = np.log(c)
    df["ret_log"] = df["logp"].diff()
    df["ret_next_log"] = df["ret_log"].shift(-1)

    # ★次日の「日付」も保存(検証区間の境界リーク対策)
    df["date_next"] = df.index.to_series().shift(-1)

    # EWMA: ewma_vol[t] は ret_log[t-1] までで更新(翌日情報を見ない)
    r = df["ret_log"].to_numpy(np.float64)
    r2 = r * r
    s2 = np.full_like(r2, np.nan)

    first = np.where(np.isfinite(r2))[0]
    if len(first) == 0:
        raise ValueError("retが作れませんでした(データを確認)")
    i0 = first[0]
    s2[i0] = r2[i0]

    for i in range(i0 + 1, len(r2)):
        if np.isfinite(r2[i - 1]) and np.isfinite(s2[i - 1]):
            s2[i] = lam * s2[i - 1] + (1.0 - lam) * r2[i - 1]
        else:
            s2[i] = s2[i - 1]

    df["ewma_vol"] = np.sqrt(s2)
    return df


# =========================
# 3) GRU用 X,y,idx(リークなし)
#    目的変数:y = ret_next_log / vol_t
# =========================
def make_xy(df, L=30, eps=1e-12):
    feat = pd.DataFrame(index=df.index)
    feat["ret"] = df["ret_log"]
    feat["vol"] = df["ewma_vol"]
    feat["absret"] = df["ret_log"].abs()
    feat["ret_over_vol"] = df["ret_log"] / (df["ewma_vol"] + eps)

    valid = feat.notna().all(axis=1) & df["ret_next_log"].notna()
    idx_all = df.index[valid]
    feat = feat.loc[idx_all]

    y_all = (df.loc[idx_all, "ret_next_log"] / (df.loc[idx_all, "ewma_vol"] + eps)).astype(float)

    F = feat.shape[1]
    arr = feat.to_numpy(np.float32)
    yv = y_all.to_numpy(np.float32)

    X_list, y_list, idx_t = [], [], []
    for t in range(L, len(idx_all)):
        X_list.append(arr[t - L:t, :])
        y_list.append(yv[t])
        idx_t.append(idx_all[t])

    X = np.stack(X_list).astype(np.float32)   # (N,L,F)
    y = np.asarray(y_list, np.float32)        # (N,)
    idx_t = pd.DatetimeIndex(idx_t)
    return X, y, idx_t


# =========================
# 4) GRUモデル
# =========================


def build_gru(F, L, units=32, l2=1e-4):
    model = tf.keras.Sequential([
        layers.Input(shape=(L, F)),   # ★ここを (None,F) から固定へ
        layers.GRU(units, kernel_regularizer=regularizers.l2(l2)),
        layers.Dense(1),
    ])
    model.compile(optimizer="adam", loss="mse")
    return model


# =========================
# 補助:月次の区間開始列を作る
# =========================
def _make_starts(start_date, end_date, step_months):
    t = pd.Timestamp(start_date)
    end = pd.Timestamp(end_date)
    out = []
    while t < end:
        out.append(t)
        t = t + pd.DateOffset(months=step_months)
    return out


# =========================
# 5) 月次ウォークフォワードで pred を全期間作る(リークなし)
# =========================
def walkforward_pred_gru_monthly(
    X, y, idx_t,
    start_date="1950-01-01",
    refit_months=12,
    units=32, l2=1e-4,
    epochs=5, batch=512,
    seed=1
):
    tf.keras.utils.set_random_seed(seed)

    F = X.shape[2]
    starts = _make_starts(start_date, idx_t.max() + pd.Timedelta(days=1), refit_months)

    preds_all, idx_all = [], []

    for j, t0 in enumerate(starts):
        t1 = t0 + pd.DateOffset(months=refit_months)

        m_train = (idx_t < t0)
        m_test = (idx_t >= t0) & (idx_t < t1)

        if m_train.sum() == 0 or m_test.sum() == 0:
            continue

        Xtr, ytr = X[m_train], y[m_train]
        Xte = X[m_test]

        # 標準化は train のみで fit(リーク防止)
        sc = StandardScaler()
        Xtr2 = Xtr.reshape(-1, F)
        sc.fit(Xtr2)
        Xtr_s = sc.transform(Xtr2).reshape(Xtr.shape)
        Xte_s = sc.transform(Xte.reshape(-1, F)).reshape(Xte.shape)

        # (任意)期間ごとに初期乱数を変えたい場合:
        # tf.keras.utils.set_random_seed(seed + j)

        model = build_gru(F=F, L=L,units=units, l2=l2)
        model.fit(Xtr_s, ytr, epochs=epochs, batch_size=batch, verbose=0, shuffle=False)

        pred = model.predict(Xte_s, batch_size=2048, verbose=0).ravel().astype(np.float32)
        preds_all.append(pred)
        idx_all.append(idx_t[m_test])

    if len(preds_all) == 0:
        raise ValueError("predが作れませんでした(start_date等を確認)")

    pred_v = np.concatenate(preds_all)
    idx_v = pd.DatetimeIndex(np.concatenate([ix.values for ix in idx_all]))
    return pd.Series(pred_v, index=idx_v, name="pred").sort_index()


# =========================
# 6) ポジション関数(ボラ・ゲート無し)
# =========================
def make_w(pred, k=5.0, tf_=1.0, tau=0.0, long_only=True):
    if long_only:
        w = np.clip(k * pred, 0.0, 1.0) * tf_
    else:
        w = np.clip(k * pred, -1.0, 1.0) * tf_

    if tau > 0:
        w = np.where(np.abs(w) < tau, 0.0, w)

    return w.astype(np.float64)


def sharpe_252(x):
    x = np.asarray(x, float)
    mu = np.mean(x)
    sd = np.std(x)
    return np.nan if sd == 0 else (mu / sd) * np.sqrt(252)


# =========================
# 7) 検証期間で (k,tau,tf) を選んで次期に適用(リークなし)
#    ★境界リーク対策:val の末尾で next day が test_start を跨ぐ日を除外
# =========================
def walkforward_select_k_tau_monthly(
    pred_s,
    ret_next_simple_s,
    date_next_s,
    cost=1e-4,
    start_date="1951-01-01",
    refit_months=12,
    val_months=12,
    k_list=(1, 2, 3, 5, 8, 10),
    tau_list=(0.00, 0.0001),
    target_avg_abs_w=0.5,
    tf_cap=1.0,
    long_only=True
):
    starts = _make_starts(start_date, pred_s.index.max() + pd.Timedelta(days=1), refit_months)
    rows = []

    for t0 in starts:
        t1 = t0 + pd.DateOffset(months=refit_months)

        val0 = t0 - pd.DateOffset(months=val_months)
        val1 = t0

        pred_val = pred_s.loc[(pred_s.index >= val0) & (pred_s.index < val1)]
        pred_tst = pred_s.loc[(pred_s.index >= t0) & (pred_s.index < t1)]
        if len(pred_val) == 0 or len(pred_tst) == 0:
            continue

        # ★ここが肝:ret_next_simple[t] が参照する次日が test_start(t0) 以上になる t を除外
        ok = date_next_s.loc[pred_val.index] < val1
        pred_val = pred_val.loc[ok]
        if len(pred_val) == 0:
            continue

        ret_val = ret_next_simple_s.loc[pred_val.index]

        best = (-np.inf, None)  # (sharpe, (k,tau,tf))

        for k in k_list:
            for tau in tau_list:
                w_raw = make_w(pred_val.values, k=k, tf_=1.0, tau=tau, long_only=long_only)
                avg_abs = np.mean(np.abs(w_raw))
                if avg_abs < 1e-12:
                    continue

                tf_ = min(tf_cap, target_avg_abs_w / avg_abs)
                w = make_w(pred_val.values, k=k, tf_=tf_, tau=tau, long_only=long_only)

                dw = np.abs(np.diff(np.r_[0.0, w]))
                r = (w * ret_val.values.astype(np.float64)) - cost * dw

                s = sharpe_252(r)
                if np.isfinite(s) and s > best[0]:
                    best = (s, (k, tau, tf_))

        if best[1] is None:
            continue

        k_, tau_, tf_ = best[1]
        rows.append([t0, t1, k_, tau_, tf_, best[0]])

    chosen = pd.DataFrame(rows, columns=["test_start", "test_end", "k", "tau", "tf", "val_sharpe"])
    return chosen


# =========================
# 8) chosen から全期間の w と「現金決済の口座推移」を構築
# =========================
def build_cash_settled_strategy(
    pred_s,
    ret_next_simple_s,
    chosen,
    cost=1e-4,
    long_only=True,
    E0=1.0
):
    w_all = []
    idx_all = []

    for _, r in chosen.iterrows():
        t0, t1 = r["test_start"], r["test_end"]
        sl = (pred_s.index >= t0) & (pred_s.index < t1)

        p = pred_s.loc[sl]
        if len(p) == 0:
            continue

        rr = ret_next_simple_s.loc[p.index].astype(np.float64)

        # 末尾がNaN(最後の日の next return)を除外しておく
        ok = np.isfinite(rr.values)
        p = p.loc[ok]
        rr = rr.loc[p.index]
        if len(p) == 0:
            continue

        w = make_w(
            p.values,
            k=float(r["k"]),
            tf_=float(r["tf"]),
            tau=float(r["tau"]),
            long_only=long_only
        )

        w_all.append(w)
        idx_all.append(p.index)

    if len(w_all) == 0:
        raise ValueError("chosenが空です(start_date等を確認)")

    idx = pd.DatetimeIndex(np.concatenate([ix.values for ix in idx_all]))
    w = np.concatenate(w_all).astype(np.float64)
    ret = ret_next_simple_s.loc[idx].to_numpy(np.float64)

    dw = np.abs(np.diff(np.r_[0.0, w]))
    strat_r = w * ret - cost * dw

    # 口座推移(self-financing)
    eq_strat = np.empty_like(strat_r, dtype=np.float64)
    eq_index = np.empty_like(strat_r, dtype=np.float64)

    e_s = float(E0)
    e_i = float(E0)
    for t in range(len(strat_r)):
        e_s *= (1.0 + strat_r[t])
        e_i *= (1.0 + ret[t])  # 指数は常にフルエクスポージャ
        eq_strat[t] = e_s
        eq_index[t] = e_i

    out = pd.DataFrame(
        {
            "w": w,
            "ret_next_simple": ret,
            "dw_abs": dw,
            "strat_ret": strat_r,
            "eq_strat": eq_strat,
            "eq_index": eq_index,
        },
        index=idx
    ).sort_index()

    stats = {
        "Sharpe_cost": sharpe_252(out["strat_ret"].values),
        "avg_abs_w": float(np.mean(np.abs(out["w"].values))),
        "active": float(np.mean(np.abs(out["w"].values) > 1e-12)),
        "avg_abs_dw": float(np.mean(out["dw_abs"].values)),
        "entries_0_to_non0": int(np.sum((np.r_[0.0, out["w"].values[:-1]] == 0.0) & (out["w"].values != 0.0))),
        "total_strat": float(out["eq_strat"].iloc[-1] / E0 - 1.0),
        "total_index": float(out["eq_index"].iloc[-1] / E0 - 1.0),
    }
    return out, stats


# =========================
# 9) 実行
# =========================
def main():
    df = load_nikkei225_fred(start=START)
    df = add_returns_and_ewma_vol(df, lam=LAM)

    X, y, idx_t = make_xy(df, L=L)

    pred_s = walkforward_pred_gru_monthly(
        X, y, idx_t,
        start_date=START_DATE_PRED,
        refit_months=REFIT_MONTHS,
        units=UNITS, l2=L2,
        epochs=EPOCHS, batch=BATCH,
        seed=SEED
    )

    # 現金決済に使う「次日単純リターン」と「次日の日付」を pred_s の日付に揃える
    ret_s = df.loc[pred_s.index, "ret_next_simple"].astype(float)
    date_next_s = df.loc[pred_s.index, "date_next"]

    chosen = walkforward_select_k_tau_monthly(
        pred_s,
        ret_s,
        date_next_s,
        cost=COST,
        start_date=START_DATE_CHOOSE,
        refit_months=REFIT_MONTHS,
        val_months=VAL_MONTHS,
        k_list=K_LIST,
        tau_list=TAU_LIST,
        target_avg_abs_w=TARGET_AVG_ABS_W,
        tf_cap=TF_CAP,
        long_only=LONG_ONLY
    )

    out, stats = build_cash_settled_strategy(
        pred_s, ret_s, chosen,
        cost=COST, long_only=LONG_ONLY, E0=1.0
    )
    print("REFIT_MONTHS",REFIT_MONTHS,"VAL_MONTHS", VAL_MONTHS)  
    print("k_List",K_LIST,"Tau_list",TAU_LIST,"target_average_abs_w",TARGET_AVG_ABS_W)
    print("pred range:", pred_s.index.min(), "->", pred_s.index.max(), "N=", len(pred_s))
    if len(chosen) > 0:
        print("chosen periods:", chosen["test_start"].min(), "->", chosen["test_end"].max(), "rows=", len(chosen))
    print("stats:", stats)

    # =========================
    # 10) プロット(全期間)
    # =========================
    plt.figure(figsize=(12, 4))
    plt.plot(out.index, out["eq_index"], label="Index (cash-settled equity)")
    plt.plot(out.index, out["eq_strat"], label="Strategy (cash-settled equity)")
    plt.yscale("log")
    plt.legend()
    plt.title("Index vs Strategy (cash settlement, log y)")
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(12, 3))
    plt.plot(out.index, out["w"], lw=0.7)
    plt.axhline(0.0, lw=0.8)
    plt.title("Position w(t) (full period)")
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(12, 3))
    plt.plot(out.index, out["strat_ret"], lw=0.6)
    plt.axhline(0.0, lw=0.8)
    plt.title("Strategy daily return (w*R - cost*|dw|)")
    plt.tight_layout()
    plt.show()

    # (B) 通常軸(右軸/左軸)
    fig, ax1 = plt.subplots(figsize=(12, 5))
    ax1.plot(out.index, out["eq_index"], label="Index", lw=1.0)
    ax1.set_ylabel("Index equity (start=1)")
    ax2 = ax1.twinx()
    ax2.plot(out.index, out["eq_strat"], label="Strategy", lw=1.0)
    ax2.set_ylabel("Strategy equity (start=1)")
    ax1.set_title("Index vs Strategy (dual axis)")
    fig.tight_layout()
    plt.show()

    # (C) w(ポジション)
    plt.figure(figsize=(12, 4))
    plt.plot(out.index, out["w"], lw=0.8, label="w (position weight)")
    plt.axhline(0, lw=0.8)
    plt.legend()
    plt.title("Position w")
    plt.tight_layout()
    plt.show()
if __name__ == "__main__":
    main()

image.png

image.png

image.png

image.png

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?