ローリング回帰とは,時系列データに対して一定のウィンドウ幅(例: 250日)を設定し,その範囲をずらしながら回帰分析を繰り返し実行する手法です.実装自体は難しくないですが,ここでは以下のような「気の利いた」単回帰の実装方法をメモします.
-
for
文を回さない(=高速かつSQL
でも実装可能) - 回帰係数と(一定の仮定のもとで算出した)その標準誤差を計算
- 1つのウィンドウで,複数の回帰を実行
アウトプットのイメージは以下です.2つのサンプルに対し,それぞれ10個の測定データがあり,ウィンドウ幅を5としてローリング単回帰をしています.
単回帰分析の理論の整理
ローリング単回帰の「気の利いた」実装のため,単回帰分析の理論を整理します.
まず,単回帰モデルは
$$
y_t = \alpha + \beta x_t + \epsilon_t
$$
によって表されます.ここで,$t$は時点を表すラベル,$\alpha$は切片,$\beta$は回帰係数,$\epsilon_t$は誤差項(確率変数)です.
また,以下の標準的な仮定を要請します.
- 外生性:$x_t$は誤差項と無相関である
- 不偏性:誤差項$\epsilon_t$の期待値は$0$である
- 無相関性:誤差項$\epsilon_t$は互いに無相関である
- 等分散性:誤差項$\epsilon_t$の分散は$\sigma _ {\epsilon}$で一定である
誤差項が自己相関を持つ場合,無相関性が成り立たなくなる点には要注意です.
前提として,以下で定義される平均$\hat{\mu} _ x$,$\hat{\mu} _ y$と分散$\hat{\sigma}_x^2$,$\hat{\sigma}_y^2$および共分散$\hat{\sigma} _ {xy}$の計算メソッドは実装されているとしましょう.目的は,回帰結果を以下の左辺で表すことです.
\displaylines{
\hat { \mu } _ x = \dfrac{1}{N} \sum _ { i = 1 } ^ N x _ t, \quad
\hat{\mu} _ y = \dfrac{1}{N} \sum_{i=1}^N y_t \\
\hat{\sigma}_x^2 = \dfrac{1}{N} \sum_{i=1}^N (x_t - \hat{\mu} _ x)^2, \quad
\hat{\sigma}_y^2 = \dfrac{1}{N} \sum_{i=1}^N (y_t - \hat{\mu} _ y)^2 \\
\hat{\sigma} _ {xy}
= \dfrac{1}{N} \sum_{i=1}^N (x_t - \hat{\mu} _ x)(y_t - \hat{\mu} _ y)
= \dfrac{1}{N} \sum_{i=1}^N x_t y_t - \hat{\mu} _ x \hat{\mu} _ y
}
細かい補足
`SQL Server`では共分散を計算する関数がなさそうなので,後の実装では自前で計算しています. また,ここで求めているのは標本分散(≠不偏分散)であることに注意してください.まず,最小二乗法による推定量$\hat{\alpha}$,$\hat{\beta}$およびその分散$\sigma _ {\alpha}^2$,$\sigma _ {\beta}^2$は以下のように求められます.
\displaylines{
\hat{\alpha} = \hat{\mu} _ y - \hat{\beta} \hat{\mu} _ x
,\quad
\hat{\beta} = \dfrac{\hat{\sigma} _ {xy} }
{\hat{\sigma}_x^2}
\\
\sigma _ {\alpha}^2 = \dfrac{\sigma_{\epsilon}^2}{N} \left (1 + \dfrac{ \hat {\mu} _ x ^2}{ \hat{ \sigma} _ x ^ 2} \right )
,\quad
\sigma _ {\beta}^2 = \dfrac{\sigma_{\epsilon}^2}{N \hat{\sigma}_x^2}
}
ここで,未知である誤差項の分散$\sigma_{\epsilon}^2$は以下のように推定され,その期待値$\mathbb{E}[\hat{\sigma} _ {\epsilon}^2]$は真の値$\sigma_{\epsilon}^2$に一致します(少し計算が必要).
$$
\hat{ \sigma } _ { \epsilon} ^ 2
= \dfrac{1}{N-2} \sum_{i=1}^N (y_t - \hat{\alpha} - \hat{\beta} x_t)^2
= \dfrac{N}{N-2} \left ( \hat{\sigma}_y^2 - \hat{\beta}^2 \hat{\sigma} _ {x}^2 \right )
$$
いま,未知である誤差項の分散$\sigma _ {\epsilon}^2$が,推定値$\hat{\sigma} _ {\epsilon}^2$に等しいことを仮定します.その場合,切片と回帰係数の推定値$\hat{\alpha}$,$\hat{\beta}$の標準誤差は以下のように推定されます.
\displaylines{
\hat{\sigma} _ {\alpha}^2 = \dfrac{\hat{\sigma} _ {\epsilon}^2}{N} \left(1 + \dfrac{\hat{\mu} _ x^2}{\hat{\sigma}_x^2} \right)
,\quad
\hat{\sigma} _{\beta}^2 = \dfrac{\hat{\sigma}_{\epsilon}^2}{N \hat{\sigma}_x^2}
}
「誤差項の分散$\sigma _ {\epsilon}^2$が推定値$\hat{\sigma} _ {\epsilon}^2$に等しい」という仮定は,データ数$n$が十分大きい場合には問題にならなさそうです.例えば,誤差項の正規性を追加で要請する場合,$\hat{ \sigma } _ { \epsilon}^2$の分散は$\mathcal{O}(n^{-1})$で$0$に収束することが示せます($(n-2) \hat{ \sigma } _ { \epsilon}^2 / \sigma _ {\epsilon}^2 $が自由度$n-2$の$\chi^2$分布に従うことを使う).ただ,一定の仮定を置いていることを認識する必要はあるでしょう.
ローリング単回帰の実装
実際に,ローリング単回帰を実装すると以下のようになります.Python
ではpandas
とpolars
,SQL
ではSQL Server
を使っています.かなり雑に「書いてみた」程度のコードなので,工夫の余地があればご指摘ください.
Python
pandas
import numpy as np
import pandas as pd
window = 50
df_reg = df.sort_values(["id", "step"]).reset_tndex(drop=True)
df_reg["mu_x"] = df_reg.groupby("id")["x"].rolling(window=window, min_periods=window).mean().values
df_reg["mu_x"] = df_reg.groupby("id")["x"].rolling(window=window, min_periods=window).mean().values
df_reg["mu_y"] = df_reg.groupby("id")["y"].rolling(window=window, min_periods=window).mean().values
# pandasではデフォルトで不偏分散を計算するため,標本分散(ddof=0)を計算する
df_reg["sigma_x"] = df_reg.groupby("id")["x"].rolling(window=window, min_periods=window).std(ddof=0).values
df_reg["sigma_y"] = df_reg.groupby("id")["y"].rolling(window=window, min_periods=window).std(ddof=0).values
df_reg["cov_xy"] = df_reg.groupby("id")[["x", "y"]].rolling(window=window, min_periods=window).cov(ddof=0).groupby(level=[0,1]).last()["x"].values
df_reg["beta_hat"] = df_reg["cov_xy"] / (df_reg["sigma_x"] ** 2)
df_reg["alpha_hat"] = df_reg["mu_y"] - df_reg["beta_hat"] * df_reg["mu_x"]
df_reg["sigma_residual_hat"] = np.sqrt(window/(window-2) * (df_reg["sigma_y"] ** 2 - df_reg["beta_hat"] ** 2 * df_reg["sigma_x"] ** 2))
df_reg["sigma_alpha_hat"] = df_reg["sigma_residual_hat"] * np.sqrt(1/window + df_reg["mu_x"] ** 2 / (window * df_reg["sigma_x"] ** 2))
df_reg["sigma_beta_hat"] = df_reg["sigma_residual_hat"] / (df_reg["sigma_x"] * np.sqrt(window))
polars
import numpy as np
import polars as pl
df_reg = pl.DataFrame(df)
# convert pandasa to polars
df_reg = df_reg.sort(
"id", "step"
).with_columns(
mu_x = pl.col("x").rolling_mean(window).over("id"),
mu_y = pl.col("y").rolling_mean(window).over("id"),
sigma_x = pl.col("x").rolling_std(window, ddof=0).over("id"),
sigma_y = pl.col("y").rolling_std(window, ddof=0).over("id"),
cov_xy = pl.rolling_cov(pl.col("x"), pl.col("y"), window_size = window, ddof=0).over("id")
).with_columns(
beta_hat = pl.col("cov_xy") / (pl.col("sigma_x") ** 2),
).with_columns(
alpha_hat = pl.col("mu_y") - pl.col("beta_hat") * pl.col("mu_x"),
sigma_residual_hat = np.sqrt(window/(window-2) * (pl.col("sigma_y") ** 2 - pl.col("beta_hat") ** 2 * pl.col("sigma_x") ** 2)),
).with_columns(
sigma_alpha_hat = pl.col("sigma_residual_hat") / np.sqrt(window) * np.sqrt(1 + pl.col("mu_x") ** 2 / (pl.col("sigma_x") ** 2)),
sigma_beta_hat = pl.col("sigma_residual_hat") / (pl.col("sigma_x") * np.sqrt(window))
)
SQL
WITH UNIV_FLAG AS (
SELECT
id
,step
,x
,y
,CASE WHEN COUNT(id) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) = 49+1 THEN 1 ELSE 0 END AS flag_full
FROM TestTable
),
UNIV_AVG_STD AS (
SELECT *
,CASE WHEN flag_full=1 THEN AVG(x) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS mu_x
,CASE WHEN flag_full=1 THEN AVG(y) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS mu_y
,CASE WHEN flag_full=1 THEN STDEVP(x) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS sigma_x
,CASE WHEN flag_full=1 THEN STDEVP(y) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS sigma_y
FROM UNIV_FLAG
),
UNIV_COV AS (
SELECT *
,AVG(x*y) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) - mu_x*mu_y AS cov_xy
FROM UNIV_AVG_STD
),
UNIV_BETA AS (
SELECT *
,cov_xy / (sigma_x * sigma_x) AS beta_hat
FROM UNIV_COV
),
UNIV_RESIDUAL AS (
SELECT *
,mu_y - beta_hat * mu_x AS alpha_hat
,SQRT((49+1)/(49-1)) * SQRT(sigma_y * sigma_y - beta_hat * beta_hat * sigma_x * sigma_x) AS sigma_residual_hat
FROM UNIV_BETA
)
SELECT *
,sigma_residual_hat / SQRT(49+1) * SQRT(1 + mu_x * mu_x / (sigma_x * sigma_x)) AS sigma_alpha_hat
,sigma_residual_hat / (sigma_x * SQRT(49+1)) AS sigma_beta_hat
FROM UNIV_RESIDUAL
メモ
pandas
,polars
,SQL Server
の内部的な計算はわかりませんが,ローリングする際に過去時点の計算結果を捨てて計算しているため,逐次的に計算すればもっと(ウィンドウ幅倍程度?)高速になるかもしれません.