A Simple Analysis of the Homeownership Rate Using a TVP-VAR Model

Posted at 2025-11-26
TVP-VAR model
  • Time-Varying Parameter Vector Autoregression
  • Can absorb differences in economic structure and policy stance across periods by letting the coefficients vary over time
  • Estimation requires advanced machinery such as the Kalman filter, so the computational load is high (see the state-space sketch below)
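
A minimal state-space sketch of the reduced form that the code below estimates (stochastic volatility set aside for a moment; beta_t stacks all VAR coefficients at time t):

  y_t    = X_t beta_t + eps_t,    eps_t ~ N(0, Sigma_t)
  beta_t = beta_{t-1} + eta_t,    eta_t ~ N(0, Q)

The Kalman filter (and its Bayesian counterpart, forward-filter backward-sampling) has to run over these two equations at every MCMC draw, which is where the computational cost comes from.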
Hypothesis
  • Economic boom → rising homeownership rate
Data (Federal Reserve Economic Data: US series, retrieved as of 2025-11-21)
Merged dataset
Columns from left: real GDP growth rate, homeownership rate, mortgage rate
1971-04-01	 0.54082	64.1	7.41
1971-07-01	 0.82245	64.4	7.66
1971-10-01	 0.23486	64.5	7.55
1972-01-01	 1.83722	64.3	7.35
1972-04-01	 2.26929	64.5	7.35
1972-07-01	 0.94409	64.3	7.41
1972-10-01	 1.67444	64.4	7.43
1973-01-01	 2.47494	64.9	7.45
1973-04-01	 1.08874	64.4	7.65
1973-07-01	-0.52584	64.4	8.46
1973-10-01	 0.94923	64.4	8.63
1974-01-01	-0.85971	64.8	8.46
1974-04-01	 0.23770	64.8	8.89
1974-07-01	-0.94528	64.6	9.61
1974-10-01	-0.38841	64.4	9.79
1975-01-01	-1.21763	64.4	9.17
1975-04-01	 0.71470	64.9	8.88
1975-07-01	 1.71204	64.6	8.98
1975-10-01	 1.34709	64.5	9.16
1976-01-01	 2.24947	64.6	8.87
1976-04-01	 0.73337	64.6	8.78
1976-07-01	 0.54741	64.9	8.97
1976-10-01	 0.72260	64.8	8.85
1977-01-01	 1.18623	64.8	8.69
1977-04-01	 1.94192	64.5	8.81
1977-07-01	 1.80316	65.0	8.93
1977-10-01	 0.00199	64.9	8.94
1978-01-01	 0.31984	64.8	9.13
1978-04-01	 3.86477	64.4	9.56
1978-07-01	 1.00575	65.2	9.76
1978-10-01	 1.34413	65.4	10.12
1979-01-01	 0.17959	64.8	10.41
1979-04-01	 0.10675	64.9	10.76
1979-07-01	 0.74280	65.8	11.16
1979-10-01	 0.25005	65.4	12.48
1980-01-01	 0.31457	65.5	13.68
1980-04-01	-2.06043	65.5	14.42
1980-07-01	-0.11885	65.8	12.64
1980-10-01	 1.86492	65.5	14.23
1981-01-01	 1.95942	65.6	15.13
1981-04-01	-0.74108	65.3	16.24
1981-07-01	 1.19750	65.6	17.38
1981-10-01	-1.08960	65.2	17.74
1982-01-01	-1.55364	64.8	17.41
1982-04-01	 0.45618	64.9	16.77
1982-07-01	-0.38227	64.9	16.22
1982-10-01	 0.04001	64.5	14.03
1983-01-01	 1.31779	64.7	13.03
1983-04-01	 2.27552	64.7	12.76
1983-07-01	 1.99895	64.8	13.64
1983-10-01	 2.08618	64.4	13.46
1984-01-01	 1.95479	64.6	13.33
1984-04-01	 1.72776	64.6	14.04
1984-07-01	 0.96401	64.6	14.49
1984-10-01	 0.82077	64.1	13.65
1985-01-01	 0.96898	64.1	13.06
1985-04-01	 0.88047	64.1	12.79
1985-07-01	 1.52727	63.9	12.14
1985-10-01	 0.74345	63.5	11.73
1986-01-01	 0.93375	63.6	10.58
1986-04-01	 0.45030	63.8	10.25
1986-07-01	 0.95667	63.8	10.24
1986-10-01	 0.53639	63.9	9.68
1987-01-01	 0.74184	63.8	9.11
1987-04-01	 1.07818	63.8	10.34
1987-07-01	 0.86718	64.2	10.48
1987-10-01	 1.71696	64.1	10.86
1988-01-01	 0.51688	63.7	10.07
1988-04-01	 1.31403	63.7	10.36
1988-07-01	 0.58597	64.0	10.50
1988-10-01	 1.33246	63.8	10.41
1989-01-01	 1.01639	63.9	10.82
1989-04-01	 0.76310	63.8	10.64
1989-07-01	 0.74074	64.1	10.01
1989-10-01	 0.19698	63.8	9.81
1990-01-01	 1.09288	64.0	10.13
1990-04-01	 0.36297	63.7	10.32
1990-07-01	 0.06658	64.0	10.10
1990-10-01	-0.91040	64.1	9.96
1991-01-01	-0.46794	63.9	9.50
1991-04-01	 0.77969	63.9	9.52
1991-07-01	 0.50536	64.2	9.27
1991-10-01	 0.34854	64.2	8.69
1992-01-01	 1.19725	64.0	8.69
1992-04-01	 1.08430	63.9	8.68
1992-07-01	 0.98811	64.3	8.02
1992-10-01	 1.04286	64.4	8.19
1993-01-01	 0.16694	64.2	7.72
1993-04-01	 0.58217	64.4	7.45
1993-07-01	 0.47715	64.7	7.09
1993-10-01	 1.36015	64.6	7.05
1994-01-01	 0.97036	63.8	7.30
1994-04-01	 1.35518	63.8	8.43
1994-07-01	 0.58458	64.1	8.59
1994-10-01	 1.14557	64.2	9.11
1995-01-01	 0.35478	64.2	8.79
1995-04-01	 0.29832	64.7	7.92
1995-07-01	 0.85073	65.0	7.70
1995-10-01	 0.67908	65.1	7.34
1996-01-01	 0.74905	65.1	7.27
1996-04-01	 1.66794	65.4	8.10
1996-07-01	 0.89703	65.6	8.15
1996-10-01	 1.03837	65.4	7.70
1997-01-01	 0.64544	65.4	7.79
1997-04-01	 1.66522	65.7	7.93
1997-07-01	 1.24857	66.0	7.47
1997-10-01	 0.85386	65.7	7.21
1998-01-01	 1.00375	65.9	7.05
1998-04-01	 0.92569	66.0	7.10
1998-07-01	 1.25894	66.8	6.87
1998-10-01	 1.60920	66.4	6.76
1999-01-01	 0.93943	66.7	6.88
1999-04-01	 0.83474	66.6	7.18
1999-07-01	 1.32588	67.0	7.79
1999-10-01	 1.64067	66.9	7.84
2000-01-01	 0.36279	67.1	8.26
2000-04-01	 1.82129	67.2	8.32
2000-07-01	 0.10193	67.7	8.02
2000-10-01	 0.59704	67.5	7.62
2001-01-01	-0.32780	67.5	7.01
2001-04-01	 0.62450	67.7	7.13
2001-07-01	-0.40064	68.1	6.96
2001-10-01	 0.27478	68.0	6.77
2002-01-01	 0.83636	67.8	6.97
2002-04-01	 0.61271	67.6	6.81
2002-07-01	 0.40651	68.0	6.29
2002-10-01	 0.12361	68.3	6.08
2003-01-01	 0.52664	68.0	5.84
2003-04-01	 0.88562	68.0	5.51
2003-07-01	 1.66320	68.4	6.03
2003-10-01	 1.16072	68.6	5.92
2004-01-01	 0.56665	68.6	5.60
2004-04-01	 0.77495	69.2	6.11
2004-07-01	 0.94859	69.0	5.89
2004-10-01	 1.02021	69.2	5.74
2005-01-01	 1.10936	69.1	5.77
2005-04-01	 0.49262	68.6	5.71
2005-07-01	 0.78381	68.8	5.76
2005-10-01	 0.55552	69.0	6.23
2006-01-01	 1.34538	68.5	6.25
2006-04-01	 0.25875	68.7	6.60
2006-07-01	 0.14996	69.0	6.56
2006-10-01	 0.85945	68.9	6.24
2007-01-01	 0.30084	68.4	6.21
2007-04-01	 0.61176	68.2	6.36
2007-07-01	 0.57603	68.2	6.55
2007-10-01	 0.62824	67.8	6.22
2008-01-01	-0.42676	67.8	5.87
2008-04-01	 0.59543	68.1	6.09
2008-07-01	-0.52526	67.9	6.32
2008-10-01	-2.18903	67.5	5.84
2009-01-01	-1.13487	67.3	5.06
2009-04-01	-0.17865	67.4	5.01
2009-07-01	 0.35119	67.6	5.16
2009-10-01	 1.08091	67.2	4.92
2010-01-01	 0.48450	67.1	5.00
2010-04-01	 0.96759	66.9	4.92
2010-07-01	 0.77108	66.9	4.45
2010-10-01	 0.52511	66.5	4.44
2011-01-01	-0.23720	66.4	4.85
2011-04-01	 0.67658	65.9	4.65
2011-07-01	-0.02231	66.3	4.29
2011-10-01	 1.12305	66.0	4.00
2012-01-01	 0.83859	65.4	3.92
2012-04-01	 0.44633	65.5	3.79
2012-07-01	 0.14403	65.5	3.55
2012-10-01	 0.11564	65.4	3.36
2013-01-01	 0.98656	65.0	3.50
2013-04-01	 0.26764	65.0	3.67
2013-07-01	 0.85139	65.3	4.44
2013-10-01	 0.87174	65.2	4.29
2014-01-01	-0.34510	64.8	4.36
2014-04-01	 1.29184	64.7	4.23
2014-07-01	 1.21539	64.4	4.14
2014-10-01	 0.50573	64.0	3.96
2015-01-01	 0.90048	63.7	3.72
2015-04-01	 0.61941	63.4	3.82
2015-07-01	 0.40025	63.7	3.95
2015-10-01	 0.18448	63.8	3.90
2016-01-01	 0.57952	63.5	3.74
2016-04-01	 0.32112	62.9	3.59
2016-07-01	 0.70939	63.5	3.45
2016-10-01	 0.55430	63.7	3.84
2017-01-01	 0.48689	63.6	4.17
2017-04-01	 0.55987	63.7	3.98
2017-07-01	 0.78852	63.9	3.88
2017-10-01	 1.12705	64.2	3.92
2018-01-01	 0.81341	64.2	4.28
2018-04-01	 0.53083	64.3	4.54
2018-07-01	 0.62370	64.4	4.57
2018-10-01	 0.14164	64.8	4.78
2019-01-01	 0.62432	64.2	4.37
2019-04-01	 0.83515	64.1	4.01
2019-07-01	 1.17000	64.8	3.66
2019-10-01	 0.68188	65.1	3.70
2020-01-01	-1.31632	65.3	3.52
2020-04-01	-7.87678	67.9	3.24
2020-07-01	 7.76228	67.4	2.95
2020-10-01	 1.13352	65.8	2.76
2021-01-01	 1.39582	65.6	2.88
2021-04-01	 1.70189	65.4	3.00
2021-07-01	 0.82505	65.4	2.87
2021-10-01	 1.71576	65.5	3.08
2022-01-01	-0.25480	65.4	3.82
2022-04-01	 0.15655	65.8	5.27
2022-07-01	 0.72190	66.0	5.62
2022-10-01	 0.69024	65.9	6.66
2023-01-01	 0.72385	66.0	6.37
2023-04-01	 0.62787	65.9	6.51
2023-07-01	 1.15361	66.0	7.04
2023-10-01	 0.84406	65.7	7.30
2024-01-01	 0.20986	65.6	6.75
2024-04-01	 0.88549	65.6	7.00
2024-07-01	 0.82478	65.6	6.51
2024-10-01	 0.45987	65.7	6.63
2025-01-01	-0.16252	65.1	6.83
2025-04-01	 0.94600	65.0	6.79
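
For reference, here is a minimal sketch of how the three series could be pulled directly from FRED with pandas_datareader. The series IDs (GDPC1 for real GDP, RHORUSQ156N for the homeownership rate, MORTGAGE30US for the 30-year mortgage rate) and the quarterly aggregation are assumptions on my part; the script below reads a pre-merged text block instead.

python
# Sketch only: assumed FRED series IDs and aggregation, not the exact pipeline used above.
import pandas as pd
from pandas_datareader import data as web

start, end = "1971-01-01", "2025-06-30"
gdp = web.DataReader("GDPC1", "fred", start, end)          # real GDP, quarterly level
own = web.DataReader("RHORUSQ156N", "fred", start, end)    # homeownership rate, quarterly %
mort = web.DataReader("MORTGAGE30US", "fred", start, end)  # 30y mortgage rate, weekly %

df = pd.concat(
    [
        gdp["GDPC1"].pct_change() * 100,             # quarter-over-quarter growth, %
        own["RHORUSQ156N"],
        mort["MORTGAGE30US"].resample("QS").mean(),  # weekly -> quarterly average
    ],
    axis=1,
).dropna()
df.columns = ["RGDPGrowthRate", "HouseOwnerRate", "MortgageRate"]
print(df.head())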
Code
python
"""
Structural TVP-SVAR with Stochastic Volatility (SV) and Stability Check + TV-IRF
- Model: A_t y_t = H_t beta_t (= B_t x_t) + u_t,  u_t ~ N(0, D_t = diag(exp(h_t)))
         Reduced form: y_t = H_t beta_t + eps_t,  eps_t ~ N(0, Sigma_t = A_t^{-1} D_t A_t^{-T})
- Gibbs:
    * beta_{1:T} via Carter-Kohn FFBS with time-varying Sigma_t
    * A_t (lower-triangular off-diagonals) via regression-RW FFBS per row
    * h_{i,t} via Gaussian-approx FFBS on log(u_{i,t}^2)
    * Q_beta via Inverse-Wishart;  sigma^2_{A,ij}, sigma^2_{h,i} via Inverse-Gamma
- IRF: Structural (using A_t and D_t) and Generalized (Pesaran-Shin)
- Stability: Companion matrix spectral radius check (+ projection to stationarity option)
"""

import numpy as np
import scipy.linalg as la
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict
import matplotlib.pyplot as plt
import pandas as pd
from io import StringIO
import time

# ==========
# Utilities
# ==========

def build_var_design(Y: np.ndarray, p: int, include_const: bool = True):
    Y = np.asarray(Y, dtype=float)
    T, k = Y.shape
    assert T > p, "Not enough observations for the chosen lag p."
    rows = T - p
    m_eq = (k * p) + (1 if include_const else 0)
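    # Example (illustrative): with k = 3 variables, p = 2 and a constant, each row
    # of X is [1, y'_{t-1}, y'_{t-2}] of length m_eq = 3*2 + 1 = 7, and the
    # corresponding row of Yp is y'_t.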

    X = np.zeros((rows, m_eq))
    # beta (m_eq, k)
    Yp = np.zeros((rows, k))
    for t in range(p, T):
        parts = []
        if include_const:
            parts.append(np.array([1.0]))
        for L in range(1, p + 1):
            parts.append(np.asarray(Y[t - L, :]).ravel())
        x = np.concatenate(parts, axis=0)
        # x = [1, y'_{t-1}, y'_{t-2}, ..., y'_{t-p}]: constant first, then lags from newest to oldest
        X[t - p, :] = x
        Yp[t - p, :] = Y[t, :]

    # Used in forecast
    lags = [Y[T - L, :].copy() for L in range(1, p + 1)]
    return X, Yp, lags

def kron_Ix(x: np.ndarray, k: int):
    # H_t = I_k ⊗ x': a k x (k*m_eq) block-diagonal matrix with the row vector x'
    # repeated along the diagonal, where len(x) = m_eq = k*p (+1 with a constant).
    return np.kron(np.eye(k), x.reshape(1, -1))
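
# Example (illustrative): for k = 2 and x = [1.0, 0.3, 0.7], kron_Ix(x, 2) equals
# [[1.0, 0.3, 0.7, 0.0, 0.0, 0.0],
#  [0.0, 0.0, 0.0, 1.0, 0.3, 0.7]], so y_t = H_t @ beta_t stacks the k equations.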

def ols_multivariate(X: np.ndarray, Y: np.ndarray):
    # Equation-by-equation OLS for the VAR written as Y = X B + E, where each row of X
    # is [1, y'_{t-1}, ..., y'_{t-p}] and B is (m_eq x k). Returns B_hat = B' (one row
    # per equation) and the degrees-of-freedom-adjusted residual covariance Sigma.
    XtX = X.T @ X
    XtY = X.T @ Y
    XtX_inv = la.pinvh(XtX)
    B = XtX_inv @ XtY
    B_hat = B.T
    E = Y - X @ B
    rows, k = Y.shape
    dof = max(rows - X.shape[1], 1)
    Sigma = (E.T @ E) / dof
    Sigma = 0.5 * (Sigma + Sigma.T)
    return B_hat, Sigma

def sample_invwishart(nu: int, S: np.ndarray, rng: np.random.Generator):
    # S (p, p)
    p = S.shape[0]
    assert nu > p - 1, "df too small for IW."
    Sinv = la.inv(S)
    Sinv = 0.5 * (Sinv + Sinv.T)
    # Cholesky decomposition is a method for decomposing a symmetric matrix 
    # into the product of a lower triangular matrix L and its transpose.
    try:
        L = la.cholesky(Sinv, lower=True)
    except la.LinAlgError:
        L = la.cholesky(Sinv + 1e-10 * np.eye(p), lower=True)
    Z = rng.standard_normal(size=(p, nu))
    W = L @ (Z @ Z.T) @ L.T
    IW = la.inv(W)
    IW = 0.5 * (IW + IW.T)
    return IW

def sample_inv_gamma(shape: float, scale: float, rng: np.random.Generator):
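    # If g ~ Gamma(shape, rate=scale), then 1/g ~ Inverse-Gamma(shape, scale);
    # numpy's Generator.gamma takes a scale (= 1/rate) argument, hence 1.0/scale below.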
    g = rng.gamma(shape, 1.0 / max(scale, 1e-12))
    return 1.0 / max(g, 1e-12)

# ==========================================
# FFBS: beta with time-varying Sigma_t
# ==========================================

def forward_filter_timevarying(y_seq: List[np.ndarray], H_seq: List[np.ndarray],
                               Sigma_seq: List[np.ndarray], Q: np.ndarray,
                               b0: np.ndarray, P0: np.ndarray):
    T = len(y_seq)
    m = b0.shape[0]
    k = y_seq[0].shape[0]

    m_arr = np.zeros((T, m))
    C_arr = np.zeros((T, m, m))
    a_arr = np.zeros((T, m))
    R_arr = np.zeros((T, m, m))

    mtm1 = b0.copy()
    Ctm1 = P0.copy()

    for t in range(T):
        Ht = H_seq[t]
        yt = y_seq[t]
        Sigmat = 0.5 * (Sigma_seq[t] + Sigma_seq[t].T)

        at = mtm1
        Rt = Ctm1 + Q
        Rt = 0.5 * (Rt + Rt.T)

        St = Ht @ Rt @ Ht.T + Sigmat
        St = 0.5 * (St + St.T)
        try:
            Kt = Rt @ Ht.T @ la.inv(St)
        except la.LinAlgError:
            Kt = Rt @ Ht.T @ la.inv(St + 1e-10 * np.eye(k))
        vt = yt - Ht @ at
        mt = at + Kt @ vt
        Ct = Rt - Kt @ Ht @ Rt
        Ct = 0.5 * (Ct + Ct.T)

        m_arr[t, :] = mt
        C_arr[t, :, :] = Ct
        a_arr[t, :] = at
        R_arr[t, :, :] = Rt

        mtm1, Ctm1 = mt, Ct

    return m_arr, C_arr, a_arr, R_arr

def backward_sample_state(m_arr: np.ndarray, C_arr: np.ndarray,
                          a_arr: np.ndarray, R_arr: np.ndarray,
                          rng: np.random.Generator):
    T, m = m_arr.shape
    x = np.zeros((T, m))
    x[T - 1, :] = rng.multivariate_normal(mean=m_arr[T - 1], cov=C_arr[T - 1])
    for t in range(T - 2, -1, -1):
        Ct = C_arr[t]
        Rt1 = 0.5 * (R_arr[t + 1] + R_arr[t + 1].T)
        at1 = a_arr[t + 1]
        Jt = Ct @ la.inv(Rt1)
        mean = m_arr[t] + Jt @ (x[t + 1] - at1)
        cov = Ct - Jt @ Rt1 @ Jt.T
        cov = 0.5 * (cov + cov.T)
        x[t, :] = rng.multivariate_normal(mean=mean, cov=cov)
    return x

# ==========================================
# FFBS: regression RW (scalar measurement)
# ==========================================

def ff_regression_rw(y: np.ndarray, Z_seq: List[np.ndarray], R_seq: np.ndarray,
                     Q: np.ndarray, m0: np.ndarray, P0: np.ndarray):
    """
    y_t = Z_t' alpha_t + e_t,  e_t ~ N(0, R_t)
    alpha_t = alpha_{t-1} + w_t, w_t ~ N(0, Q)
    """
    T = y.shape[0]
    r = m0.shape[0]
    m_arr = np.zeros((T, r))
    C_arr = np.zeros((T, r, r))
    a_arr = np.zeros((T, r))
    R_arr = np.zeros((T, r, r))

    mtm1 = m0.copy()
    Ctm1 = P0.copy()

    for t in range(T):
        Zt = Z_seq[t].reshape(-1)          # (r,)
        Rt = Ctm1 + Q
        Rt = 0.5 * (Rt + Rt.T)

        St = float(Zt.T @ Rt @ Zt) + float(R_seq[t])
        if St <= 1e-12:
            St = 1e-12
        Kt = (Rt @ Zt.reshape(-1, 1)) / St  # (r,1)
        vt = float(y[t] - Zt.T @ mtm1)
        mt = mtm1 + (Kt.flatten() * vt)
        Ct = Rt - (Kt @ Kt.T) * St
        Ct = 0.5 * (Ct + Ct.T)

        m_arr[t, :] = mt
        C_arr[t, :, :] = Ct
        a_arr[t, :] = mtm1
        R_arr[t, :, :] = Rt

        mtm1, Ctm1 = mt, Ct

    return m_arr, C_arr, a_arr, R_arr

def backward_sample_regression(m_arr: np.ndarray, C_arr: np.ndarray,
                               a_arr: np.ndarray, R_arr: np.ndarray,
                               rng: np.random.Generator):
    T, r = m_arr.shape
    alpha = np.zeros((T, r))
    alpha[T - 1, :] = rng.multivariate_normal(mean=m_arr[T - 1], cov=C_arr[T - 1])
    for t in range(T - 2, -1, -1):
        Ct = C_arr[t]
        Rt1 = 0.5 * (R_arr[t + 1] + R_arr[t + 1].T)
        at1 = a_arr[t + 1]
        Jt = Ct @ la.inv(Rt1)
        mean = m_arr[t] + Jt @ (alpha[t + 1] - at1)
        cov = Ct - Jt @ Rt1 @ Jt.T
        cov = 0.5 * (cov + cov.T)
        alpha[t, :] = rng.multivariate_normal(mean=mean, cov=cov)
    return alpha

# ==========================================
# SV (log-variance) FFBS (Gaussian approx)
# ==========================================

def sv_gaussian_ffbs(logsq: np.ndarray, v_noise: float,
                     m0_h: float, P0_h: float,
                     sigma_h2: float, rng: np.random.Generator):
    T = logsq.shape[0]
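    # Approximate the log(chi^2_1) measurement noise in log(u^2) = h + log(chi^2_1)
    # by a single Gaussian with matched moments: E[log chi^2_1] ~= -1.2704 and
    # Var[log chi^2_1] = pi^2/2 ~= 4.9348 (a crude alternative to the
    # Kim-Shephard-Chib mixture-of-normals approximation).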
    E_logchi = -1.2704
    z = logsq - E_logchi

    m_arr = np.zeros(T)
    C_arr = np.zeros(T)
    a_arr = np.zeros(T)
    R_arr = np.zeros(T)

    mtm1 = m0_h
    Ctm1 = P0_h

    for t in range(T):
        at = mtm1
        Rt = Ctm1 + sigma_h2
        St = Rt + v_noise
        Kt = Rt / St
        vt = z[t] - at
        mt = at + Kt * vt
        Ct = (1.0 - Kt) * Rt

        m_arr[t] = mt
        C_arr[t] = max(Ct, 1e-12)
        a_arr[t] = at
        R_arr[t] = max(Rt, 1e-12)

        mtm1, Ctm1 = mt, Ct

    h = np.zeros(T)
    h[T - 1] = rng.normal(loc=m_arr[T - 1], scale=np.sqrt(C_arr[T - 1]))
    for t in range(T - 2, -1, -1):
        Ct = C_arr[t]
        Rt1 = R_arr[t + 1]
        at1 = a_arr[t + 1]
        Jt = Ct / Rt1
        mean = m_arr[t] + Jt * (h[t + 1] - at1)
        var = Ct - Jt * Rt1 * Jt
        var = max(var, 1e-12)
        h[t] = rng.normal(loc=mean, scale=np.sqrt(var))
    return h

# ======================
# IRF & Stability
# ======================

def split_B(beta_t: np.ndarray, k: int, p: int, include_const: bool = True):
    m_eq = k * p + (1 if include_const else 0)
    B = beta_t.reshape(k, m_eq)
    off = 1 if include_const else 0
    A_list = []
    for L in range(p):
        cols = B[:, off + L * k : off + (L + 1) * k]  # (k x k)
        A_list.append(cols)
    return A_list  # lag matrices

def var_irf_matrices(A_list: List[np.ndarray], H: int):
    k = A_list[0].shape[0]
    p = len(A_list)
    Psi = [np.eye(k)]
    for h in range(1, H + 1):
        acc = np.zeros((k, k))
        for i in range(1, p + 1):
            if h - i >= 0:
                acc += A_list[i - 1] @ Psi[h - i]
        Psi.append(acc)
    return Psi
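
# The Psi_h above are the VMA coefficient matrices of the VAR: Psi_0 = I_k and
# Psi_h = sum_{i=1}^{min(h,p)} A_i Psi_{h-i}; the response at horizon h to an
# impact-period impulse v is Psi_h @ v.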

"""
def var_irf_recursive(A_list: List[np.ndarray], H: int):
    k = A_list[0].shape[0]
    p = len(A_list)

    def compute_psi(h, memo):
        if h in memo:
            return memo[h]
        if h == 0:
            memo[0] = np.eye(k)
            return memo[0]

        acc = np.zeros((k, k))
        for i in range(1, p + 1):
            if h - i >= 0:
                acc += A_list[i - 1] @ compute_psi(h - i, memo)
        memo[h] = acc
        return acc

    memo = {}
    Psi = [compute_psi(h, memo) for h in range(H + 1)]
    return Psi
"""

def irf_structural(Psi_list: List[np.ndarray], A_t: np.ndarray, h_t: np.ndarray, shock_index: int):
    # impact vector for structural shock j: v0 = A^{-1} * (sqrt(exp(h_j)) e_j)
    k = A_t.shape[0]
    A_t = np.tril(A_t, 0)  # ensure lower-triangular
    A_inv = la.inv(A_t)
    d = np.zeros(k); d[shock_index] = np.sqrt(np.exp(h_t[shock_index]))
    v0 = A_inv @ d
    Hplus1 = len(Psi_list)
    out = np.zeros((Hplus1, k))
    for h in range(Hplus1):
        out[h, :] = Psi_list[h] @ v0
    return out

def irf_generalized(Psi_list: List[np.ndarray], Sigma_t: np.ndarray, shock_index: int):
    k = Sigma_t.shape[0]
    Sigma_t = 0.5 * (Sigma_t + Sigma_t.T)
    ej = np.zeros(k); ej[shock_index] = 1.0
    denom = np.sqrt(max(Sigma_t[shock_index, shock_index], 1e-12))
    v = Sigma_t @ ej / denom
    Hplus1 = len(Psi_list)
    out = np.zeros((Hplus1, k))
    for h in range(Hplus1):
        out[h, :] = Psi_list[h] @ v
    return out
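
# Pesaran-Shin GIRF: the impact vector is the j-th column of Sigma_t scaled by
# 1/sqrt(sigma_jj), i.e. the expected response to a one-s.d. shock in variable j
# allowing for contemporaneous correlation, without imposing a variable ordering.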

def companion_matrix(A_list: List[np.ndarray]):
    """
    Build VAR(p) companion matrix from lag matrices A1..Ap (k x k each).
    """
    k = A_list[0].shape[0]
    p = len(A_list)
    F = np.zeros((k * p, k * p))
    # top block
    F[0:k, :] = np.hstack(A_list)
    # lower block: identity shifts
    if p > 1:
        F[k:, 0:(k*(p-1))] = np.eye(k*(p-1))
    return F

def stability_check(A_list: List[np.ndarray], tol: float = 1.0):
    """
    Return spectral radius and stable flag (max |eig| < tol).
    Default tol=1.0 (strict stationarity).
    """
    F = companion_matrix(A_list)
    eigvals = la.eigvals(F)
    rad = np.max(np.abs(eigvals))
    return float(rad), bool(rad < tol)
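
# Example (illustrative values): for a bivariate VAR(1) with
# A1 = [[0.5, 0.1], [0.0, 0.9]], the companion matrix is A1 itself and
# stability_check([A1]) returns (0.9, True).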

# === PATCH ADD: project VAR lags to (near) stationarity ======================
def project_to_stationary(A_list: List[np.ndarray], margin: float = 0.995):
    """
    If the companion spectral radius rho >= 1, uniformly scale all lag matrices
    by factor = margin / rho to push eigenvalues inside the unit circle.
    Returns: (A_scaled_list, rho_before, changed_flag)
    """
    F = companion_matrix(A_list)
    eigvals = la.eigvals(F)
    rho = float(np.max(np.abs(eigvals)))
    if rho < margin:
        return A_list, rho, False
    factor = float(margin / rho)
    A_scaled = [Ai * factor for Ai in A_list]
    return A_scaled, rho, True
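
# Note: for p > 1 the companion eigenvalues do not scale exactly with the common
# factor, so one scaling pass shrinks the spectral radius but need not land it at
# `margin`; tvirf() therefore re-checks stability after the projection.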

# ======================
# Structural TVP-SVAR with SV (MCMC)
# ======================

@dataclass
class TVPSVAR_SV_Config:
    p: int
    include_const: bool = True
    iters: int = 3000
    burn: int = 1500
    thin: int = 3
    seed: Optional[int] = 42

    # Priors for beta RW
    diffuse_scale_beta: float = 10.0
    nu_Qbeta0: Optional[int] = None   # default: m + 1
    S_Qbeta0_scale: float = 0.01

    # Priors for A_t RW (lower triangular off-diagonals): per coefficient IG(a,b)
    ig_A_a0: float = 0.01
    ig_A_b0: float = 0.01
    P0_A_scale: float = 10.0  # diffuse P0 for A states

    # Priors for SV h_t RW variance per series IG
    h0_mean: float = 0.0
    h0_var: float = 10.0
    ig_h_a0: float = 0.01
    ig_h_b0: float = 0.01

    # IRF & stability
    skip_unstable_draws: bool = True
    stability_tol: float = 1.0  # |lambda|max < 1 considered stable

class TVPSVAR_SV:
    def __init__(self, cfg: TVPSVAR_SV_Config):
        self.cfg = cfg
        self.fitted_ = False

    def fit(self, Y: np.ndarray):
        cfg = self.cfg
        rng = np.random.default_rng(cfg.seed)

        Y = np.asarray(Y, dtype=float)
        T, k = Y.shape
        X, Yp, lags = build_var_design(Y, cfg.p, cfg.include_const)
        rows = X.shape[0]
        m_eq = X.shape[1]
        m = k * m_eq

        H_seq = [kron_Ix(X[t, :], k) for t in range(rows)]
        y_seq = [Yp[t, :] for t in range(rows)]

        # OLS init for beta mean
        B_hat, Sigma_ols = ols_multivariate(X, Yp)
        b0 = B_hat.reshape(-1)
        P0_beta = np.eye(m) * cfg.diffuse_scale_beta

        # Priors for Q_beta
        nu_Qbeta0 = cfg.nu_Qbeta0 if cfg.nu_Qbeta0 is not None else (m + 1)
        S_Qbeta0 = np.eye(m) * cfg.S_Qbeta0_scale
        Q_beta = np.eye(m) * cfg.S_Qbeta0_scale

        # Initialize A_t paths: unit lower-triangular (off-diagonals start at 0)
        A_path = np.zeros((rows, k, k))
        for t in range(rows):
            A_path[t, :, :] = np.eye(k)

        # State variances for A coefficients (per row i has r=i-1 coefficients)
        sigmaA2 = [np.ones(i) * 0.05 for i in range(k)]  # i=0 empty
        P0_A = [np.eye(i) * cfg.P0_A_scale if i > 0 else np.zeros((0, 0)) for i in range(k)]

        # SV init
        eps = 1e-12
        resid0 = (Yp - X @ B_hat.T)
        logsq0 = np.log(resid0**2 + eps)
        E_logchi = -1.2704
        v_noise = 4.9348
        h_path = np.tile(logsq0.mean(axis=0) - E_logchi, (rows, 1))  # (rows x k)
        sigma_h2 = np.ones(k) * 0.05  # RW variance init for h

        # Storage controls
        iters, burn, thin = cfg.iters, cfg.burn, cfg.thin
        keep_idx = []
        for it in range(iters):
            if (it >= burn) and ((it - burn) % thin == 0):
                keep_idx.append(it)
        n_keep = len(keep_idx)

        beta_draws = np.zeros((n_keep, rows, m))
        Qbeta_draws = np.zeros((n_keep, m, m))
        A_draws = np.zeros((n_keep, rows, k, k))
        h_draws = np.zeros((n_keep, rows, k))
        sigmah2_draws = np.zeros((n_keep, k))
        sigmaA2_draws = [np.zeros((n_keep, len(sigmaA2[i]))) for i in range(k)]

        keep_ptr = 0

        for it in range(iters):
            # Build Sigma_t from current A_path and h_path
            Sigma_seq = []
            for t in range(rows):
                A_t = np.tril(A_path[t, :, :], 0)
                D_t = np.diag(np.exp(h_path[t, :]))
                A_inv = la.inv(A_t)
                Sigma_t = A_inv @ D_t @ A_inv.T
                Sigma_seq.append(Sigma_t)

            # 1) Sample beta | A,h,Y
            m_arr, C_arr, a_arr, R_arr = forward_filter_timevarying(y_seq, H_seq, Sigma_seq, Q_beta, b0, P0_beta)
            beta_path = backward_sample_state(m_arr, C_arr, a_arr, R_arr, rng)

            # 2) Sample A off-diagonals per row via regression-RW FFBS
            # From A y = H beta + u -> u = A y - H beta
            u_struct = np.zeros((rows, k))
            for t in range(rows):
                A_t = np.tril(A_path[t, :, :], 0)
                u_struct[t, :] = A_t @ y_seq[t] - H_seq[t] @ beta_path[t, :]

            for i in range(1, k):
                # Measurement: y_i - (H beta)_i = Z_i' gamma_i + u_i, gamma_i = (-a_{i1},..,-a_{i,i-1})
                y_meas = np.array([y_seq[t][i] - (H_seq[t][i, :] @ beta_path[t, :]) for t in range(rows)])
                Z_seq = [y_seq[t][:i] for t in range(rows)]  # y_1..y_{i-1}
                R_seq_i = np.exp(h_path[:, i])  # Var(u_i) = exp(h_i)

                r = i
                Q_Ai = np.diag(sigmaA2[i])
                m0_i = np.zeros(r)
                P0_i = P0_A[i]

                mA_arr, CA_arr, aA_arr, RA_arr = ff_regression_rw(y_meas, Z_seq, R_seq_i, Q_Ai, m0_i, P0_i)
                gamma_i_path = backward_sample_regression(mA_arr, CA_arr, aA_arr, RA_arr, rng)

                # Update A_path: a_{ij} = -gamma_{ij}
                for t in range(rows):
                    A_path[t, i, :i] = -gamma_i_path[t, :]

                # Sample sigmaA2[i] per coefficient via IG from RW increments
                d_gamma = gamma_i_path[1:, :] - gamma_i_path[:-1, :]
                for j in range(r):
                    shape = cfg.ig_A_a0 + 0.5 * (rows - 1)
                    scale = cfg.ig_A_b0 + 0.5 * np.dot(d_gamma[:, j], d_gamma[:, j])
                    sigmaA2[i][j] = sample_inv_gamma(shape, scale, rng)

            # 3) Sample h via Gaussian approx FFBS using structural residuals u_struct
            for i in range(k):
                logsq_i = np.log(u_struct[:, i]**2 + 1e-12)
                h_path[:, i] = sv_gaussian_ffbs(
                    logsq=logsq_i,
                    v_noise=v_noise,
                    m0_h=cfg.h0_mean,
                    P0_h=cfg.h0_var,
                    sigma_h2=sigma_h2[i],
                    rng=rng
                )

            # 4) Sample sigma_h2[i] per series via IG from RW increments of h_i
            for i in range(k):
                d_h = h_path[1:, i] - h_path[:-1, i]
                shape = cfg.ig_h_a0 + 0.5 * (rows - 1)
                scale = cfg.ig_h_b0 + 0.5 * np.dot(d_h, d_h)
                sigma_h2[i] = sample_inv_gamma(shape, scale, rng)

            # Optional: re-run h FFBS using refreshed sigma_h2 (for coherence)
            for i in range(k):
                logsq_i = np.log(u_struct[:, i]**2 + 1e-12)
                h_path[:, i] = sv_gaussian_ffbs(
                    logsq=logsq_i, v_noise=v_noise,
                    m0_h=cfg.h0_mean, P0_h=cfg.h0_var,
                    sigma_h2=sigma_h2[i], rng=rng
                )

            # 5) Sample Q_beta via IW from RW increments of beta
            d_beta = beta_path[1:, :] - beta_path[:-1, :]
            S_Qbeta_post = S_Qbeta0 + d_beta.T @ d_beta
            nu_Qbeta_post = nu_Qbeta0 + (rows - 1)
            Q_beta = sample_invwishart(nu_Qbeta_post, S_Qbeta_post, rng)

            # Save draws
            if (it >= burn) and ((it - burn) % thin == 0):
                beta_draws[keep_ptr, :, :] = beta_path
                Qbeta_draws[keep_ptr, :, :] = Q_beta
                A_draws[keep_ptr, :, :, :] = A_path
                h_draws[keep_ptr, :, :] = h_path
                sigmah2_draws[keep_ptr, :] = sigma_h2
                for i in range(k):
                    if i > 0:
                        sigmaA2_draws[i][keep_ptr, :] = sigmaA2[i]
                keep_ptr += 1

        # Posterior means
        beta_mean = beta_draws.mean(axis=0)
        A_mean = A_draws.mean(axis=0)
        h_mean = h_draws.mean(axis=0)
        Qbeta_mean = Qbeta_draws.mean(axis=0)

        # Reduced-form Sigma_t mean path
        Sigma_t_mean = []
        for t in range(rows):
            A_t = np.tril(A_mean[t, :, :], 0)
            D_t = np.diag(np.exp(h_mean[t, :]))
            Sigma_t_mean.append(la.inv(A_t) @ D_t @ la.inv(A_t).T)

        # Save
        self.k_ = k
        self.p_ = cfg.p
        self.include_const_ = cfg.include_const
        self.X_ = X
        self.Yp_ = Yp
        self.lags_ = lags
        self.m_eq_ = m_eq
        self.m_ = m

        self.beta_draws_ = beta_draws
        self.A_draws_ = A_draws
        self.h_draws_ = h_draws
        self.Qbeta_draws_ = Qbeta_draws
        self.sigmah2_draws_ = sigmah2_draws
        self.sigmaA2_draws_ = sigmaA2_draws

        self.beta_mean_ = beta_mean
        self.A_mean_ = A_mean
        self.h_mean_ = h_mean
        self.Qbeta_mean_ = Qbeta_mean
        self.Sigma_t_mean_ = Sigma_t_mean

        self.fitted_ = True

        return {
            "beta_draws": beta_draws,
            "A_draws": A_draws,
            "h_draws": h_draws,
            "Qbeta_draws": Qbeta_draws,
            "beta_mean": beta_mean,
            "A_mean": A_mean,
            "h_mean": h_mean,
            "Qbeta_mean": Qbeta_mean,
        }

    def coef_path_mean(self):
        assert self.fitted_, "Call fit() first."
        rows, m = self.beta_mean_.shape
        k, m_eq = self.k_, self.m_eq_
        out = np.zeros((rows, k, m_eq))
        for t in range(rows):
            out[t, :, :] = self.beta_mean_[t, :].reshape(k, m_eq)
        return out

    def check_stability(self, time_index: int = None, use_mean: bool = True):
        """
        Return spectral radius and stable flag at a time.
        If use_mean=False, returns a list per draw for the selected time.
        """
        assert self.fitted_, "Call fit() first."
        t = self.beta_mean_.shape[0] - 1 if time_index is None else int(time_index)
        k, p = self.k_, self.p_

        if use_mean:
            beta_t = self.beta_mean_[t, :]
            A_list = split_B(beta_t, k, p, self.include_const_)
            rad, stable = stability_check(A_list, tol=self.cfg.stability_tol)
            return rad, stable
        else:
            beta_draws = self.beta_draws_
            rads, flags = [], []
            for j in range(beta_draws.shape[0]):
                A_list = split_B(beta_draws[j, t, :], k, p, self.include_const_)
                rad, st = stability_check(A_list, tol=self.cfg.stability_tol)
                rads.append(rad); flags.append(st)
            return np.array(rads), np.array(flags)

    # === PATCHED TV-IRF with stability enforcement ===
    def tvirf(self, time_index: int = None, H: int = 20, shocks="all",
              identification: str = "structural", n_draws: int = 200,
              enforce_stability: bool = False, proj_margin: float = 0.995,
              rng: np.random.Generator = None):
        """
        Time-varying IRFs at chosen time using posterior draws.
        identification: 'structural' or 'generalized'
        If enforce_stability=True, any unstable draw is projected to near-stationary
        by uniformly shrinking VAR lag matrices so that spectral radius ≈ proj_margin.
        Unstable draws can also be skipped if cfg.skip_unstable_draws=True.
        """
        assert self.fitted_, "Call fit() first."
        rng = np.random.default_rng() if rng is None else rng

        beta_draws = self.beta_draws_
        A_draws = self.A_draws_
        h_draws = self.h_draws_
        rows = beta_draws.shape[1]
        k, p = self.k_, self.p_
        include_const = self.include_const_

        t = rows - 1 if time_index is None else int(time_index)
        assert 0 <= t < rows, "time_index out of range."

        n_avail = beta_draws.shape[0]
        use = min(n_draws, n_avail)
        idx = rng.choice(n_avail, size=use, replace=False)

        shock_list = list(range(k)) if shocks == "all" else list(shocks)
        sims = np.zeros((use, len(shock_list), H + 1, k))
        valid = np.zeros(use, dtype=bool)

        for ii, j in enumerate(idx):
            beta_t = beta_draws[j, t, :]
            A_t = np.tril(A_draws[j, t, :, :], 0)
            h_t = h_draws[j, t, :]
            A_list = split_B(beta_t, k, p, include_const)

            # stability check (companion spectral radius)
            rad, st = stability_check(A_list, tol=self.cfg.stability_tol)
            if not st:
                if enforce_stability:
                    A_list, rho0, projected = project_to_stationary(A_list, margin=proj_margin)
                    # re-check
                    rad2, st2 = stability_check(A_list, tol=self.cfg.stability_tol)
                    if not st2 and self.cfg.skip_unstable_draws:
                        continue
                elif self.cfg.skip_unstable_draws:
                    continue

            Psi = var_irf_matrices(A_list, H)
            for s_idx, shock_j in enumerate(shock_list):
                if identification.lower() == "structural":
                    irf = irf_structural(Psi, A_t, h_t, shock_j)
                elif identification.lower() == "generalized":
                    D_t = np.diag(np.exp(h_t))
                    Sigma_t = la.inv(A_t) @ D_t @ la.inv(A_t).T
                    irf = irf_generalized(Psi, Sigma_t, shock_j)
                else:
                    raise ValueError("identification must be 'structural' or 'generalized'")
                sims[ii, s_idx, :, :] = irf
            valid[ii] = True

        sims = sims[valid]
        if sims.shape[0] == 0:
            raise RuntimeError(
                "All draws were unstable (even after projection if enabled). "
                "Try enforce_stability=True, increase proj_margin (e.g., 0.995), "
                "relax stability_tol (e.g., 1.02), or set skip_unstable_draws=False."
            )

        irf_mean = sims.mean(axis=0)
        irf_q05 = np.quantile(sims, 0.05, axis=0)
        irf_q95 = np.quantile(sims, 0.95, axis=0)
        return irf_mean, irf_q05, irf_q95

    def forecast(self, h: int = 1, n_sim: int = 300, rng: Optional[np.random.Generator] = None):
        """
        h-step ahead predictive simulation using posterior draws:
          beta_{T+1} = beta_T + u_beta,  u_beta ~ N(0, Q_beta)
          gamma_{i,T+1} = gamma_{i,T} + w_A,  w_A ~ N(0, diag(sigmaA2_i))
          h_{i,T+1} = h_{i,T} + w_h,  w_h ~ N(0, sigma_h2_i)
          y_{T+1} = A_{T+1}^{-1} ( (B x)_{T+1} + u_struct ),  u_struct ~ N(0, D_{T+1})
        """
        assert self.fitted_, "Call fit() first."
        rng = np.random.default_rng() if rng is None else rng

        beta_draws = self.beta_draws_
        A_draws = self.A_draws_
        h_draws = self.h_draws_
        Qbeta_draws = self.Qbeta_draws_
        sigmah2_draws = self.sigmah2_draws_
        sigmaA2_draws = self.sigmaA2_draws_

        k, p = self.k_, self.p_
        include_const = self.include_const_

        n_avail = beta_draws.shape[0]
        use = min(n_sim, n_avail)
        idx = rng.choice(n_avail, size=use, replace=False)

        sims = np.zeros((use, h, k))

        for ii, j in enumerate(idx):
            beta_T = beta_draws[j, -1, :]
            A_T = np.tril(A_draws[j, -1, :, :], 0)
            h_T = h_draws[j, -1, :]
            Qbeta = Qbeta_draws[j, :, :]
            sigma_h2 = sigmah2_draws[j, :]
            sigmaA2 = [None]*k
            for i in range(k):
                if i == 0:
                    sigmaA2[i] = np.array([])
                else:
                    sigmaA2[i] = sigmaA2_draws[i][j, :]

            lags = [v.copy() for v in self.lags_]

            for step in range(h):
                parts = []
                if include_const:
                    parts.append(np.array([1.0]))
                for L in range(p):
                    parts.append(lags[L])
                x = np.concatenate(parts, axis=0)
                H = kron_Ix(x, k)

                beta_T = rng.multivariate_normal(mean=beta_T, cov=Qbeta)
                A_next = np.eye(k)
                for i in range(1, k):
                    gamma_i = -A_T[i, :i]
                    gamma_i = gamma_i + rng.normal(0.0, np.sqrt(sigmaA2[i]), size=i)
                    A_next[i, :i] = -gamma_i
                A_T = A_next

                h_T = h_T + rng.normal(0.0, np.sqrt(sigma_h2), size=k)
                D_T = np.diag(np.exp(h_T))

                mu_struct = H @ beta_T
                u = rng.multivariate_normal(mean=np.zeros(k), cov=D_T)
                y = la.inv(A_T) @ (mu_struct + u)
                sims[ii, step, :] = y

                lags = [y] + lags[:p-1]

        mean_fore = sims.mean(axis=0)
        q05 = np.quantile(sims, 0.05, axis=0)
        q95 = np.quantile(sims, 0.95, axis=0)
        return mean_fore, {"5%": q05, "95%": q95}

def _main():
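    # Paste the merged dataset shown above into data_str: three whitespace-separated
    # numeric columns, no date column ("here is data" is a placeholder).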
    data_str = """ here is data
               """
    df = pd.read_csv(StringIO(data_str), sep=r"\s+", header=None)
    df.columns = ["RGDPGrowthRate", "HouseOwnerRate", "MortgageRate"]
    df_standardized = (df - df.mean()) / df.std()
    Y = df_standardized.values
    T, k = Y.shape
    print(f"data load complete: {T} periods, {k} variable")
    p = 2
    # Fit structural TVP-SVAR-SV
    cfg = TVPSVAR_SV_Config(
        p=p, include_const=True,
        iters=3000, burn=1500, thin=3, seed=42,
        diffuse_scale_beta=10.0, S_Qbeta0_scale=0.01,
        ig_A_a0=0.01, ig_A_b0=0.01,
        h0_mean=0.0, h0_var=10.0, ig_h_a0=0.01, ig_h_b0=0.01,
        skip_unstable_draws=True, stability_tol=1.02  # slightly relaxed
    )
    model = TVPSVAR_SV(cfg)
    post = model.fit(Y)

    # Stability check at last period (posterior mean)
    rad, stable = model.check_stability(time_index=None, use_mean=True)
    print(f"Spectral radius (last): {rad:.3f}, stable={stable}")

    # IRF (generalized) at last period with stability enforcement
    H = 20
    irf_mean, irf_q05, irf_q95 = model.tvirf(
        time_index=None, H=H, shocks=list(range(len(df.columns))),
        identification="generalized", n_draws=300,
        enforce_stability=True, proj_margin=0.995
    )

    # Plot IRF for each shock -> all variables
    # vars_label = [f"y{i+1}" for i in range(k)]
    vars_label = [df.columns[i] for i in range(len(df.columns))]
    h_grid = np.arange(H+1)
    for s in range(len(df.columns)):
        plt.figure(figsize=(10,6))
        for v in range(k):
            m = irf_mean[s, :, v]
            lo = irf_q05[s, :, v]
            hi = irf_q95[s, :, v]
            plt.plot(h_grid, m, label=f"{vars_label[v]} (mean)")
            plt.fill_between(h_grid, lo, hi, alpha=0.2, label=f"{vars_label[v]} 90% band")
        plt.axhline(0, color="black", linewidth=0.8)
        plt.title(f"TV-IRF (structural) at last period, shock {s+1}")
        plt.xlabel("horizon")
        plt.legend()
        plt.grid(True)
        plt.show()

    # Forecast
    """mean_fore, q = model.forecast(h=5, n_sim=300)
    print("Predictive mean (1-step):", mean_fore[0])
    print("Predictive 90% interval (1-step):")
    print("  5% :", q["5%"][0])
    print(" 95% :", q["95%"][0])
    
    x = range(1, 6)  # step numbers (1 to 5)

    for var_index, label in enumerate(df.columns):
        y_mean = [step[var_index] for step in mean_fore]
        y_lower = [step[var_index] for step in q["5%"]]
        y_upper = [step[var_index] for step in q["95%"]]

        plt.figure(figsize=(8, 5))
        plt.plot(x, y_mean, label="Predictive Mean", color="blue", marker='o')
        plt.fill_between(x, y_lower, y_upper, color="lightblue", alpha=0.5, label="90% Prediction Interval")
        plt.title(f"Forecast Results for {label} (h=5)")
        plt.xlabel("Step")
        plt.ylabel("Value")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()"""

if __name__ == "__main__":
    _main()
Execution environment
Results (impulse responses)
  • Response to a one-standard-deviation structural shock to real GDP growth
    [Figure: time-varying impulse responses of the three variables]
    • Higher GDP growth → higher income growth → home purchases → higher homeownership rate
Conclusion
  • An increase in real GDP growth raises the homeownership rate

Future work

  • Add exogenous variables other than lags to the model
  • Cross-check the impulse responses against those from a DSGE model to substantiate the theoretical interpretation