0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

気の利いたローリング単回帰の実装メモ

Posted at

ローリング回帰とは,時系列データに対して一定のウィンドウ幅(例: 250日)を設定し,その範囲をずらしながら回帰分析を繰り返し実行する手法です.実装自体は難しくないですが,ここでは以下のような「気の利いた」単回帰の実装方法をメモします.

  • for文を回さない(=高速かつSQLでも実装可能)
  • 回帰係数と(一定の仮定のもとで算出した)その標準誤差を計算
  • 1つのウィンドウで,複数の回帰を実行

アウトプットのイメージは以下です.2つのサンプルに対し,それぞれ10個の測定データがあり,ウィンドウ幅を5としてローリング単回帰をしています.

image.png

単回帰分析の理論の整理

ローリング単回帰の「気の利いた」実装のため,単回帰分析の理論を整理します.
まず,単回帰モデルは
$$
y_t = \alpha + \beta x_t + \epsilon_t
$$
によって表されます.ここで,$t$は時点を表すラベル,$\alpha$は切片,$\beta$は回帰係数,$\epsilon_t$は誤差項(確率変数)です.
また,以下の標準的な仮定を要請します.

  1. 外生性:$x_t$は誤差項と無相関である
  2. 不偏性:誤差項$\epsilon_t$の期待値は$0$である
  3. 無相関性:誤差項$\epsilon_t$は互いに無相関である
  4. 等分散性:誤差項$\epsilon_t$の分散は$\sigma _ {\epsilon}$で一定である

誤差項が自己相関を持つ場合,無相関性が成り立たなくなる点には要注意です.

前提として,以下で定義される平均$\hat{\mu} _ x$,$\hat{\mu} _ y$と分散$\hat{\sigma}_x^2$,$\hat{\sigma}_y^2$および共分散$\hat{\sigma} _ {xy}$の計算メソッドは実装されているとしましょう.目的は,回帰結果を以下の左辺で表すことです.

\displaylines{
\hat { \mu } _ x = \dfrac{1}{N} \sum _ { i = 1 } ^ N x _ t, \quad
\hat{\mu} _ y = \dfrac{1}{N} \sum_{i=1}^N y_t \\
\hat{\sigma}_x^2 = \dfrac{1}{N} \sum_{i=1}^N (x_t - \hat{\mu} _ x)^2, \quad
\hat{\sigma}_y^2 = \dfrac{1}{N} \sum_{i=1}^N (y_t - \hat{\mu} _ y)^2 \\
\hat{\sigma} _ {xy} 
= \dfrac{1}{N} \sum_{i=1}^N (x_t - \hat{\mu} _ x)(y_t - \hat{\mu} _ y)
= \dfrac{1}{N} \sum_{i=1}^N x_t y_t - \hat{\mu} _ x \hat{\mu} _ y
}
細かい補足 `SQL Server`では共分散を計算する関数がなさそうなので,後の実装では自前で計算しています. また,ここで求めているのは標本分散(≠不偏分散)であることに注意してください.

まず,最小二乗法による推定量$\hat{\alpha}$,$\hat{\beta}$およびその分散$\sigma _ {\alpha}^2$,$\sigma _ {\beta}^2$は以下のように求められます.

\displaylines{
\hat{\alpha} = \hat{\mu} _ y - \hat{\beta} \hat{\mu} _ x
,\quad
\hat{\beta} = \dfrac{\hat{\sigma} _ {xy} }
{\hat{\sigma}_x^2}

\\

\sigma _ {\alpha}^2 = \dfrac{\sigma_{\epsilon}^2}{N} \left (1 + \dfrac{ \hat {\mu} _ x ^2}{ \hat{ \sigma} _ x ^ 2} \right )
,\quad
\sigma _ {\beta}^2 = \dfrac{\sigma_{\epsilon}^2}{N \hat{\sigma}_x^2}
}

ここで,未知である誤差項の分散$\sigma_{\epsilon}^2$は以下のように推定され,その期待値$\mathbb{E}[\hat{\sigma} _ {\epsilon}^2]$は真の値$\sigma_{\epsilon}^2$に一致します(少し計算が必要).

$$
\hat{ \sigma } _ { \epsilon} ^ 2
= \dfrac{1}{N-2} \sum_{i=1}^N (y_t - \hat{\alpha} - \hat{\beta} x_t)^2
= \dfrac{N}{N-2} \left ( \hat{\sigma}_y^2 - \hat{\beta}^2 \hat{\sigma} _ {x}^2 \right )
$$

いま,未知である誤差項の分散$\sigma _ {\epsilon}^2$が,推定値$\hat{\sigma} _ {\epsilon}^2$に等しいことを仮定します.その場合,切片と回帰係数の推定値$\hat{\alpha}$,$\hat{\beta}$の標準誤差は以下のように推定されます.

\displaylines{
\hat{\sigma} _ {\alpha}^2 = \dfrac{\hat{\sigma} _ {\epsilon}^2}{N} \left(1 + \dfrac{\hat{\mu} _ x^2}{\hat{\sigma}_x^2} \right)
,\quad
\hat{\sigma} _{\beta}^2 = \dfrac{\hat{\sigma}_{\epsilon}^2}{N \hat{\sigma}_x^2}
}

「誤差項の分散$\sigma _ {\epsilon}^2$が推定値$\hat{\sigma} _ {\epsilon}^2$に等しい」という仮定は,データ数$n$が十分大きい場合には問題にならなさそうです.例えば,誤差項の正規性を追加で要請する場合,$\hat{ \sigma } _ { \epsilon}^2$の分散は$\mathcal{O}(n^{-1})$で$0$に収束することが示せます($(n-2) \hat{ \sigma } _ { \epsilon}^2 / \sigma _ {\epsilon}^2 $が自由度$n-2$の$\chi^2$分布に従うことを使う).ただ,一定の仮定を置いていることを認識する必要はあるでしょう.

ローリング単回帰の実装

実際に,ローリング単回帰を実装すると以下のようになります.PythonではpandaspolarsSQLではSQL Serverを使っています.かなり雑に「書いてみた」程度のコードなので,工夫の余地があればご指摘ください.

Python

pandas

import numpy as np
import pandas as pd

window = 50
df_reg = df.sort_values(["id", "step"]).reset_tndex(drop=True)
df_reg["mu_x"] = df_reg.groupby("id")["x"].rolling(window=window, min_periods=window).mean().values
df_reg["mu_x"] = df_reg.groupby("id")["x"].rolling(window=window, min_periods=window).mean().values
df_reg["mu_y"] = df_reg.groupby("id")["y"].rolling(window=window, min_periods=window).mean().values
# pandasではデフォルトで不偏分散を計算するため,標本分散(ddof=0)を計算する
df_reg["sigma_x"] = df_reg.groupby("id")["x"].rolling(window=window, min_periods=window).std(ddof=0).values
df_reg["sigma_y"] = df_reg.groupby("id")["y"].rolling(window=window, min_periods=window).std(ddof=0).values
df_reg["cov_xy"] = df_reg.groupby("id")[["x", "y"]].rolling(window=window, min_periods=window).cov(ddof=0).groupby(level=[0,1]).last()["x"].values

df_reg["beta_hat"] = df_reg["cov_xy"] / (df_reg["sigma_x"] ** 2)
df_reg["alpha_hat"] = df_reg["mu_y"] - df_reg["beta_hat"] * df_reg["mu_x"]
df_reg["sigma_residual_hat"] = np.sqrt(window/(window-2) * (df_reg["sigma_y"] ** 2 - df_reg["beta_hat"] ** 2 * df_reg["sigma_x"] ** 2)) 
df_reg["sigma_alpha_hat"] = df_reg["sigma_residual_hat"] * np.sqrt(1/window + df_reg["mu_x"] ** 2 / (window * df_reg["sigma_x"] ** 2))
df_reg["sigma_beta_hat"] = df_reg["sigma_residual_hat"] / (df_reg["sigma_x"] * np.sqrt(window))

polars

import numpy as np
import polars as pl

df_reg = pl.DataFrame(df)
# convert pandasa to polars
df_reg = df_reg.sort(
    "id", "step"
).with_columns(
    mu_x = pl.col("x").rolling_mean(window).over("id"),
    mu_y = pl.col("y").rolling_mean(window).over("id"),
    sigma_x = pl.col("x").rolling_std(window, ddof=0).over("id"),
    sigma_y = pl.col("y").rolling_std(window, ddof=0).over("id"),
    cov_xy = pl.rolling_cov(pl.col("x"), pl.col("y"), window_size = window, ddof=0).over("id")
).with_columns(
    beta_hat = pl.col("cov_xy") / (pl.col("sigma_x") ** 2),
).with_columns(
    alpha_hat = pl.col("mu_y") - pl.col("beta_hat") * pl.col("mu_x"),    
    sigma_residual_hat = np.sqrt(window/(window-2) * (pl.col("sigma_y") ** 2 - pl.col("beta_hat") ** 2 * pl.col("sigma_x") ** 2)),
).with_columns(
    sigma_alpha_hat = pl.col("sigma_residual_hat") / np.sqrt(window) * np.sqrt(1 + pl.col("mu_x") ** 2 / (pl.col("sigma_x") ** 2)),
    sigma_beta_hat = pl.col("sigma_residual_hat") / (pl.col("sigma_x") * np.sqrt(window))
)

SQL

WITH UNIV_FLAG AS (
SELECT 
	id
	,step
	,x
	,y
	,CASE WHEN COUNT(id) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) = 49+1 THEN 1 ELSE 0 END AS flag_full
FROM TestTable
),

UNIV_AVG_STD AS (
SELECT *
	,CASE WHEN flag_full=1 THEN AVG(x) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS mu_x
	,CASE WHEN flag_full=1 THEN AVG(y) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS mu_y
	,CASE WHEN flag_full=1 THEN STDEVP(x) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS sigma_x
	,CASE WHEN flag_full=1 THEN STDEVP(y) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) ELSE NULL END AS sigma_y
FROM UNIV_FLAG
),

UNIV_COV AS (
SELECT *
		,AVG(x*y) OVER (PARTITION BY id ORDER BY step ROWS BETWEEN 49 PRECEDING AND CURRENT ROW) - mu_x*mu_y AS cov_xy
FROM UNIV_AVG_STD
),

UNIV_BETA AS (
SELECT *
	,cov_xy / (sigma_x * sigma_x) AS beta_hat
	
FROM UNIV_COV
),

UNIV_RESIDUAL AS (
SELECT *
		,mu_y - beta_hat * mu_x AS alpha_hat
		,SQRT((49+1)/(49-1)) * SQRT(sigma_y * sigma_y - beta_hat * beta_hat * sigma_x * sigma_x) AS sigma_residual_hat
FROM UNIV_BETA
)

SELECT *
	,sigma_residual_hat / SQRT(49+1) * SQRT(1 + mu_x * mu_x / (sigma_x * sigma_x)) AS sigma_alpha_hat
	,sigma_residual_hat / (sigma_x * SQRT(49+1)) AS sigma_beta_hat
FROM UNIV_RESIDUAL

メモ

pandaspolarsSQL Serverの内部的な計算はわかりませんが,ローリングする際に過去時点の計算結果を捨てて計算しているため,逐次的に計算すればもっと(ウィンドウ幅倍程度?)高速になるかもしれません.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?