Edited at

カルマンフィルタで変化点検知

More than 5 years have passed since last update.


動機

仕事で変化点検知をする機会がありました。その時は時間がなかった事もあり、yokkunsさんがやられていたARIMAモデルを使ったアルゴリズムを参考にさせていただき作りました。ただ、ARIMAモデルだと色々と面倒なところがあったのでkalman filterで書き換えを試みた次第です。


ARIMAモデルの問題点


  • パラメタ調整が面倒

  • 対象とするwindow以上のデータが溜まるまで解析できない

  • window内に同一データのみが並んだベクトルとなった場合、逆行列が計算出来ない


参考文献

主に参考にしたのは、みんな大好き「データマイニングによる異常検知」です。


概要

計算ステップは以下のとおりです。

計算は大きく分けて、学習ステップとスコア計算ステップに分けることができます。


学習ステップ

こちらは新しいデータが入ってくる前に計算しておくステップです。


  • あるtime windowのデータを使用し、当該データにfitする時系列モデルを作成

  • time windowのデータとの残差を計算

  • 次の時点のデータを予測


スコア計算ステップ

新しいデータが入ってきた時に計算を行うステップです。


  • 学習ステップで計算した残差と予測データを使って変化点スコアを計算

  • スコアの平滑化

変化点スコア計算の概要は以下になります。


  • 残差ベクトルを正規分布からサンプルリングされたデータとみなす

  • 残差ベクトルから正規分布の代表値(平均、標準偏差)を計算

  • 過去のデータの傾向からの予測値と観測値の差が今までのデータからみて異常な値かどうかの検定を行う(確率を見る)

  • 確率をスコアに変換する

簡単ですが説明は以上です。

では、以下にコードを記載します。


参考コード


kalman filterについてのコード


KF.py

# coding: utf8


from numpy.oldnumeric.linear_algebra import inverse
from scipy import linalg
import numpy as np
from math import log

class KalmanFiltering:
limy = 1e20 # 欠測とみなす数値の境界
GSIG2 = 1
L = 1
R = np.identity(L)
NSUM = 0.0
SIG2 = 0.0
LDET = 0.0

def __init__(self, k, p, q, term=10, w=10):
self.k = k # 階差
self.p = p # 季節性循環
self.q = q # AR成分
self.m, self.F, self.G, \
self.H, self.Q = self.FGHset(0,k,p,q,w)
self.term = term
self.strg_trm = term

self.resid = np.zeros(self.term)
self.pred = 0.0

# matrix for storage predicted value
self.XPS = np.zeros((term,self.m), dtype=np.float)
self.VPS = np.array([np.eye(self.m, dtype=np.float)]*term)
# matrix for storage predicted value
self.XFS = np.zeros((term,self.m), dtype=np.float)
self.VFS = np.array([np.eye(self.m, dtype=np.float)]*term)
# matrix for storage smoothed value
self.XSS = np.zeros((term,self.m), dtype=np.float)
self.VSS = np.array([np.eye(self.m, dtype=np.float)]*term)

def forward_backward(self, new_data, smoothing=0):
self.NSUM += 1
if self.NSUM < self.strg_trm:
self.term = int(self.NSUM)
else:
self.term = self.strg_trm
# forward
self.forward(new_data)
# smoothing
self.SMO()
if smoothing==1:
return np.mean( self.XSS[:self.term,0] )

return self.predict()[0]

def forward(self, y):
XF = self.XFS[self.term-1]
VF = self.VFS[self.term-1]

# 1span predicting
XP, VP = self.forward_predicting(VF, XF)
XF, VF = self.filtering(y, XP, VP)
self.storage_params(XP, XF, VP, VF)
# sig2 = self.SIG2 / self.NSUM
# FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
# return {'LLF':FF, 'Ovar':sig2}

def storage_params(self, XP, XF, VP, VF):
if self.NSUM>self.term:
self.XPS[:self.term-1] = self.XPS[1:self.term]
self.XFS[:self.term-1] = self.XFS[1:self.term]
self.VPS[:self.term-1] = self.VPS[1:self.term]
self.VFS[:self.term-1] = self.VFS[1:self.term]
self.normal_storage(XP, XF, VP, VF)
else:
self.normal_storage(XP, XF, VP, VF)

def normal_storage(self, XP, XF, VP, VF):
self.XPS[self.term-1] = XP
self.XFS[self.term-1] = XF
self.VPS[self.term-1] = VP
self.VFS[self.term-1] = VF

def forward_predicting(self, VF, XF):
"""1span predicting"""
XP = np.ndarray.flatten( np.dot(self.F, XF.T) ) #2週目から縦ベクトルになってしまうので、常に横ベクトルに変換
VP = self.F.dot(VF).dot(self.F.T) + self.G.dot(self.Q).dot(self.G.T)
return XP, VP

def filtering(self, y, XP, VP):
if y < self.limy:
B = np.dot( np.dot(self.H, VP), self.H.T) + self.R # Hは数学的には横ベクトル
B1 = inverse(B)
K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) # Kは縦ベクトルになる(matrix)
e = np.array(y).T - np.dot(self.H, XP.T)
XF = np.array(XP) + np.array( K * np.matrix(e) ).T # 横ベクトル
VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)
self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] # 1次元でも計算できるようにmatrixにする
self.LDET += log(linalg.det(B))
else:
XF = XP; VF = VP
return XF, VF

def SMO(self):
"""fixed-interval smoothing"""
XS1 = self.XFS[self.term-1]
VS1 = self.VFS[self.term-1]
self.XSS[self.term-1] = XS1
self.VSS[self.term-1] = VS1
for n1 in xrange(self.term):
n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
A = np.dot( np.dot(VF, self.F.T), VPI)
XS2 = XF + np.dot(A, (XS1 - XP))
VS2 = VF + np.dot( np.dot(A, (VS1 - VP)), A.T )
XS1 = XS2; VS1 = VS2
self.XSS[n-1] = XS1
self.VSS[n-1] = VS1

# TAU2xの対数尤度関数の定義
def LogL(self, parm, *args):
y=args[0]
LLF = self.forward(y)
LL = LLF['LLF']
return -LL # optimezeが最小化関数なので、対数尤度にマイナスをかけたものを返す

def predict(self, forward_time=1):
"""pridint average value"""
y = np.zeros(forward_time, dtype=np.float)
XFp=self.XFS[-1] #直近のデータ行列のみ取得
#VFp=VF[XF.shape[0]-1,:]

for n in xrange(forward_time):
XP = np.ndarray.flatten( np.dot(self.F, XFp.T) )
#VP = np.dot( np.dot(F, VF), F.T ) + np.dot( np.dot(G, Q), G.T )
y[n] = np.dot(self.H, XP) # 期待値を取るのでノイズは入れない
XFp=XP
return y

def FGHset(self, al, k, p, q, w=10):
"""季節調整モデルの状態空間表現の行列設定
al:ARモデルのαベクトル
k,p,q:階差、季節性周期、ARパラメタ数(予測する場合はk>=2とする)
w:システム誤差の分散(変化点検出では小さめに決め打ちで設定しておけば良い)
"""

m = k + p + q -1

if q>0: G = np.zeros((m,3), dtype=np.float) # 状態モデルでトレンド、季節、ARの3つを含む場合
elif p>0: G = np.zeros((m,2), dtype=np.float) #AR成分を含まない場合(q=0)
else: m=k; G = np.zeros((m,1), dtype=np.float)
F = np.zeros((m,m), dtype=np.float)
H = np.zeros((1,m), dtype=np.float)

ns = 0; ls =0
# トレンドモデルのブロック行列の構築
if k>0:
ns +=1
G[0,0] = 1; H[0,0] = 1
if k==1: F[0,0] = 1
if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
ls += k

# 季節調整成分ブロック行列の構築
if p>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
for i in xrange(p-1): F[ls, ls+i] = -1
for i in xrange(p-2): F[ls+i+1, ls+i] = 1
ls +=p-1

# AR成分ブロック行列の構築
if q>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
for i in xrange(q): F[ls, ls+i-1] = al[i]
if q>1:
for i in xrange(q-1): F[ls+i, ls+i-1] = 1

# シスムモデルの分散共分散行列Qの枠の算出
Q = np.eye(ns,dtype=np.float)*w

return m, F, G, H, Q



変化点検知についてのコード


KF_AnomalyDetection.py

# coding: utf-8

from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF

class KFAnomalyDetection:
datalist = []
outlier_score_list = []
change_score_list = []
outlier_score_smooth = []
change_score_smooth = []
outlier_resid = None
change_resid = None
outlier_pred = None
change_pred = None

def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
self.kf_outlier_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
self.kf_first_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
self.kf_change_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
self.kf_second_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
self.term = term

def forward_step(self, new_data):
# add new_data to datalist
if len(self.datalist)>=self.term:
self.datalist.pop(0)
self.datalist.append(new_data)
else:
self.datalist.append(new_data)

# compute score
if self.outlier_pred is None:
self.first_step(new_data)
else:
self.calculate_score_step(new_data)
self.learn_step(new_data)

def conversion_score(self, train, var):
"""convert score to log loss"""
m = np.mean(train)
s = np.std(train)
try:
if s < 1: s=1
px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
res = -log(px)
return res
except:
return 0

def first_step(self, new_data):
# learn outlier model
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, new_data)
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# learn cnage model
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)

def learn_step(self, data):
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, data)
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])

def learn_KF(self, func, data):
"""leaning KF from new data"""
pred = func.forward_backward(data)
resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
return resid, pred

def calculate_score_step(self, new_data):
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)

def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
score = self.conversion_score( resid, abs(float(pred) - float(new_data)) )
print 'got score', score
storage_score_list.append(score)
print 'smoothing score'
storage_smooth_list.append( func.forward_backward(score, smoothing=1) )

if __name__=='__main__':
fname = 'test'
term = 3 # time window of training
smooth = 1
kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
datalist = []
of = open('score_out.txt','w')
dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
for data in dlist:
kfad.forward_step(data)
of.write( str(kfad.change_score_smooth[-1])+'\n' )
of.close()

rng = range( len(dlist.tolist()) )
plt.plot(rng,dlist,label=u"data")
plt.show()
plt.plot(rng,kfad.change_score_smooth,label=u"score")
plt.show()


相変わらず汚いコードで申し訳ありません。カルマンフィルタのコードは1年以上前に書いたんですが、今見ると「誰だよこんなウンコード書いた奴は」って思いますね。。。

んま、1年前よりは成長していると前向きに捉えておくことにしますw

スコア算出の際に標準偏差の値が小さいと異常値と判定されるポイントが多くなるため、標準偏差の最小値を1としています。

(カウントデータなら許容されるかな、くらいな感じで使っています。)

また、異常過ぎる値が入力された場合に、確率がゼロとみなされ(ゼロ値が返ってくる)、logの演算が出来なくなるため、floatの最小オーダーに置き換えています。

この辺りは裾の厚い分布を使用すれば解決出来る可能性もあると思います。


実験結果

上のコードで実行した結果を以下に貼り付けます。


元データ


変化点スコアデータ


コメント

割りと上手くスコア検出出来てますね。これだけ明確なデータだと検出出来て当たり前ですが。。。

パラメタについては、ほとんどterm(time window)しか変更する必要はありません。smoothのwindowについても1で問題ありません。つまり上のコードのsmooth部分はただただ無駄に計算しているだけ、という事になりますね...orz

それと、分散も決め打ちで問題ありませんし、大きな変化に対応させる必要はありませんので、スケールにもよりますが10~20くらいでいいんじゃないかと思います。


まとめ

ARIMAモデルよりもカルマンフィルタの方が利点が多いんじゃないかと思って作ったのですが、結果は思ったほどではありませんでした。。

改善出来そうなところを今後も探していこうと思います。

お手数ですが間違いありましたら、ご指摘いただけますと助かります。