はじめに
数値を予測するとき予測区間の推定をしたいというときはよくあると思う。
NGBoostとか使うとできるあれ。
今回は時系列データの予測をするときに深層学習で予測区間の推定したいなと思って調べたことを記録に残したい。
予測区間推定
予測区間推定をすると予測値とその取り得るばらつきはどの程度なのかを知ることができる。やっていることは基本的に確率分布の推定。
例えばNGBoostだと、モデルが推論するものとして正規分布を仮定した確率分布を設定する。
NGBoostアルゴリズムで学習させると、2種の推論結果を出すモデルが出来上がる。
この2つの出力がそれぞれ推論された正規分布の平均$μ$と標準偏差$σ$であり、点推定としては$μ$を予測値として使用し、予測区間としては$±Nσ$$(N>0)$を区間として設定するといった活用方法がある。
深層学習でも損失関数を確率分布の推論用の関数を適用すればNGBoostのような予測区間の推定ができる。例えばaotamasakiさんの記事「RNNを用いた正規分布の回帰 keras実装」では、損失関数に正規分布の負の対数尤度(パラメータは$μ$と$σ$の2つある)を設定し、それを最適化するように学習させ、$μ$と$σ$を出力するニューラルネットを構築している。
対数尤度を以下の式として、
\ln L(μ, σ^2)=-\frac{N}{2}\ln (2\pi)-\frac{N}{2}\ln (σ^2)-\frac{1}{2σ^2}\sum_{i=1}^{N}(x_i-μ)^2\\
\ln L:対数尤度,\,\,μ:平均,\,\,σ:標準偏差\,\,\,\,※
※「はじめてのパターン認識」より引用
上式の対数尤度を、下式のような負の対数尤度にして損失関数として設定。(定数項や定数倍は不要なので省略する)
\begin{align}
-\ln L_i(μ, σ^2)≈loss&=\ln (σ^2)+\frac{(x_i−μ)}{σ^2}\\
&=−\ln (β)+(x_i−μ)^2β\\
β:1/σ^2(精度)
\end{align}
aotamasakiさんの記事では、カスタムで損失関数を作成して学習させているが、同じようなことをtensorflow-probabilityを使用してもできる。(tensorflow公式の記事や、enakai00さんの記事「「確率分布をファーストクラスオブジェクトとして扱う」という観点で Tensorflow Probability を理解する」参照)
これでLSTMとか使って時系列データの予測区間の推定ができるかも、やったぜ。
ということでtensorflow-probabilityの方のアプローチでやってみた。
データ作成
まず必要なものをimport
import os
import re
import collections
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import sklearn
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import scipy
import seaborn as sns
import gzip
import glob
import datetime as dt
import gc
import sys
import tqdm as tq
from tqdm import tqdm
import time
import pickle
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, LSTM, GRU, Activation
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow_probability as tfp
tfd = tfp.distributions
jpn_fonts=list(np.sort([ttf for ttf in fm.findSystemFonts() if 'ipaexg' in ttf or 'msgothic' in ttf or 'japan' in ttf or 'ipafont' in ttf]))
jpn_font=jpn_fonts[0]
prop = fm.FontProperties(fname=jpn_font)
print(jpn_font)
sns.set()
sinカーブにノイズを乗せた疑似データを作成。
1分1行で、8000行のノイズが大きいsinカーブ20種とノイズが小さいsinカーブ20種の計40種の時系列データとしている。
# 時系列データ長
input_data_length = 8000
# sin波+Noise 1
big_noise_sin = np.array([ np.sin( np.linspace(0, np.pi*6, input_data_length) )\
* 5 + np.random.randn(input_data_length) * 0.5 for _ in range(20) ])
# sin波+Noise 2
small_noise_sin = np.array([ np.sin( np.linspace(0, np.pi*6, input_data_length) )\
* 5 + np.random.randn(input_data_length) * 0.1 for _ in range(20) ])
# カラム名リスト
cols=['noise_sin_'+i for i in np.char.zfill(np.linspace(0,big_noise_sin.shape[0]-1
,big_noise_sin.shape[0]).astype(int).astype(str)
, 2)]
# df化
big_noise_sin_df=pd.DataFrame(big_noise_sin.T, columns=cols)
# カラム名リスト
cols=['noise_sin_'+i for i in np.char.zfill(np.linspace(0+big_noise_sin.shape[0]
,small_noise_sin.shape[0]-1+big_noise_sin.shape[0]
,small_noise_sin.shape[0]).astype(int).astype(str)
, 2)]
# df化
small_noise_sin_df=pd.DataFrame(small_noise_sin.T, columns=cols)
# データ結合
data = pd.concat([big_noise_sin_df, small_noise_sin_df], axis=1)
# date index追加
data.index=pd.date_range(start='2022-1-1', periods=len(data), freq='min')
# データを可視化
fig = plt.figure(figsize = (12,8))
ax = fig.add_subplot(3,1,1)
data.plot(ax = ax, alpha=0.6, legend=False)
ax.set_title('all data')
ax = fig.add_subplot(3,1,2)
data.iloc[:,:20].plot(ax = ax, alpha=0.6, legend=False)
ax.set_title('big noise')
ax = fig.add_subplot(3,1,3)
data.iloc[:,20:].plot(ax = ax, alpha=0.6, legend=False)
ax.set_title('small noise')
plt.tight_layout()
plt.show()
データ加工
次に、GRUやLSTMといった深層学習モデルを学習するためにデータを加工する関数を作成する。
今回は40種あるデータの過去60分のデータを説明変数として、33番目の”noise_sin_33”カラムの1分先の数値を目的変数として予測するようなモデルにするので、加工後のデータは説明変数は7939(=8000-60-1)×60×40のデータ、目的変数は7939×1×1のデータとなる。
データを加工し、Train, Valid, Testの3つに分ける。
# モデル学習/適用のためにデータを加工
def generate_data(data, length_per_unit, dimension, obs_col, pred_l=1):
'''
data:df
length_per_unit:説明変数に使用する過去の時系列の長さ(N秒前、N日前など)
dimension:説明変数の次元
obs_col:目的変数名
pred_l:予測する時系列の長さ(N秒先、N日先など)
'''
# 目的変数のIndex番号
obs_ind=np.where(data.columns.to_numpy()==obs_col)[0][0]
# DataFrame→array変換
data_array = data.to_numpy()
# 時系列データを入れる箱
sequences = []
# 正解データを入れる箱
target = []
# length_per_unitごとに時系列データと正解データをセットで追加していく
for i in range(0, data_array.shape[0] - length_per_unit - pred_l):
sequences.append(data_array[i:i + length_per_unit])
target.append(data_array[(i + length_per_unit):(i + length_per_unit + pred_l)])
# 時系列データを成形
X = np.array(sequences).reshape(len(sequences), length_per_unit, dimension)
# 正解データを成形
Y = np.array(target).reshape(len(sequences), pred_l, dimension)
Y = Y[:,:,obs_ind].reshape(Y.shape[0],Y.shape[1],1)
return X,Y
# データ前処理
length=60 # 60-1分前のデータを説明変数にする
col = 'noise_sin_33'# 目的変数名
X_data, y_data = generate_data(data, length, data.shape[1], obs_col=col , pred_l=1)# 目的変数は1分後のデータ
X_train, y_train = X_data[:int(len(X_data)*0.7), :, :], y_data[:int(len(X_data)*0.7), :]# Train Data
X_valid, y_valid = X_data[int(len(X_data)*0.7):int(len(X_data)*0.8), :, :], y_data[int(len(X_data)*0.7):int(len(X_data)*0.8), :]# Valid Data
X_test, y_test = X_data[int(len(X_data)*0.8):, :, :], y_data[int(len(X_data)*0.8):, :]# Test Data
print('Train DataX{}'.format(X_train.shape), 'Valid DataX{}'.format(X_valid.shape), 'Test DataX{}'.format(X_test.shape))
print('Train DataY{}'.format(y_train.shape), 'Valid DataY{}'.format(y_valid.shape), 'Test DataY{}'.format(y_test.shape))
深層学習モデル作成
データの加工ができたので、次は深層学習ネットワークを構築する。
基本的に、以前テキストデータ分析の時に作ったことがあるネットワークを使いまわしているだけなので多層になっていたりするのに特に意味はない。(※Bidirectionalの層は消した)
確率分布の推論の時に、普通のネットワークと異なる箇所は、最後の出力層のところ。
まず中間層からつなげて、出力層の次元を2にする。これが予測する確率分布(正規分布)の$μ$と$σ$に相当すると思われる。
pred_out = tf.keras.layers.Dense(2)(hiddened)# 出力の次元を2にする(正規分布のμとσの2つ)
その後に最後の出力層として、tfp.layers.DistributionLambdaにつなげて、tfd.Normalを使って正規分布$N$$(μ,σ^2)$に変換している。
softplus関数を使っているのはおそらく$σ$が負にならないようにするためかな?この中のコードは、基本的にTensorflow公式の記事から引っ張ってきている。
scaleに1e-5のような微小な値を足しているのは0にならないようにするため、0.001(公式の記事では0.01や0.05)をかけているのは$μ$と$σ$の最適化の際の勾配に差をつけるためと思われる。※tensorflowのissue#703参照
pred_out = tfp.layers.DistributionLambda(lambda t: tfd.Normal(loc=t[..., :1], scale=1e-5 + tf.math.softplus(0.001 * t[..., 1:])))(pred_out)
損失関数は負の対数尤度を定義し、学習ではそれを最適化する。(optimizerは”adam”に設定)
以下の部分のコードについて、rv_yは最後の出力層から出力される正規分布を示し、rv_y.log_probは正規分布の対数尤度を示しているので、negloglikはyが観測されたときの正規分布の負の対数尤度を表している。
(aotamasakiさんの記事のようにカスタム関数を自作している場合、主にrv_y.log_prob部分を自作しているといえるだろうか。)
negloglik = lambda y, rv_y: -rv_y.log_prob(y)
%%time
# LSTM, GRUネットワーク組み立て(昔別のデータの時に書いたコード流用)
#Input(shape=((列数(シーケンスの長さ),次元(データの数))))
inputs = tf.keras.layers.Input(shape=((X_train.shape[1],X_train.shape[2])))
num_out1=[32]
num_out2=16
num_out3=8
activation1='relu'
optimizer='adam'
# 多層GRUネットワーク
for i in range(0, len(num_out1)):
if i == 0:
embed = tf.keras.layers.GRU(num_out1[i], return_sequences=True)(inputs)
else:
embed = tf.keras.layers.GRU(num_out1[i], return_sequences=True)(embed)
# 最後LSTM持ってくる
embed = tf.keras.layers.LSTM(num_out2, return_sequences=False)(embed)
# 活性化関数はrelu
hiddened = tf.keras.layers.Dense(num_out3, activation=activation1)(embed)
# 出力層
pred_out = tf.keras.layers.Dense(2)(hiddened)# 出力の次元を2にする(正規分布のμとσの2つ)
pred_out = tfp.layers.DistributionLambda(lambda t: tfd.Normal(loc=t[..., :1]
, scale=1e-5 + tf.math.softplus(0.001 * t[..., 1:])))(pred_out)
model = tf.keras.models.Model(inputs=inputs, outputs=pred_out)
model.summary()
negloglik = lambda y, rv_y: -rv_y.log_prob(y)
model.compile(optimizer=optimizer, loss=negloglik)
では学習。
GPUなんて持っていないけど、ただのsinカーブで数千行のデータなのでCPUでもすぐに学習は終わる。
lossも収束している感じ。
%%time
# 学習
batch_size=128
epochs=500
model_path='prob_dist_predict_model.hdf5'
call_backs=[tf.keras.callbacks.EarlyStopping(patience=10), tf.keras.callbacks.ModelCheckpoint(model_path, save_best_only=True)]
log_dir = "logs/fit/" + dt.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
# 1分先の数値を目的変数とする
history = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs
, callbacks=[call_backs, tensorboard_callback]
, validation_data=(X_valid, y_valid), shuffle=False)
結果確認
学習が終わったので、結果を確認していく。
まずはTrain, Valid, Testそれぞれのデータをモデルに適用する。
適用した後に返ってくるデータから平均や標準偏差の推論値を取得できる。
results_tra = model(X_train)
results_val = model(X_valid)
results_tes = model(X_test)
print(results_tra.mean())
print(results_tra.stddev())
結果を確認前の処理として、Train, Valid, Testそれぞれの平均や標準偏差の推論値を取得し、結合しておく。
また、標準偏差の推論値を使い、平均の推論値$μ±2σ$を計算しておく。
正解である実測値も結合し、日時の配列も定義しておく。
# 平均と標準偏差を出力
pred_tra, sigma_tra=results_tra.mean().numpy().reshape(-1), results_tra.stddev().numpy().reshape(-1)# 平均, 標準偏差
pred_val, sigma_val=results_val.mean().numpy().reshape(-1), results_val.stddev().numpy().reshape(-1)# 平均, 標準偏差
pred_tes, sigma_tes=results_tes.mean().numpy().reshape(-1), results_tes.stddev().numpy().reshape(-1)# 平均, 標準偏差
# 結合
pred_all=np.hstack((pred_tra,pred_val,pred_tes))# train, valid, testの結合
sigma_all=np.hstack((sigma_tra,sigma_val,sigma_tes))# train, valid, testの結合
# 平均±2σ
nsig = 2
pred_all_pls=pred_all+(nsig*sigma_all)
pred_all_mns=pred_all-(nsig*sigma_all)
# 実測値を結合
y_all = np.vstack((y_train[:,0,:],y_valid[:,0,:],y_test[:,0,:]))
# 日時の配列を定義しておく
date_times = data.index.to_numpy()[:len(y_all)]
一応実務への応用(異常検知)を想定して、Testデータに異常値を入れておく。
# 異常値を差し込んでおく#2022-01-05 22:20:00 2022-01-05 22:29:00
y_all[7100:7110]=-2.0
# ±2σ外に出る日時のIndex番号取得
anom_span = np.where(((y_all.reshape(-1)-pred_all_mns.reshape(-1))*(y_all.reshape(-1)-pred_all_pls.reshape(-1)))>=0)[0]
結果を可視化。
実測値を青色、推論値を緑色、推論値$±2σ$を灰色の網掛けとしている。
このように、平均の各推論値を点推定として使い、標準偏差の各推論値を点推定のばらつきとして使うことで、予測区間推定ができる。
例えば、以下の図のように$±2σ$の範囲からはみ出たものを異常値とするなど、異常検知への適用にも活用できそう。
# 実測値, 推論値±2σを可視化
plt.figure(figsize=(15,6))
plt.rcParams['font.family'] = prop.get_name()
ax=plt.subplot(1,1,1)
ax.plot(date_times, y_all, label=col+' 実測値', color='b', alpha=0.8)
ax.set_xlabel('time')
ax.plot(date_times, pred_all, label=col+' 予測値', color='g', alpha=1.0)
ax.fill_between(date_times
, pred_all_mns, pred_all_pls, label=str(nsig)+'σ', color="gray", alpha=0.4)
ax.axvline(x=date_times[0], ls='--', c='gray')
ax.text(date_times[0], pred_all_pls.max(), "Train", c='gray')
ax.axvline(x=date_times[int(len(X_data)*0.7)], ls='--', c='c')
ax.text(date_times[int(len(X_data)*0.7)], pred_all_pls.max(), "Valid", c='c')
ax.axvline(x=date_times[int(len(X_data)*0.8)], ls='--', c='orange')
ax.text(date_times[int(len(X_data)*0.8)], pred_all_pls.max(), "Test", c='orange')
ax.scatter(date_times[anom_span], y_all[anom_span], label='異常箇所', color='r', alpha=1.)
ax.legend(loc='upper right')
plt.show()
おわりに
深層学習でもNGBoostのように区間推定ができるらしい、しかもtensorflow公式の記事でもやり方が載っている、ということでマネしてみた話でした。
tensorflow-probabilityって使ったことほぼなかったけどこういう使い方ができるのね。
単純にベイズ統計モデリングをするならpymc3とかでもいいかもしれんが、tensorflowやkerasと一緒に使うならtensorflow-probabilityがいいのかもね。
今回は目的変数を1つに絞り実施したが、深層学習モデルは多出力モデルも作れるので、一つのモデルで複数の変数の異常を検知できるモデルを作るなど応用も効くかもしれない。
以上!