#概要
編集:2019/10/13 にリファクタリングをしました。
最近下記の本を読んで変化検知手法の基礎について学びました。
異常検知と変化検知 (機械学習プロフェッショナルシリーズ)
2018/4現在行っている仕事では、とある振動データの異常検出を目的としてソリューションの検討を行っています。しかし、検知手法の部分の有識者がおらずまったく議論が進まなかったので、自分で少し勉強してみました。
その中に、時系列分析をする手法として密度比推定という手法がありました(Chapter 11)。
密度比推定の考え方をざっくり説明します。正常標本がなんらかの分布によって生成されていると仮定します。そして、異常値を含むデータの標本分布を調べ、分布の比をとり、1からのずれを測ります。異常値は正常標本の分布から外れて生成されるので分布の比、すなわち密度比が1からずれてくるというわけです。実際のアルゴリズムではデータの分布を推定するのではなく、密度比を直接推定します。
密度比の推定のためには正常データと異常を含むデータを必要とします。詳細な説明は他サイトでも行われているため、参考に挙げるまでに留めます。
#参考
下記サイトで詳細な説明が見られます。
#実装
今回はカルバックライブラー密度比推定法を実装しました。カーネルはRBFカーネルを用いています。バンド幅は交差確認法で決定します。また、参考サイトではパラメータtheta
が非負である必要があるとありますが、密度比が負にならなければ問題ないと思います(が、どうなんですかね)。より精度を上げたければ、theta
が負になる前にステップ幅を小さくしていくとか、やりようがあると思いますが、今回の例では十分な精度が出ているので、実装はしませんでした。
入力データはnormal_data.txt、error_data.txt2つで1行目から最終行の順に時間が経過しているようにデータを入れます。またベクトルデータにも対応しており、その場合は行ベクトルとして1行が一つのベクトルになるようにデータを作成してください。RBFカーネルを生成するときにはnormal_dataを使用します。
プログラムを実行すると目的関数の収束具合と異常を含むデータの異常度をグラフで表示します。
クラスファイルの前提ライブラリはnumpyです。mainとして実行する場合はグラフの表示にmatplotlib、cross validationのデータの分割にsklearnを使用しています。
ソースとデータはhttps://github.com/ground0state/anomaly-detectionに置きました。
"""
Copyright © 2019 ground0state. All rights reserved.
"""
import numpy as np
class DensityRatioEstimation():
def __init__(self, band_width=1.0, learning_rate=0.1, num_iterations=100):
self.band_width = band_width
self.theta = None
self.learning_rate = learning_rate
self.num_iterations = num_iterations
self.J = None
self.psi = None
self.psi_prime = None
self.eps = 10e-15
def fit(self, X_normal, X_error):
self.theta = np.ones(len(X_normal))
self.J = []
self.psi = np.asarray([self._gauss_kernel(x, X_normal)
for x in X_normal])
self.psi_prime = np.asarray(
[self._gauss_kernel(x, X_normal) for x in X_error])
dJ_1 = self.psi_prime.sum(axis=0) / len(X_error)
for _ in range(self.num_iterations):
# calculate J
r = np.dot(self.psi, self.theta)
r = np.maximum(r, self.eps)
r_prime = np.dot(self.psi_prime, self.theta)
r_prime = np.maximum(r_prime, self.eps)
self.J.append(np.sum(r_prime)/len(X_error) -
np.sum(np.log(r))/len(X_normal))
# calculate gradient
dJ = dJ_1 - (self.psi / r).sum(axis=0) / len(X_normal)
self.theta -= self.learning_rate * dJ
def _gauss_kernel(self, x, X):
return np.exp(-np.sum((x - X)**2, axis=1)/(2*self.band_width**2))
def get_score(self):
return self.J
def objective(self, X_normal, X_error):
# calculate J
psi = np.asarray([self._gauss_kernel(x, X_normal)
for x in X_normal])
psi_prime = np.asarray([self._gauss_kernel(x, X_normal)
for x in X_error])
r = np.dot(psi, self.theta)
r_prime = np.dot(psi_prime, self.theta)
J = np.sum(r_prime)/len(X_error) - np.sum(np.log(r))/len(X_normal)
return J
def predict(self, X_normal, X_error):
psi_prime = np.asarray([self._gauss_kernel(x, X_normal)
for x in X_error])
r_prime = np.dot(psi_prime, self.theta)
r_prime = np.maximum(r_prime, self.eps)
return -np.log(r_prime)
if __name__ == '__main__':
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
normal_data = np.loadtxt("../input/normal_data.csv", delimiter=",")
error_data = np.loadtxt("../input/error_data.csv", delimiter=",")
kf_iter = KFold(n_splits=3).split(normal_data)
# A rule-of-thumb bandwidth estimator
# <https://en.wikipedia.org/wiki/Kernel_density_estimation>
SILVERMAN = 1.06*np.std(normal_data, axis=0)/pow(len(normal_data), 1/5)
ks = SILVERMAN + [0.1, 0.5, 1.0]
scores = []
ks_score = {}
for k in ks:
for train_index, valid_index in kf_iter:
train_normal_data = normal_data[train_index]
valid_normal_data = normal_data[valid_index]
train_error_data = error_data[train_index]
valid_error_data = error_data[valid_index]
model = DensityRatioEstimation(
band_width=k, learning_rate=0.1, num_iterations=1000)
model.fit(train_normal_data, train_error_data)
scores.append(model.get_score())
ks_score[k] = np.mean(scores)
min_k = min(ks_score, key=ks_score.get)
print('min k:', min_k)
model = DensityRatioEstimation(
band_width=min_k, learning_rate=0.1, num_iterations=1000)
model.fit(normal_data, error_data)
scores = model.get_score()
pred = model.predict(normal_data, error_data)
plt.plot(scores)
plt.show()
plt.plot(pred)
plt.show()