TVP-VARモデル
- Time-varying parameter Vector Autoregression
- 経済構造や政策スタンスの時期による違いを吸収可能
- 推定にはカルマンフィルタなど高度な手法が必要で計算負荷が高い
仮説
- 好景気→持ち家率増加
データ(Federal Reserve Economic Data: アメリカ合衆国のものを利用 2025/11月21日時点取得)
統合データベース
左から実質GDP成長率、持ち家率、住宅ローン金利
1971-04-01 0.54082 64.1 7.41
1971-07-01 0.82245 64.4 7.66
1971-10-01 0.23486 64.5 7.55
1972-01-01 1.83722 64.3 7.35
1972-04-01 2.26929 64.5 7.35
1972-07-01 0.94409 64.3 7.41
1972-10-01 1.67444 64.4 7.43
1973-01-01 2.47494 64.9 7.45
1973-04-01 1.08874 64.4 7.65
1973-07-01 -0.52584 64.4 8.46
1973-10-01 0.94923 64.4 8.63
1974-01-01 -0.85971 64.8 8.46
1974-04-01 0.23770 64.8 8.89
1974-07-01 -0.94528 64.6 9.61
1974-10-01 -0.38841 64.4 9.79
1975-01-01 -1.21763 64.4 9.17
1975-04-01 0.71470 64.9 8.88
1975-07-01 1.71204 64.6 8.98
1975-10-01 1.34709 64.5 9.16
1976-01-01 2.24947 64.6 8.87
1976-04-01 0.73337 64.6 8.78
1976-07-01 0.54741 64.9 8.97
1976-10-01 0.72260 64.8 8.85
1977-01-01 1.18623 64.8 8.69
1977-04-01 1.94192 64.5 8.81
1977-07-01 1.80316 65.0 8.93
1977-10-01 0.00199 64.9 8.94
1978-01-01 0.31984 64.8 9.13
1978-04-01 3.86477 64.4 9.56
1978-07-01 1.00575 65.2 9.76
1978-10-01 1.34413 65.4 10.12
1979-01-01 0.17959 64.8 10.41
1979-04-01 0.10675 64.9 10.76
1979-07-01 0.74280 65.8 11.16
1979-10-01 0.25005 65.4 12.48
1980-01-01 0.31457 65.5 13.68
1980-04-01 -2.06043 65.5 14.42
1980-07-01 -0.11885 65.8 12.64
1980-10-01 1.86492 65.5 14.23
1981-01-01 1.95942 65.6 15.13
1981-04-01 -0.74108 65.3 16.24
1981-07-01 1.19750 65.6 17.38
1981-10-01 -1.08960 65.2 17.74
1982-01-01 -1.55364 64.8 17.41
1982-04-01 0.45618 64.9 16.77
1982-07-01 -0.38227 64.9 16.22
1982-10-01 0.04001 64.5 14.03
1983-01-01 1.31779 64.7 13.03
1983-04-01 2.27552 64.7 12.76
1983-07-01 1.99895 64.8 13.64
1983-10-01 2.08618 64.4 13.46
1984-01-01 1.95479 64.6 13.33
1984-04-01 1.72776 64.6 14.04
1984-07-01 0.96401 64.6 14.49
1984-10-01 0.82077 64.1 13.65
1985-01-01 0.96898 64.1 13.06
1985-04-01 0.88047 64.1 12.79
1985-07-01 1.52727 63.9 12.14
1985-10-01 0.74345 63.5 11.73
1986-01-01 0.93375 63.6 10.58
1986-04-01 0.45030 63.8 10.25
1986-07-01 0.95667 63.8 10.24
1986-10-01 0.53639 63.9 9.68
1987-01-01 0.74184 63.8 9.11
1987-04-01 1.07818 63.8 10.34
1987-07-01 0.86718 64.2 10.48
1987-10-01 1.71696 64.1 10.86
1988-01-01 0.51688 63.7 10.07
1988-04-01 1.31403 63.7 10.36
1988-07-01 0.58597 64.0 10.50
1988-10-01 1.33246 63.8 10.41
1989-01-01 1.01639 63.9 10.82
1989-04-01 0.76310 63.8 10.64
1989-07-01 0.74074 64.1 10.01
1989-10-01 0.19698 63.8 9.81
1990-01-01 1.09288 64.0 10.13
1990-04-01 0.36297 63.7 10.32
1990-07-01 0.06658 64.0 10.10
1990-10-01 -0.91040 64.1 9.96
1991-01-01 -0.46794 63.9 9.50
1991-04-01 0.77969 63.9 9.52
1991-07-01 0.50536 64.2 9.27
1991-10-01 0.34854 64.2 8.69
1992-01-01 1.19725 64.0 8.69
1992-04-01 1.08430 63.9 8.68
1992-07-01 0.98811 64.3 8.02
1992-10-01 1.04286 64.4 8.19
1993-01-01 0.16694 64.2 7.72
1993-04-01 0.58217 64.4 7.45
1993-07-01 0.47715 64.7 7.09
1993-10-01 1.36015 64.6 7.05
1994-01-01 0.97036 63.8 7.30
1994-04-01 1.35518 63.8 8.43
1994-07-01 0.58458 64.1 8.59
1994-10-01 1.14557 64.2 9.11
1995-01-01 0.35478 64.2 8.79
1995-04-01 0.29832 64.7 7.92
1995-07-01 0.85073 65.0 7.70
1995-10-01 0.67908 65.1 7.34
1996-01-01 0.74905 65.1 7.27
1996-04-01 1.66794 65.4 8.10
1996-07-01 0.89703 65.6 8.15
1996-10-01 1.03837 65.4 7.70
1997-01-01 0.64544 65.4 7.79
1997-04-01 1.66522 65.7 7.93
1997-07-01 1.24857 66.0 7.47
1997-10-01 0.85386 65.7 7.21
1998-01-01 1.00375 65.9 7.05
1998-04-01 0.92569 66.0 7.10
1998-07-01 1.25894 66.8 6.87
1998-10-01 1.60920 66.4 6.76
1999-01-01 0.93943 66.7 6.88
1999-04-01 0.83474 66.6 7.18
1999-07-01 1.32588 67.0 7.79
1999-10-01 1.64067 66.9 7.84
2000-01-01 0.36279 67.1 8.26
2000-04-01 1.82129 67.2 8.32
2000-07-01 0.10193 67.7 8.02
2000-10-01 0.59704 67.5 7.62
2001-01-01 -0.32780 67.5 7.01
2001-04-01 0.62450 67.7 7.13
2001-07-01 -0.40064 68.1 6.96
2001-10-01 0.27478 68.0 6.77
2002-01-01 0.83636 67.8 6.97
2002-04-01 0.61271 67.6 6.81
2002-07-01 0.40651 68.0 6.29
2002-10-01 0.12361 68.3 6.08
2003-01-01 0.52664 68.0 5.84
2003-04-01 0.88562 68.0 5.51
2003-07-01 1.66320 68.4 6.03
2003-10-01 1.16072 68.6 5.92
2004-01-01 0.56665 68.6 5.60
2004-04-01 0.77495 69.2 6.11
2004-07-01 0.94859 69.0 5.89
2004-10-01 1.02021 69.2 5.74
2005-01-01 1.10936 69.1 5.77
2005-04-01 0.49262 68.6 5.71
2005-07-01 0.78381 68.8 5.76
2005-10-01 0.55552 69.0 6.23
2006-01-01 1.34538 68.5 6.25
2006-04-01 0.25875 68.7 6.60
2006-07-01 0.14996 69.0 6.56
2006-10-01 0.85945 68.9 6.24
2007-01-01 0.30084 68.4 6.21
2007-04-01 0.61176 68.2 6.36
2007-07-01 0.57603 68.2 6.55
2007-10-01 0.62824 67.8 6.22
2008-01-01 -0.42676 67.8 5.87
2008-04-01 0.59543 68.1 6.09
2008-07-01 -0.52526 67.9 6.32
2008-10-01 -2.18903 67.5 5.84
2009-01-01 -1.13487 67.3 5.06
2009-04-01 -0.17865 67.4 5.01
2009-07-01 0.35119 67.6 5.16
2009-10-01 1.08091 67.2 4.92
2010-01-01 0.48450 67.1 5.00
2010-04-01 0.96759 66.9 4.92
2010-07-01 0.77108 66.9 4.45
2010-10-01 0.52511 66.5 4.44
2011-01-01 -0.23720 66.4 4.85
2011-04-01 0.67658 65.9 4.65
2011-07-01 -0.02231 66.3 4.29
2011-10-01 1.12305 66.0 4.00
2012-01-01 0.83859 65.4 3.92
2012-04-01 0.44633 65.5 3.79
2012-07-01 0.14403 65.5 3.55
2012-10-01 0.11564 65.4 3.36
2013-01-01 0.98656 65.0 3.50
2013-04-01 0.26764 65.0 3.67
2013-07-01 0.85139 65.3 4.44
2013-10-01 0.87174 65.2 4.29
2014-01-01 -0.34510 64.8 4.36
2014-04-01 1.29184 64.7 4.23
2014-07-01 1.21539 64.4 4.14
2014-10-01 0.50573 64.0 3.96
2015-01-01 0.90048 63.7 3.72
2015-04-01 0.61941 63.4 3.82
2015-07-01 0.40025 63.7 3.95
2015-10-01 0.18448 63.8 3.90
2016-01-01 0.57952 63.5 3.74
2016-04-01 0.32112 62.9 3.59
2016-07-01 0.70939 63.5 3.45
2016-10-01 0.55430 63.7 3.84
2017-01-01 0.48689 63.6 4.17
2017-04-01 0.55987 63.7 3.98
2017-07-01 0.78852 63.9 3.88
2017-10-01 1.12705 64.2 3.92
2018-01-01 0.81341 64.2 4.28
2018-04-01 0.53083 64.3 4.54
2018-07-01 0.62370 64.4 4.57
2018-10-01 0.14164 64.8 4.78
2019-01-01 0.62432 64.2 4.37
2019-04-01 0.83515 64.1 4.01
2019-07-01 1.17000 64.8 3.66
2019-10-01 0.68188 65.1 3.70
2020-01-01 -1.31632 65.3 3.52
2020-04-01 -7.87678 67.9 3.24
2020-07-01 7.76228 67.4 2.95
2020-10-01 1.13352 65.8 2.76
2021-01-01 1.39582 65.6 2.88
2021-04-01 1.70189 65.4 3.00
2021-07-01 0.82505 65.4 2.87
2021-10-01 1.71576 65.5 3.08
2022-01-01 -0.25480 65.4 3.82
2022-04-01 0.15655 65.8 5.27
2022-07-01 0.72190 66.0 5.62
2022-10-01 0.69024 65.9 6.66
2023-01-01 0.72385 66.0 6.37
2023-04-01 0.62787 65.9 6.51
2023-07-01 1.15361 66.0 7.04
2023-10-01 0.84406 65.7 7.30
2024-01-01 0.20986 65.6 6.75
2024-04-01 0.88549 65.6 7.00
2024-07-01 0.82478 65.6 6.51
2024-10-01 0.45987 65.7 6.63
2025-01-01 -0.16252 65.1 6.83
2025-04-01 0.94600 65.0 6.79
コード
python
"""
Structural TVP-SVAR with Stochastic Volatility (SV) and Stability Check + TV-IRF
- Model: A_t y_t = H_t beta_t(=B_t x_t) + u_t, u_t ~ N(0, D_t=diag(exp(h_t)))
Reduced-form: y_t = H_t(r) beta_t + eps_t, eps_t ~ N(0, Sigma_t=A_t^{-1} D_t A_t^{-T})
- Gibbs:
* beta_{1:T} via Carter-Kohn FFBS with time-varying Sigma_t
* A_t (lower-triangular off-diagonals) via regression-RW FFBS per row
* h_{i,t} via Gaussian-approx FFBS on log(u_{i,t}^2)
* Q_beta via Inverse-Wishart; sigma^2_{A,ij}, sigma^2_{h,i} via Inverse-Gamma
- IRF: Structural (using A_t and D_t) and Generalized (Pesaran-Shin)
- Stability: Companion matrix spectral radius check (+ projection to stationarity option)
"""
import numpy as np
import scipy.linalg as la
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict
import matplotlib.pyplot as plt
import pandas as pd
from io import StringIO
import time
# ==========
# Utilities
# ==========
def build_var_design(Y: np.ndarray, p: int, include_const: bool = True):
Y = np.asarray(Y, dtype=float)
T, k = Y.shape
assert T > p, "Not enough observations for the chosen lag p."
rows = T - p
m_eq = (k * p) + (1 if include_const else 0)
X = np.zeros((rows, m_eq))
# beta (m_eq, k)
Yp = np.zeros((rows, k))
for t in range(p, T):
parts = []
if include_const:
parts.append(np.array([1.0]))
for L in range(1, p + 1):
parts.append(np.asarray(Y[t - L, :]).ravel())
x = np.concatenate(parts, axis=0)
# x = [1, y(GDP)(p-1) y(inflation)(p-1), ... y(GDP)(0) y(inflation)(0)]
# x = [1, y(GDP)(p) y(inflation)(p), ... y(GDP)(1) y(inflation)(1)]
X[t - p, :] = x
Yp[t - p, :] = Y[t, :]
# Used in forecast
lags = [Y[T - L, :].copy() for L in range(1, p + 1)]
return X, Yp, lags
def kron_Ix(x: np.ndarray, k: int):
# 1 x.reshape(1, -1)
# 1 x.reshape(1, -1)
# 1 x.reshape(1, -1)
# x.reshape(1, -1) is kp + 1
return np.kron(np.eye(k), x.reshape(1, -1))
def ols_multivariate(X: np.ndarray, Y: np.ndarray):
# [y(GDP)p y(inflation)p ] = [1, y(GDP)(p-1) y(inflation)(p-1), ... y(GDP)(0) y(inflation)(0)] [beta(1, 0) beta(2, 0)]
# [y(GDP)p + 1 y(inflation)p + 1] = [1, y(GDP)(p) y(inflation)(p), ... y(GDP)(1) y(inflation)(1)] × [beta(1, 1) beta(2, 1)]
# ・
# [beta(1, p) beta(2, p)]
XtX = X.T @ X
XtY = X.T @ Y
XtX_inv = la.pinvh(XtX)
B = XtX_inv @ XtY
B_hat = B.T
E = Y - X @ B
rows, k = Y.shape
dof = max(rows - X.shape[1], 1)
Sigma = (E.T @ E) / dof
Sigma = 0.5 * (Sigma + Sigma.T)
return B_hat, Sigma
def sample_invwishart(nu: int, S: np.ndarray, rng: np.random.Generator):
# S (p, p)
p = S.shape[0]
assert nu > p - 1, "df too small for IW."
Sinv = la.inv(S)
Sinv = 0.5 * (Sinv + Sinv.T)
# Cholesky decomposition is a method for decomposing a symmetric matrix
# into the product of a lower triangular matrix L and its transpose.
try:
L = la.cholesky(Sinv, lower=True)
except la.LinAlgError:
L = la.cholesky(Sinv + 1e-10 * np.eye(p), lower=True)
Z = rng.standard_normal(size=(p, nu))
W = L @ (Z @ Z.T) @ L.T
IW = la.inv(W)
IW = 0.5 * (IW + IW.T)
return IW
def sample_inv_gamma(shape: float, scale: float, rng: np.random.Generator):
g = rng.gamma(shape, 1.0 / max(scale, 1e-12))
return 1.0 / max(g, 1e-12)
# ==========================================
# FFBS: beta with time-varying Sigma_t
# ==========================================
def forward_filter_timevarying(y_seq: List[np.ndarray], H_seq: List[np.ndarray],
Sigma_seq: List[np.ndarray], Q: np.ndarray,
b0: np.ndarray, P0: np.ndarray):
T = len(y_seq)
m = b0.shape[0]
k = y_seq[0].shape[0]
m_arr = np.zeros((T, m))
C_arr = np.zeros((T, m, m))
a_arr = np.zeros((T, m))
R_arr = np.zeros((T, m, m))
mtm1 = b0.copy()
Ctm1 = P0.copy()
for t in range(T):
Ht = H_seq[t]
yt = y_seq[t]
Sigmat = 0.5 * (Sigma_seq[t] + Sigma_seq[t].T)
at = mtm1
Rt = Ctm1 + Q
Rt = 0.5 * (Rt + Rt.T)
St = Ht @ Rt @ Ht.T + Sigmat
St = 0.5 * (St + St.T)
try:
Kt = Rt @ Ht.T @ la.inv(St)
except la.LinAlgError:
Kt = Rt @ Ht.T @ la.inv(St + 1e-10 * np.eye(k))
vt = yt - Ht @ at
mt = at + Kt @ vt
Ct = Rt - Kt @ Ht @ Rt
Ct = 0.5 * (Ct + Ct.T)
m_arr[t, :] = mt
C_arr[t, :, :] = Ct
a_arr[t, :] = at
R_arr[t, :, :] = Rt
mtm1, Ctm1 = mt, Ct
return m_arr, C_arr, a_arr, R_arr
def backward_sample_state(m_arr: np.ndarray, C_arr: np.ndarray,
a_arr: np.ndarray, R_arr: np.ndarray,
rng: np.random.Generator):
T, m = m_arr.shape
x = np.zeros((T, m))
x[T - 1, :] = rng.multivariate_normal(mean=m_arr[T - 1], cov=C_arr[T - 1])
for t in range(T - 2, -1, -1):
Ct = C_arr[t]
Rt1 = 0.5 * (R_arr[t + 1] + R_arr[t + 1].T)
at1 = a_arr[t + 1]
Jt = Ct @ la.inv(Rt1)
mean = m_arr[t] + Jt @ (x[t + 1] - at1)
cov = Ct - Jt @ Rt1 @ Jt.T
cov = 0.5 * (cov + cov.T)
x[t, :] = rng.multivariate_normal(mean=mean, cov=cov)
return x
# ==========================================
# FFBS: regression RW (scalar measurement)
# ==========================================
def ff_regression_rw(y: np.ndarray, Z_seq: List[np.ndarray], R_seq: np.ndarray,
Q: np.ndarray, m0: np.ndarray, P0: np.ndarray):
"""
y_t = Z_t' alpha_t + e_t, e_t ~ N(0, R_t)
alpha_t = alpha_{t-1} + w_t, w_t ~ N(0, Q)
"""
T = y.shape[0]
r = m0.shape[0]
m_arr = np.zeros((T, r))
C_arr = np.zeros((T, r, r))
a_arr = np.zeros((T, r))
R_arr = np.zeros((T, r, r))
mtm1 = m0.copy()
Ctm1 = P0.copy()
for t in range(T):
Zt = Z_seq[t].reshape(-1) # (r,)
Rt = Ctm1 + Q
Rt = 0.5 * (Rt + Rt.T)
St = float(Zt.T @ Rt @ Zt) + float(R_seq[t])
if St <= 1e-12:
St = 1e-12
Kt = (Rt @ Zt.reshape(-1, 1)) / St # (r,1)
vt = float(y[t] - Zt.T @ mtm1)
mt = mtm1 + (Kt.flatten() * vt)
Ct = Rt - (Kt @ Kt.T) * St
Ct = 0.5 * (Ct + Ct.T)
m_arr[t, :] = mt
C_arr[t, :, :] = Ct
a_arr[t, :] = mtm1
R_arr[t, :, :] = Rt
mtm1, Ctm1 = mt, Ct
return m_arr, C_arr, a_arr, R_arr
def backward_sample_regression(m_arr: np.ndarray, C_arr: np.ndarray,
a_arr: np.ndarray, R_arr: np.ndarray,
rng: np.random.Generator):
T, r = m_arr.shape
alpha = np.zeros((T, r))
alpha[T - 1, :] = rng.multivariate_normal(mean=m_arr[T - 1], cov=C_arr[T - 1])
for t in range(T - 2, -1, -1):
Ct = C_arr[t]
Rt1 = 0.5 * (R_arr[t + 1] + R_arr[t + 1].T)
at1 = a_arr[t + 1]
Jt = Ct @ la.inv(Rt1)
mean = m_arr[t] + Jt @ (alpha[t + 1] - at1)
cov = Ct - Jt @ Rt1 @ Jt.T
cov = 0.5 * (cov + cov.T)
alpha[t, :] = rng.multivariate_normal(mean=mean, cov=cov)
return alpha
# ==========================================
# SV (log-variance) FFBS (Gaussian approx)
# ==========================================
def sv_gaussian_ffbs(logsq: np.ndarray, v_noise: float,
m0_h: float, P0_h: float,
sigma_h2: float, rng: np.random.Generator):
T = logsq.shape[0]
E_logchi = -1.2704
z = logsq - E_logchi
m_arr = np.zeros(T)
C_arr = np.zeros(T)
a_arr = np.zeros(T)
R_arr = np.zeros(T)
mtm1 = m0_h
Ctm1 = P0_h
for t in range(T):
at = mtm1
Rt = Ctm1 + sigma_h2
St = Rt + v_noise
Kt = Rt / St
vt = z[t] - at
mt = at + Kt * vt
Ct = (1.0 - Kt) * Rt
m_arr[t] = mt
C_arr[t] = max(Ct, 1e-12)
a_arr[t] = at
R_arr[t] = max(Rt, 1e-12)
mtm1, Ctm1 = mt, Ct
h = np.zeros(T)
h[T - 1] = rng.normal(loc=m_arr[T - 1], scale=np.sqrt(C_arr[T - 1]))
for t in range(T - 2, -1, -1):
Ct = C_arr[t]
Rt1 = R_arr[t + 1]
at1 = a_arr[t + 1]
Jt = Ct / Rt1
mean = m_arr[t] + Jt * (h[t + 1] - at1)
var = Ct - Jt * Rt1 * Jt
var = max(var, 1e-12)
h[t] = rng.normal(loc=mean, scale=np.sqrt(var))
return h
# ======================
# IRF & Stability
# ======================
def split_B(beta_t: np.ndarray, k: int, p: int, include_const: bool = True):
m_eq = k * p + (1 if include_const else 0)
B = beta_t.reshape(k, m_eq)
off = 1 if include_const else 0
A_list = []
for L in range(p):
cols = B[:, off + L * k : off + (L + 1) * k] # (k x k)
A_list.append(cols)
return A_list # lag matrices
def var_irf_matrices(A_list: List[np.ndarray], H: int):
k = A_list[0].shape[0]
p = len(A_list)
Psi = [np.eye(k)]
for h in range(1, H + 1):
acc = np.zeros((k, k))
for i in range(1, p + 1):
if h - i >= 0:
acc += A_list[i - 1] @ Psi[h - i]
Psi.append(acc)
return Psi
"""
def var_irf_recursive(A_list: List[np.ndarray], H: int):
k = A_list[0].shape[0]
p = len(A_list)
def compute_psi(h, memo):
if h in memo:
return memo[h]
if h == 0:
memo[0] = np.eye(k)
return memo[0]
acc = np.zeros((k, k))
for i in range(1, p + 1):
if h - i >= 0:
acc += A_list[i - 1] @ compute_psi(h - i, memo)
memo[h] = acc
return acc
memo = {}
Psi = [compute_psi(h, memo) for h in range(H + 1)]
return Psi
"""
def irf_structural(Psi_list: List[np.ndarray], A_t: np.ndarray, h_t: np.ndarray, shock_index: int):
# impact vector for structural shock j: v0 = A^{-1} * (sqrt(exp(h_j)) e_j)
k = A_t.shape[0]
A_t = np.tril(A_t, 0) # ensure lower-triangular
A_inv = la.inv(A_t)
d = np.zeros(k); d[shock_index] = np.sqrt(np.exp(h_t[shock_index]))
v0 = A_inv @ d
Hplus1 = len(Psi_list)
out = np.zeros((Hplus1, k))
for h in range(Hplus1):
out[h, :] = Psi_list[h] @ v0
return out
def irf_generalized(Psi_list: List[np.ndarray], Sigma_t: np.ndarray, shock_index: int):
k = Sigma_t.shape[0]
Sigma_t = 0.5 * (Sigma_t + Sigma_t.T)
ej = np.zeros(k); ej[shock_index] = 1.0
denom = np.sqrt(max(Sigma_t[shock_index, shock_index], 1e-12))
v = Sigma_t @ ej / denom
Hplus1 = len(Psi_list)
out = np.zeros((Hplus1, k))
for h in range(Hplus1):
out[h, :] = Psi_list[h] @ v
return out
def companion_matrix(A_list: List[np.ndarray]):
"""
Build VAR(p) companion matrix from lag matrices A1..Ap (k x k each).
"""
k = A_list[0].shape[0]
p = len(A_list)
F = np.zeros((k * p, k * p))
# top block
F[0:k, :] = np.hstack(A_list)
# lower block: identity shifts
if p > 1:
F[k:, 0:(k*(p-1))] = np.eye(k*(p-1))
return F
def stability_check(A_list: List[np.ndarray], tol: float = 1.0):
"""
Return spectral radius and stable flag (max |eig| < tol).
Default tol=1.0 (strict stationarity).
"""
F = companion_matrix(A_list)
eigvals = la.eigvals(F)
rad = np.max(np.abs(eigvals))
return float(rad), bool(rad < tol)
# === PATCH ADD: project VAR lags to (near) stationarity ======================
def project_to_stationary(A_list: List[np.ndarray], margin: float = 0.995):
"""
If the companion spectral radius rho >= 1, uniformly scale all lag matrices
by factor = margin / rho to push eigenvalues inside the unit circle.
Returns: (A_scaled_list, rho_before, changed_flag)
"""
F = companion_matrix(A_list)
eigvals = la.eigvals(F)
rho = float(np.max(np.abs(eigvals)))
if rho < margin:
return A_list, rho, False
factor = float(margin / rho)
A_scaled = [Ai * factor for Ai in A_list]
return A_scaled, rho, True
# ======================
# Structural TVP-SVAR with SV (MCMC)
# ======================
@dataclass
class TVPSVAR_SV_Config:
p: int
include_const: bool = True
iters: int = 3000
burn: int = 1500
thin: int = 3
seed: Optional[int] = 42
# Priors for beta RW
diffuse_scale_beta: float = 10.0
nu_Qbeta0: Optional[int] = None # default: m + 1
S_Qbeta0_scale: float = 0.01
# Priors for A_t RW (lower triangular off-diagonals): per coefficient IG(a,b)
ig_A_a0: float = 0.01
ig_A_b0: float = 0.01
P0_A_scale: float = 10.0 # diffuse P0 for A states
# Priors for SV h_t RW variance per series IG
h0_mean: float = 0.0
h0_var: float = 10.0
ig_h_a0: float = 0.01
ig_h_b0: float = 0.01
# IRF & stability
skip_unstable_draws: bool = True
stability_tol: float = 1.0 # |lambda|max < 1 considered stable
class TVPSVAR_SV:
def __init__(self, cfg: TVPSVAR_SV_Config):
self.cfg = cfg
self.fitted_ = False
def fit(self, Y: np.ndarray):
cfg = self.cfg
rng = np.random.default_rng(cfg.seed)
Y = np.asarray(Y, dtype=float)
T, k = Y.shape
X, Yp, lags = build_var_design(Y, cfg.p, cfg.include_const)
rows = X.shape[0]
m_eq = X.shape[1]
m = k * m_eq
H_seq = [kron_Ix(X[t, :], k) for t in range(rows)]
y_seq = [Yp[t, :] for t in range(rows)]
# OLS init for beta mean
B_hat, Sigma_ols = ols_multivariate(X, Yp)
b0 = B_hat.reshape(-1)
P0_beta = np.eye(m) * cfg.diffuse_scale_beta
# Priors for Q_beta
nu_Qbeta0 = cfg.nu_Qbeta0 if cfg.nu_Qbeta0 is not None else (m + 1)
S_Qbeta0 = np.eye(m) * cfg.S_Qbeta0_scale
Q_beta = np.eye(m) * cfg.S_Qbeta0_scale
# Initialize A_t paths: unit lower-triangular (off-diagonals start at 0)
A_path = np.zeros((rows, k, k))
for t in range(rows):
A_path[t, :, :] = np.eye(k)
# State variances for A coefficients (per row i has r=i-1 coefficients)
sigmaA2 = [np.ones(i) * 0.05 for i in range(k)] # i=0 empty
P0_A = [np.eye(i) * cfg.P0_A_scale if i > 0 else np.zeros((0, 0)) for i in range(k)]
# SV init
eps = 1e-12
resid0 = (Yp - X @ B_hat.T)
logsq0 = np.log(resid0**2 + eps)
E_logchi = -1.2704
v_noise = 4.9348
h_path = np.tile(logsq0.mean(axis=0) - E_logchi, (rows, 1)) # (rows x k)
sigma_h2 = np.ones(k) * 0.05 # RW variance init for h
# Storage controls
iters, burn, thin = cfg.iters, cfg.burn, cfg.thin
keep_idx = []
for it in range(iters):
if (it >= burn) and ((it - burn) % thin == 0):
keep_idx.append(it)
n_keep = len(keep_idx)
beta_draws = np.zeros((n_keep, rows, m))
Qbeta_draws = np.zeros((n_keep, m, m))
A_draws = np.zeros((n_keep, rows, k, k))
h_draws = np.zeros((n_keep, rows, k))
sigmah2_draws = np.zeros((n_keep, k))
sigmaA2_draws = [np.zeros((n_keep, len(sigmaA2[i]))) for i in range(k)]
keep_ptr = 0
for it in range(iters):
# Build Sigma_t from current A_path and h_path
Sigma_seq = []
for t in range(rows):
A_t = np.tril(A_path[t, :, :], 0)
D_t = np.diag(np.exp(h_path[t, :]))
A_inv = la.inv(A_t)
Sigma_t = A_inv @ D_t @ A_inv.T
Sigma_seq.append(Sigma_t)
# 1) Sample beta | A,h,Y
m_arr, C_arr, a_arr, R_arr = forward_filter_timevarying(y_seq, H_seq, Sigma_seq, Q_beta, b0, P0_beta)
beta_path = backward_sample_state(m_arr, C_arr, a_arr, R_arr, rng)
# 2) Sample A off-diagonals per row via regression-RW FFBS
# From A y = H beta + u -> u = A y - H beta
u_struct = np.zeros((rows, k))
for t in range(rows):
A_t = np.tril(A_path[t, :, :], 0)
u_struct[t, :] = A_t @ y_seq[t] - H_seq[t] @ beta_path[t, :]
for i in range(1, k):
# Measurement: y_i - (H beta)_i = Z_i' gamma_i + u_i, gamma_i = (-a_{i1},..,-a_{i,i-1})
y_meas = np.array([y_seq[t][i] - (H_seq[t][i, :] @ beta_path[t, :]) for t in range(rows)])
Z_seq = [y_seq[t][:i] for t in range(rows)] # y_1..y_{i-1}
R_seq_i = np.exp(h_path[:, i]) # Var(u_i) = exp(h_i)
r = i
Q_Ai = np.diag(sigmaA2[i])
m0_i = np.zeros(r)
P0_i = P0_A[i]
mA_arr, CA_arr, aA_arr, RA_arr = ff_regression_rw(y_meas, Z_seq, R_seq_i, Q_Ai, m0_i, P0_i)
gamma_i_path = backward_sample_regression(mA_arr, CA_arr, aA_arr, RA_arr, rng)
# Update A_path: a_{ij} = -gamma_{ij}
for t in range(rows):
A_path[t, i, :i] = -gamma_i_path[t, :]
# Sample sigmaA2[i] per coefficient via IG from RW increments
d_gamma = gamma_i_path[1:, :] - gamma_i_path[:-1, :]
for j in range(r):
shape = cfg.ig_A_a0 + 0.5 * (rows - 1)
scale = cfg.ig_A_b0 + 0.5 * np.dot(d_gamma[:, j], d_gamma[:, j])
sigmaA2[i][j] = sample_inv_gamma(shape, scale, rng)
# 3) Sample h via Gaussian approx FFBS using structural residuals u_struct
for i in range(k):
logsq_i = np.log(u_struct[:, i]**2 + 1e-12)
h_path[:, i] = sv_gaussian_ffbs(
logsq=logsq_i,
v_noise=v_noise,
m0_h=cfg.h0_mean,
P0_h=cfg.h0_var,
sigma_h2=sigma_h2[i],
rng=rng
)
# 4) Sample sigma_h2[i] per series via IG from RW increments of h_i
for i in range(k):
d_h = h_path[1:, i] - h_path[:-1, i]
shape = cfg.ig_h_a0 + 0.5 * (rows - 1)
scale = cfg.ig_h_b0 + 0.5 * np.dot(d_h, d_h)
sigma_h2[i] = sample_inv_gamma(shape, scale, rng)
# Optional: re-run h FFBS using refreshed sigma_h2 (for coherence)
for i in range(k):
logsq_i = np.log(u_struct[:, i]**2 + 1e-12)
h_path[:, i] = sv_gaussian_ffbs(
logsq=logsq_i, v_noise=v_noise,
m0_h=cfg.h0_mean, P0_h=cfg.h0_var,
sigma_h2=sigma_h2[i], rng=rng
)
# 5) Sample Q_beta via IW from RW increments of beta
d_beta = beta_path[1:, :] - beta_path[:-1, :]
S_Qbeta_post = S_Qbeta0 + d_beta.T @ d_beta
nu_Qbeta_post = nu_Qbeta0 + (rows - 1)
Q_beta = sample_invwishart(nu_Qbeta_post, S_Qbeta_post, rng)
# Save draws
if (it >= burn) and ((it - burn) % thin == 0):
beta_draws[keep_ptr, :, :] = beta_path
Qbeta_draws[keep_ptr, :, :] = Q_beta
A_draws[keep_ptr, :, :, :] = A_path
h_draws[keep_ptr, :, :] = h_path
sigmah2_draws[keep_ptr, :] = sigma_h2
for i in range(k):
if i > 0:
sigmaA2_draws[i][keep_ptr, :] = sigmaA2[i]
keep_ptr += 1
# Posterior means
beta_mean = beta_draws.mean(axis=0)
A_mean = A_draws.mean(axis=0)
h_mean = h_draws.mean(axis=0)
Qbeta_mean = Qbeta_draws.mean(axis=0)
# Reduced-form Sigma_t mean path
Sigma_t_mean = []
for t in range(rows):
A_t = np.tril(A_mean[t, :, :], 0)
D_t = np.diag(np.exp(h_mean[t, :]))
Sigma_t_mean.append(la.inv(A_t) @ D_t @ la.inv(A_t).T)
# Save
self.k_ = k
self.p_ = cfg.p
self.include_const_ = cfg.include_const
self.X_ = X
self.Yp_ = Yp
self.lags_ = lags
self.m_eq_ = m_eq
self.m_ = m
self.beta_draws_ = beta_draws
self.A_draws_ = A_draws
self.h_draws_ = h_draws
self.Qbeta_draws_ = Qbeta_draws
self.sigmah2_draws_ = sigmah2_draws
self.sigmaA2_draws_ = sigmaA2_draws
self.beta_mean_ = beta_mean
self.A_mean_ = A_mean
self.h_mean_ = h_mean
self.Qbeta_mean_ = Qbeta_mean
self.Sigma_t_mean_ = Sigma_t_mean
self.fitted_ = True
return {
"beta_draws": beta_draws,
"A_draws": A_draws,
"h_draws": h_draws,
"Qbeta_draws": Qbeta_draws,
"beta_mean": beta_mean,
"A_mean": A_mean,
"h_mean": h_mean,
"Qbeta_mean": Qbeta_mean,
}
def coef_path_mean(self):
assert self.fitted_, "Call fit() first."
rows, m = self.beta_mean_.shape
k, m_eq = self.k_, self.m_eq_
out = np.zeros((rows, k, m_eq))
for t in range(rows):
out[t, :, :] = self.beta_mean_[t, :].reshape(k, m_eq)
return out
def check_stability(self, time_index: int = None, use_mean: bool = True):
"""
Return spectral radius and stable flag at a time.
If use_mean=False, returns a list per draw for the selected time.
"""
assert self.fitted_, "Call fit() first."
t = self.beta_mean_.shape[0] - 1 if time_index is None else int(time_index)
k, p = self.k_, self.p_
if use_mean:
beta_t = self.beta_mean_[t, :]
A_list = split_B(beta_t, k, p, self.include_const_)
rad, stable = stability_check(A_list, tol=self.cfg.stability_tol)
return rad, stable
else:
beta_draws = self.beta_draws_
rads, flags = [], []
for j in range(beta_draws.shape[0]):
A_list = split_B(beta_draws[j, t, :], k, p, self.include_const_)
rad, st = stability_check(A_list, tol=self.cfg.stability_tol)
rads.append(rad); flags.append(st)
return np.array(rads), np.array(flags)
# === PATCHED TV-IRF with stability enforcement ===
def tvirf(self, time_index: int = None, H: int = 20, shocks="all",
identification: str = "structural", n_draws: int = 200,
enforce_stability: bool = False, proj_margin: float = 0.995,
rng: np.random.Generator = None):
"""
Time-varying IRFs at chosen time using posterior draws.
identification: 'structural' or 'generalized'
If enforce_stability=True, any unstable draw is projected to near-stationary
by uniformly shrinking VAR lag matrices so that spectral radius ≈ proj_margin.
Unstable draws can also be skipped if cfg.skip_unstable_draws=True.
"""
assert self.fitted_, "Call fit() first."
rng = np.random.default_rng() if rng is None else rng
beta_draws = self.beta_draws_
A_draws = self.A_draws_
h_draws = self.h_draws_
rows = beta_draws.shape[1]
k, p = self.k_, self.p_
include_const = self.include_const_
t = rows - 1 if time_index is None else int(time_index)
assert 0 <= t < rows, "time_index out of range."
n_avail = beta_draws.shape[0]
use = min(n_draws, n_avail)
idx = rng.choice(n_avail, size=use, replace=False)
shock_list = list(range(k)) if shocks == "all" else list(shocks)
sims = np.zeros((use, len(shock_list), H + 1, k))
valid = np.zeros(use, dtype=bool)
for ii, j in enumerate(idx):
beta_t = beta_draws[j, t, :]
A_t = np.tril(A_draws[j, t, :, :], 0)
h_t = h_draws[j, t, :]
A_list = split_B(beta_t, k, p, include_const)
# stability check (companion spectral radius)
rad, st = stability_check(A_list, tol=self.cfg.stability_tol)
if not st:
if enforce_stability:
A_list, rho0, projected = project_to_stationary(A_list, margin=proj_margin)
# re-check
rad2, st2 = stability_check(A_list, tol=self.cfg.stability_tol)
if not st2 and self.cfg.skip_unstable_draws:
continue
elif self.cfg.skip_unstable_draws:
continue
Psi = var_irf_matrices(A_list, H)
for s_idx, shock_j in enumerate(shock_list):
if identification.lower() == "structural":
irf = irf_structural(Psi, A_t, h_t, shock_j)
elif identification.lower() == "generalized":
D_t = np.diag(np.exp(h_t))
Sigma_t = la.inv(A_t) @ D_t @ la.inv(A_t).T
irf = irf_generalized(Psi, Sigma_t, shock_j)
else:
raise ValueError("identification must be 'structural' or 'generalized'")
sims[ii, s_idx, :, :] = irf
valid[ii] = True
sims = sims[valid]
if sims.shape[0] == 0:
raise RuntimeError(
"All draws were unstable (even after projection if enabled). "
"Try enforce_stability=True, increase proj_margin (e.g., 0.995), "
"relax stability_tol (e.g., 1.02), or set skip_unstable_draws=False."
)
irf_mean = sims.mean(axis=0)
irf_q05 = np.quantile(sims, 0.05, axis=0)
irf_q95 = np.quantile(sims, 0.95, axis=0)
return irf_mean, irf_q05, irf_q95
def forecast(self, h: int = 1, n_sim: int = 300, rng: Optional[np.random.Generator] = None):
"""
h-step ahead predictive simulation using posterior draws:
beta_{T+1} = beta_T + u_beta, u_beta ~ N(0, Q_beta)
gamma_{i,T+1} = gamma_{i,T} + w_A, w_A ~ N(0, diag(sigmaA2_i))
h_{i,T+1} = h_{i,T} + w_h, w_h ~ N(0, sigma_h2_i)
y_{T+1} = A_{T+1}^{-1} ( (B x)_{T+1} + u_struct ), u_struct ~ N(0, D_{T+1})
"""
assert self.fitted_, "Call fit() first."
rng = np.random.default_rng() if rng is None else rng
beta_draws = self.beta_draws_
A_draws = self.A_draws_
h_draws = self.h_draws_
Qbeta_draws = self.Qbeta_draws_
sigmah2_draws = self.sigmah2_draws_
sigmaA2_draws = self.sigmaA2_draws_
k, p = self.k_, self.p_
include_const = self.include_const_
n_avail = beta_draws.shape[0]
use = min(n_sim, n_avail)
idx = rng.choice(n_avail, size=use, replace=False)
sims = np.zeros((use, h, k))
for ii, j in enumerate(idx):
beta_T = beta_draws[j, -1, :]
A_T = np.tril(A_draws[j, -1, :, :], 0)
h_T = h_draws[j, -1, :]
Qbeta = Qbeta_draws[j, :, :]
sigma_h2 = sigmah2_draws[j, :]
sigmaA2 = [None]*k
for i in range(k):
if i == 0:
sigmaA2[i] = np.array([])
else:
sigmaA2[i] = sigmaA2_draws[i][j, :]
lags = [v.copy() for v in self.lags_]
for step in range(h):
parts = []
if include_const:
parts.append(np.array([1.0]))
for L in range(p):
parts.append(lags[L])
x = np.concatenate(parts, axis=0)
H = kron_Ix(x, k)
beta_T = rng.multivariate_normal(mean=beta_T, cov=Qbeta)
A_next = np.eye(k)
for i in range(1, k):
gamma_i = -A_T[i, :i]
gamma_i = gamma_i + rng.normal(0.0, np.sqrt(sigmaA2[i]), size=i)
A_next[i, :i] = -gamma_i
A_T = A_next
h_T = h_T + rng.normal(0.0, np.sqrt(sigma_h2), size=k)
D_T = np.diag(np.exp(h_T))
mu_struct = H @ beta_T
u = rng.multivariate_normal(mean=np.zeros(k), cov=D_T)
y = la.inv(A_T) @ (mu_struct + u)
sims[ii, step, :] = y
lags = [y] + lags[:p-1]
mean_fore = sims.mean(axis=0)
q05 = np.quantile(sims, 0.05, axis=0)
q95 = np.quantile(sims, 0.95, axis=0)
return mean_fore, {"5%": q05, "95%": q95}
def _main():
data_str = """ here is data
"""
df = pd.read_csv(StringIO(data_str), sep=r"\s+", header=None)
df.columns = ["RGDPGowthRate", "HouseOwnerRate", "MortgageRate"]
df_standardized = (df - df.mean()) / df.std()
Y = df_standardized.values
T, k = Y.shape
print(f"data load complete: {T} periods, {k} variable")
p = 2
# Fit structural TVP-SVAR-SV
cfg = TVPSVAR_SV_Config(
p=p, include_const=True,
iters=3000, burn=1500, thin=3, seed=42,
diffuse_scale_beta=10.0, S_Qbeta0_scale=0.01,
ig_A_a0=0.01, ig_A_b0=0.01,
h0_mean=0.0, h0_var=10.0, ig_h_a0=0.01, ig_h_b0=0.01,
skip_unstable_draws=True, stability_tol=1.02 # 少し緩める
)
model = TVPSVAR_SV(cfg)
post = model.fit(Y)
# Stability check at last period (posterior mean)
rad, stable = model.check_stability(time_index=None, use_mean=True)
print(f"Spectral radius (last): {rad:.3f}, stable={stable}")
# IRF (structural) at last period with stability enforcement
H = 20
irf_mean, irf_q05, irf_q95 = model.tvirf(
time_index=None, H=H, shocks=list(range(len(df.columns))),
identification="generalized", n_draws=300,
enforce_stability=True, proj_margin=0.995
)
# Plot IRF for each shock -> all variables
# vars_label = [f"y{i+1}" for i in range(k)]
vars_label = [df.columns[i] for i in range(len(df.columns))]
h_grid = np.arange(H+1)
for s in range(len(df.columns)):
plt.figure(figsize=(10,6))
for v in range(k):
m = irf_mean[s, :, v]
lo = irf_q05[s, :, v]
hi = irf_q95[s, :, v]
plt.plot(h_grid, m, label=f"{vars_label[v]} (mean)")
plt.fill_between(h_grid, lo, hi, alpha=0.2, label=f"{vars_label[v]} 90% band")
plt.axhline(0, color="black", linewidth=0.8)
plt.title(f"TV-IRF (structural) at last period, shock {s+1}")
plt.xlabel("horizon")
plt.legend()
plt.grid(True)
plt.show()
# Forecast
"""mean_fore, q = model.forecast(h=5, n_sim=300)
print("Predictive mean (1-step):", mean_fore[0])
print("Predictive 90% interval (1-step):")
print(" 5% :", q["5%"][0])
print(" 95% :", q["95%"][0])
x = range(1, 6) # step num(1〜5)
for var_index, label in enumerate(df.columns):
y_mean = [step[var_index] for step in mean_fore]
y_lower = [step[var_index] for step in q["5%"]]
y_upper = [step[var_index] for step in q["95%"]]
plt.figure(figsize=(8, 5))
plt.plot(x, y_mean, label="Predictive Mean", color="blue", marker='o')
plt.fill_between(x, y_lower, y_upper, color="lightblue", alpha=0.5, label="90% Prediction Interval")
plt.title(f"Forecast Results for {label} (h=5)")
plt.xlabel("Step")
plt.ylabel("Value")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()"""
if __name__ == "__main__":
_main()
実行環境
- Google CoLab: 実行に時間がかかるためGPUを利用
結果(インパルスレスポンス)
結論
- 実質GDP成長率の増加は持ち家率を高める
今後
- ラグ以外の外生変数をモデルに付加
- DSGEモデルのインパルスレスポンスと突き合わせ理論側の解釈を立証
参考
- Time-Varying Parameter VAR Model with Stochastic Volatility: An Overview of Methodology and Empirical Applications: Jouchi Nakajima
- Time-varying parameter (TVP) models



