23
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

多変量時系列データ予測モデルVARを使ってみた

Last updated at Posted at 2022-01-06

ARモデル

時系列データ予測で使われる$p+1$次の自己回帰モデル(AR Model)は以下のように与えられます。

x_{t+1} = a_0x_t+a_1x_{t-1}+a_2x_{t-2}+...+a_px_{t-p}+\epsilon_t

なので、$a_0,a_1,a_2,...,a_p$が決まれば、時刻$t+1$以降の時系列データは次のように過去の時系列データと新たに生成された未来の時系列を再帰的に使って順次求めることができます。

図1.png

$a_0,a_1,a_2,...,a_p$の求め方は、次の通り。($p+1=3$次の時)

x_{t+1} = a_0x_t+a_1x_{t-1}+a_2x_{t-2}+\epsilon_t\\
x_{t+2} = a_0x_{t+1}+a_1x_t+a_2x_{t-1}+\epsilon_t\\
x_{t+3} = a_0x_{t+2}+a_1x_{t+1}+a_2x_t+\epsilon_t\\

の両辺に$x_t$をかけて期待値$E[]$をとリます。

E[x_t,x_{t+1}] = a_0E[x_t,x_t]+a_1E[x_t,x_{t-1}]+a_2E[x_t,x_{t-2}]+E[x_t,\epsilon_t]\\
E[x_t,x_{t+2}] = a_0E[x_t,x_{t+1}]+a_1E[x_t,x_t]+a_2E[x_t,x_{t-1}]+E[x_t,\epsilon_t]\\
E[x_t,x_{t+3}] = a_0E[x_t,x_{t+2}]+a_1E[x_t,x_{t+1}]+a_2E[x_t,x_t]+E[x_t,\epsilon_t]\\

$\rho(1)=E[x_t,x_{t+1}]$はラグ1の自己相関と見做せ、また$E[x_t,\epsilon]=0$なので、係数$a_i$に関する次のような方程式が得られます。

\rho(1) = a_0\rho(0)+a_1\rho(1)+a_2\rho(2)\\
\rho(2) = a_0\rho(1)+a_1\rho(2)+a_2\rho(3)\\
\rho(3) = a_0\rho(2)+a_1\rho(1)+a_2\rho(0)\\

未知数3に対して方程式3ですので、未知数$a_i$を求めることができるようになりました。(Yule-Walker方程式)
時系列データ予測においては、このように係数$a_i$が決まれば元系列データから順次未来の時系列データを求めることができますので、訓練(train)データのみでモデルを作成し、未来のデータを予測することができるようになります。
(ARMAモデルの移動平均モデル(MA Model)部分の係数$b_i$はこのように簡単に求めることはできません。)

VARモデルで多変量時系列データ予測

このARモデルを多変量の時系列データに適用できるようにしたものが、VARモデル(vector AR model、ベクトル自己回帰モデル)で、多次元の時系列データ$X_T$は次のようにモデル化されます。

X_{t+1}=A_0X_t+A_1X_{t-1}+A_2X_{t-2}+...+\epsilon_t

これも一次元のARモデルと同様、以下のYule-Walker方程式によって係数$A_i$を求めることができます。

\Phi_k=\sum_{i=1}^{n}A_i\Phi_{k-i}

$\Phi_k$が自己相関行列となります。

このVARで仮想通貨価格予測をしてみました。データはNishikaさんのものを使います。

ライブラリインポート

import os
from datetime import datetime
import time
from sklearn.preprocessing import StandardScaler
import plotly.express as px
import plotly.graph_objs as go
import matplotlib.pyplot as plt
import seaborn as sns
import math

import pandas as pd

データ読み込み

仮想通貨の種類は、ETH:イーサリアム、ZEC:ジーキャッシュ、LTC:ライトコイン、BTC:ビットコインで、価格は、始値(Open)、高値(High)、底値(Low)、終値(Close)が1分おきに与えられ、期間は、2019-03-19 00:00:00 〜 2021-03-18 23:59:00です。

df = pd.read_csv('data/train.csv')
close_log_df = df[['Date','ETHUSD_Close_log','ZECUSD_High_log','LTCUSD_Low_log','BTCUSD_Close_log']]
close_log_df.plot(kind='line',x=close_log_df.columns[0],y=close_log_df.columns[1:5],figsize=(20,5),grid=True)

close_log.png

ローソク足チャート

データを一部切り出し、ローソク足チャートを描かせてみます。

eth_df.set_index('Date',inplace=True)
def dur(start,end,data):
    df=data.loc[start : end]
    return df
eth2020_log= dur(start = '2020-04-30 05:00:00', end= '2020-04-30 06:00:00', data=eth_df)

# 対数データを元データに戻す
eth2020 = 10**eth2020_log[['ETHUSD_Open_log','ETHUSD_High_log','ETHUSD_Low_log','ETHUSD_Close_log']]

# ローソク足チャート
def c_chart(data,label):
    data.rename(columns={data.columns[0]:'Open'},inplace=True)
    data.rename(columns={data.columns[1]:'High'},inplace=True)
    data.rename(columns={data.columns[2]:'Low'},inplace=True)
    data.rename(columns={data.columns[3]:'Close'},inplace=True)
    #print(data.columns)
    candlestick = go.Figure(data = [go.Candlestick(x =data.index, 
                                               open = data[('Open')], 
                                               high = data[('High')], 
                                               low = data[('Low')], 
                                               close = data[('Close')])])
    
    candlestick.update_xaxes(title_text = 'Time',
                             rangeslider_visible = True)

    candlestick.update_layout(
    title = {
        'text': '{:} Candelstick Chart'.format(label),
        "y":0.8,
        "x":0.5,
        'xanchor': 'center',
        'yanchor': 'top'})

    candlestick.update_yaxes(title_text = 'Price in USD', ticksuffix = '$')
    return candlestick

eth_candle=c_chart(eth2020, label="ETH Price")
eth_candle.show()

newplot-3.png
コードはこちらを参照しました。

train test split

2021-01-01 00:00:00から2021-01-01 23:59:00までのデータを使って、2021-01-02 00:00:00から2021-01-02 00:15:00の15分のデータを予測することを考えます。

close_log_df = df[['Date','ETHUSD_Close_log','ZECUSD_Close_log','LTCUSD_Close_log','BTCUSD_Close_log']].set_index('Date')
close_log = dur(start = '2021-01-01 00:00:00', end= '2021-01-02 00:15:00', data=close_log_df)

n_split = 15
df_train , df_test = close_log[:-n_split] , close_log[-n_split:]

ここで、testデータはモデルの作成には不要ですが、予測結果との比較のために切り出します。

2つの時系列のグランジャー非因果性の4つの検定

グランジャー因果関係テストにより2変数間の相関を確認します。使う関数(ライブラリ)は以下の通りです。
statsmodels.tsa.stattools.grangercausalitytests(x, maxlag, addconst=True, verbose=True)
Parameters:
x(配列、2d)–2列目の時系列の時系列が最初の列の時系列を引き起こすかどうかをテストするためのデータ
maxlag(integer)–グレンジャー因果関係テスト結果は、maxlagまでのすべてのラグについて計算されます
verbose(bool)–trueの場合、結果を出力します
Returns:
結果 –すべてのテスト結果、辞書のキーはラグの数です。各ラグの値はタプルで、最初の要素はteststatistic、pvalues、自由度のディクショナリ、2番目の要素は制限されたモデル、制限されていないモデル、およびパラメーターの制限(コントラスト)行列のOLS推定結果です。
戻り値のタイプ
すべてのテスト結果(dictionary)

grangercausalitytestsのNull仮説は、2番目の列の時系列x2が、1番目の列の時系列x1をグランジャー原因にしないということです。グランジャー因果関係とは、過去のx2の値が、過去のx1の値を回帰子として考慮して、現在のx1の値に統計的に有意な影響を与えることを意味します。検定の希望サイズ以下のp値であれば、x2がグランジャー原因x1を引き起こさないという帰無仮説を棄却します。

4つの検定すべての帰無仮説は、第2時系列の過去値に対応する係数がゼロであることです。
'params_ftest'、'ssr_ftest0はF分布に基づいています。
'ssr_chi2test'、'lrtest'はカイ2乗分布に基づいています。

import numpy as np
from statsmodels.tsa.stattools import grangercausalitytests
maxlag=10
test = 'ssr_chi2test'

def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False):    
    """Check Granger Causality of all possible combinations of the Time series.
    The rows are the response variable, columns are predictors. The values in the table 
    are the P-Values. P-Values lesser than the significance level (0.05), implies 
    the Null Hypothesis that the coefficients of the corresponding past values is 
    zero, that is, the X does not cause Y can be rejected.

    data      : pandas dataframe containing the time series variables
    variables : list containing names of the time series variables.
    """
    df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for c in df.columns:
        for r in df.index:
            test_result = grangercausalitytests(data[[r, c]], maxlag=maxlag, verbose=False)
            # p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)]
            p_values = [round(test_result[i+1][0][test][1],7) for i in range(maxlag)]
            # p_values = [test_result[i+1][0][test][1] for i in range(maxlag)]
            if verbose: print(f'Y = {r}, X = {c}, P Values = {p_values}')
            min_p_value = np.min(p_values)
            df.loc[r, c] = min_p_value
    df.columns = [var + '_x' for var in variables]
    df.index = [var + '_y' for var in variables]
    return df

grangers_causation_matrix(df_train, variables = df_train.columns)
ETHUSD_Close_log_x ZECUSD_Close_log_x LTCUSD_Close_log_x BTCUSD_Close_log_x
ETHUSD_Close_log_y 0.99999 0.0 0.000000 0.000000
ZECUSD_Close_log_y 0.00000 1.0 0.000000 0.000000
LTCUSD_Close_log_y 0.00000 0.0 0.999985 0.000000
BTCUSD_Close_log_y 0.00000 0.0 0.000000 0.999973

共和分検定

ヨハンセン(またはジョハンセン)検定による共和分検定。以下の関数(ライブラリ)を使います。
statsmodels.tsa.vector_ar.vecm.coint_johansen(endog,det_order,k_ar_diff)
Parameters:
endog-テストするデータ
det_order– -1:非決定項、0:定数項、1:線形トレンド
k_ar_diff:ラグ次数
Returns:
結果–すべてのテスト結果(dictionary)

from statsmodels.tsa.vector_ar.vecm import coint_johansen

def cointegration_test(df, alpha=0.05): 
    """Perform Johanson's Cointegration Test and Report Summary"""
    out = coint_johansen(df,-1,5)
    d = {'0.90':0, '0.95':1, '0.99':2}
    traces = out.lr1
    cvts = out.cvt[:, d[str(1-alpha)]]
    def adjust(val, length= 6): return str(val).ljust(length)

    # Summary
    print('Name   ::  Test Stat > C(95%)    =>   Signif  \n', '--'*20)
    for col, trace, cvt in zip(df.columns, traces, cvts):
        print(adjust(col), ':: ', adjust(round(trace,2), 9), ">", adjust(cvt, 8), ' =>  ' , trace > cvt)

cointegration_test(df_train)

# Name   ::  Test Stat > C(95%)    =>   Signif  
#  ----------------------------------------
# ETHUSD_Close_log ::  38.1      > 40.1749   =>   False
# ZECUSD_Close_log ::  15.34     > 24.2761   =>   False
# LTCUSD_Close_log ::  6.1       > 12.3212   =>   False
# BTCUSD_Close_log ::  1.95      > 4.1296    =>   False

定常性検定

拡張ディッキー=フラー(ADF)検定により、ラグp次のAR(p)過程が単位根過程であるかを検定します。以下の関数(ライブラリ)を使います。
statsmodels.tsa.stattools.adfuller(x,maxlag,autolag)
Parameters:
x-テストするデータ
maxlag–ラグ次数の最大
autolag-ラグ次数決定のための情報基準量(AICまたはBIC)
Returns:
結果–すべてのテスト結果(dictionary)

# Import Statsmodels
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from statsmodels.tools.eval_measures import rmse, aic

def adfuller_test(series, signif=0.05, name='', verbose=False):
    """Perform ADFuller to test for Stationarity of given series and print report"""
    r = adfuller(series, autolag='AIC')
    output = {'test_statistic':round(r[0], 4), 'pvalue':round(r[1], 4), 'n_lags':round(r[2], 4), 'n_obs':r[3]}
    p_value = output['pvalue'] 
    def adjust(val, length= 6): return str(val).ljust(length)

    # Print Summary
    print(f'    Augmented Dickey-Fuller Test on "{name}"', "\n   ", '-'*47)
    print(f' Null Hypothesis: Data has unit root. Non-Stationary.')
    print(f' Significance Level    = {signif}')
    print(f' Test Statistic        = {output["test_statistic"]}')
    print(f' No. Lags Chosen       = {output["n_lags"]}')

    for key,val in r[4].items():
        print(f' Critical value {adjust(key)} = {round(val, 3)}')

    if p_value <= signif:
        print(f" => P-Value = {p_value}. Rejecting Null Hypothesis.")
        print(f" => Series is Stationary.")
    else:
        print(f" => P-Value = {p_value}. Weak evidence to reject the Null Hypothesis.")
        print(f" => Series is Non-Stationary.")

# ADF Test on each column
for name, column in df_train.iteritems():
    adfuller_test(column, name=column.name)
    print('\n')

#    Augmented Dickey-Fuller Test on "ETHUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -2.0733
# No. Lags Chosen       = 1
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.2554. Weak evidence to reject the Null Hypothesis.
# => Series is Non-Stationary.


#    Augmented Dickey-Fuller Test on "ZECUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -1.1881
# No. Lags Chosen       = 24
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.6787. Weak evidence to reject the Null Hypothesis.
# => Series is Non-Stationary.


#    Augmented Dickey-Fuller Test on "LTCUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -1.8017
# No. Lags Chosen       = 7
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.3796. Weak evidence to reject the Null Hypothesis.
# => Series is Non-Stationary.


#    Augmented Dickey-Fuller Test on "BTCUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -3.0881
# No. Lags Chosen       = 1
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.0274. Rejecting Null Hypothesis.
# => Series is Stationary.

非定常の結果が出たので時系列データの差分系列をとります。

# 1st difference
df_train_diff1 = df_train.diff().dropna()

df_train_diff1.plot(kind='line',y=df_train_diff1.columns[0:4],figsize=(20,5),grid=True)

close_log_diff1.png

差分系列に対してADF検定を行います。

# ADF Test on each column of 1st Differences Dataframe
for name, column in df_train_diff1.iteritems():
    adfuller_test(column, name=column.name)
    print('\n')

#    Augmented Dickey-Fuller Test on "ETHUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -20.961
# No. Lags Chosen       = 3
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.0. Rejecting Null Hypothesis.
# => Series is Stationary.


#    Augmented Dickey-Fuller Test on "ZECUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -6.044
# No. Lags Chosen       = 24
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.0. Rejecting Null Hypothesis.
# => Series is Stationary.


#    Augmented Dickey-Fuller Test on "LTCUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -13.3198
# No. Lags Chosen       = 6
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.0. Rejecting Null Hypothesis.
# => Series is Stationary.


#    Augmented Dickey-Fuller Test on "BTCUSD_Close_log" 
#    -----------------------------------------------
# Null Hypothesis: Data has unit root. Non-Stationary.
# Significance Level    = 0.05
# Test Statistic        = -41.8013
# No. Lags Chosen       = 0
# Critical value 1%     = -3.435
# Critical value 5%     = -2.864
# Critical value 10%    = -2.568
# => P-Value = 0.0. Rejecting Null Hypothesis.
# => Series is Stationary.

差分過程は、定常過程と判断されました。

赤池情報基準量(AIC:Akaike Information Criteria)

ARモデルの次数決定には、AICが使われることが多いです。これは「ケチの原理」に基づくもので、モデルの次数が多くなれば、モデルの精度は高くなりますが、次数が多すぎるのは適切とは言えないということから、以下の式が最小になるように次数を決めます。

AIC=-2L(\theta)+2k

$L(\theta)$はモデルの当てはまりの良さ。kはモデルの次数です。図にすると以下のような感じになります。

aic4.png

その他、ベイズ情報基準量(BIC)、シュワルツ情報基準量(SIC)などもありますが、基準量の選択に適切な判断基準はなく、使う側の裁量に任されることが多いようです。

以下で最適次数を求めてみます。

model = VAR(df_train_diff1)
for i in [1,2,3,4,5,6,7,8,9]:
    result = model.fit(i)
    print('Lag Order =', i)
    print('AIC : ', result.aic)
    print('BIC : ', result.bic)
    print('FPE : ', result.fpe)
    print('HQIC: ', result.hqic, '\n')

     .....
# Lag Order = 5
# AIC :  -52.321876391993534
# BIC :  -52.01345179912366
# FPE :  1.891914170815473e-23
# HQIC:  -52.20672231483013
     ..... 

次数5でモデルを構築します。

model_fitted = model.fit(nlag)
```より

##ダービン・ワトソン統計量
自己相関がないことを検定します
ダービンワトソン統計量DWは、2に近い時自己相関がないと判断されます。2より十分小さい時は正の自己相関、2より十分大きい時は負の自己相関があると判断されます

```python
from statsmodels.stats.stattools import durbin_watson
out = durbin_watson(model_fitted.resid)

for col, val in zip(df_train_diff1.columns, out):
    print(col, ':', round(val, 2))

# ETHUSD_Close_log : 2.0
# ZECUSD_Close_log : 2.0
# LTCUSD_Close_log : 2.0
# BTCUSD_Close_log : 1.99

予測

# Get the lag order
lag_order = model_fitted.k_ar
print(lag_order)  #> 4

# Input data for forecasting
forecast_input = df_train_diff1.values[-lag_order:]

# Forecast
forecast_span = 15 # = n_split
fc = model_fitted.forecast(y=forecast_input, steps=forecast_span)
df_train_diff1_forecast = pd.DataFrame(fc, index=df_test.index[-forecast_span:], columns=df_test.columns + '_1d')  
                    # df_test の index ,columnsをforecastのDataFrameにする

差分系列を予測したので原データ系列に戻す必要があります。戻し方は以下の通りです。

差分系列.png

コードは以下の通りです。 second_diffは2階差分の場合に原系列に戻します。

def invert_transformation(df_train, df_forecast, second_diff=False):
    """Revert back the differencing to get the forecast to original scale."""
    df_fc = df_forecast.copy()
    columns = df_train.columns
    for col in columns:        
        # Roll back 2nd Diff
        if second_diff:
            df_fc[str(col)+'_1d'] = (df_train[col].iloc[-1]-df_train[col].iloc[-2]) + df_fc[str(col)+'_2d'].cumsum()
        # Roll back 1st Diff
        df_fc[str(col)+'_forecast'] = df_train[col].iloc[-1] + df_fc[str(col)+'_1d'].cumsum()
    return df_fc

df_results = invert_transformation(df_train, df_train_diff1_forecast, second_diff=False)

そして予測部分のグラフです。

fig, axes = plt.subplots(nrows=2, ncols=2, dpi=150, figsize=(10,10))
for i, (col,ax) in enumerate(zip(df_train.columns, axes.flatten())):
    #df_results[col+'_forecast'].plot(legend=True, ax=ax).autoscale(axis='x',tight=True)
    df_results[col+'_forecast'].plot(legend=True, ax=ax,grid=True)
    df_test[col][-forecast_span:].plot(legend=True, ax=ax,grid=True);
    ax.set_title(col + ": Forecast vs Actuals")
    ax.xaxis.set_ticks_position('none')
    ax.yaxis.set_ticks_position('none')
    ax.spines["top"].set_alpha(0)
    ax.tick_params(labelsize=6)

plt.tight_layout()

forecast_result.png

参考にさせていただいたコードはこちらから。VARでモデルを作る場合のほとんどが含まれています。
Vector Autoregression (VAR) – Comprehensive Guide with Examples in Python

23
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?