LoginSignup
27
23

More than 3 years have passed since last update.

密度比推定による変化検知

Last updated at Posted at 2018-04-15

概要

編集:2019/10/13 にリファクタリングをしました。

最近下記の本を読んで変化検知手法の基礎について学びました。
異常検知と変化検知 (機械学習プロフェッショナルシリーズ)
2018/4現在行っている仕事では、とある振動データの異常検出を目的としてソリューションの検討を行っています。しかし、検知手法の部分の有識者がおらずまったく議論が進まなかったので、自分で少し勉強してみました。
その中に、時系列分析をする手法として密度比推定という手法がありました(Chapter 11)。

密度比推定の考え方をざっくり説明します。正常標本がなんらかの分布によって生成されていると仮定します。そして、異常値を含むデータの標本分布を調べ、分布の比をとり、1からのずれを測ります。異常値は正常標本の分布から外れて生成されるので分布の比、すなわち密度比が1からずれてくるというわけです。実際のアルゴリズムではデータの分布を推定するのではなく、密度比を直接推定します。
密度比の推定のためには正常データと異常を含むデータを必要とします。詳細な説明は他サイトでも行われているため、参考に挙げるまでに留めます。

参考

下記サイトで詳細な説明が見られます。

実装

今回はカルバックライブラー密度比推定法を実装しました。カーネルはRBFカーネルを用いています。バンド幅は交差確認法で決定します。また、参考サイトではパラメータthetaが非負である必要があるとありますが、密度比が負にならなければ問題ないと思います(が、どうなんですかね)。より精度を上げたければ、thetaが負になる前にステップ幅を小さくしていくとか、やりようがあると思いますが、今回の例では十分な精度が出ているので、実装はしませんでした。

入力データはnormal_data.txt、error_data.txt2つで1行目から最終行の順に時間が経過しているようにデータを入れます。またベクトルデータにも対応しており、その場合は行ベクトルとして1行が一つのベクトルになるようにデータを作成してください。RBFカーネルを生成するときにはnormal_dataを使用します。

プログラムを実行すると目的関数の収束具合と異常を含むデータの異常度をグラフで表示します。

クラスファイルの前提ライブラリはnumpyです。mainとして実行する場合はグラフの表示にmatplotlib、cross validationのデータの分割にsklearnを使用しています。

ソースとデータはhttps://github.com/ground0state/anomaly-detectionに置きました。

density_ratio_estimation.py
"""
Copyright © 2019 ground0state. All rights reserved.
"""

import numpy as np


class DensityRatioEstimation():
    def __init__(self, band_width=1.0, learning_rate=0.1, num_iterations=100):
        self.band_width = band_width
        self.theta = None
        self.learning_rate = learning_rate
        self.num_iterations = num_iterations
        self.J = None
        self.psi = None
        self.psi_prime = None
        self.eps = 10e-15

    def fit(self, X_normal, X_error):
        self.theta = np.ones(len(X_normal))
        self.J = []

        self.psi = np.asarray([self._gauss_kernel(x, X_normal)
                               for x in X_normal])
        self.psi_prime = np.asarray(
            [self._gauss_kernel(x, X_normal) for x in X_error])
        dJ_1 = self.psi_prime.sum(axis=0) / len(X_error)

        for _ in range(self.num_iterations):
            # calculate J
            r = np.dot(self.psi, self.theta)
            r = np.maximum(r, self.eps)
            r_prime = np.dot(self.psi_prime, self.theta)
            r_prime = np.maximum(r_prime, self.eps)
            self.J.append(np.sum(r_prime)/len(X_error) -
                          np.sum(np.log(r))/len(X_normal))

            # calculate gradient
            dJ = dJ_1 - (self.psi / r).sum(axis=0) / len(X_normal)
            self.theta -= self.learning_rate * dJ

    def _gauss_kernel(self, x, X):
        return np.exp(-np.sum((x - X)**2, axis=1)/(2*self.band_width**2))

    def get_score(self):
        return self.J

    def objective(self, X_normal, X_error):
        # calculate J
        psi = np.asarray([self._gauss_kernel(x, X_normal)
                          for x in X_normal])
        psi_prime = np.asarray([self._gauss_kernel(x, X_normal)
                                for x in X_error])
        r = np.dot(psi, self.theta)
        r_prime = np.dot(psi_prime, self.theta)
        J = np.sum(r_prime)/len(X_error) - np.sum(np.log(r))/len(X_normal)
        return J

    def predict(self, X_normal, X_error):
        psi_prime = np.asarray([self._gauss_kernel(x, X_normal)
                                for x in X_error])
        r_prime = np.dot(psi_prime, self.theta)
        r_prime = np.maximum(r_prime, self.eps)
        return -np.log(r_prime)


if __name__ == '__main__':
    import matplotlib.pyplot as plt
    from sklearn.model_selection import KFold

    normal_data = np.loadtxt("../input/normal_data.csv", delimiter=",")
    error_data = np.loadtxt("../input/error_data.csv", delimiter=",")

    kf_iter = KFold(n_splits=3).split(normal_data)

    # A rule-of-thumb bandwidth estimator
    # <https://en.wikipedia.org/wiki/Kernel_density_estimation>
    SILVERMAN = 1.06*np.std(normal_data, axis=0)/pow(len(normal_data), 1/5)

    ks = SILVERMAN + [0.1, 0.5, 1.0]
    scores = []
    ks_score = {}
    for k in ks:
        for train_index, valid_index in kf_iter:
            train_normal_data = normal_data[train_index]
            valid_normal_data = normal_data[valid_index]
            train_error_data = error_data[train_index]
            valid_error_data = error_data[valid_index]

            model = DensityRatioEstimation(
                band_width=k, learning_rate=0.1, num_iterations=1000)
            model.fit(train_normal_data, train_error_data)
            scores.append(model.get_score())

        ks_score[k] = np.mean(scores)

    min_k = min(ks_score, key=ks_score.get)
    print('min k:', min_k)

    model = DensityRatioEstimation(
        band_width=min_k, learning_rate=0.1, num_iterations=1000)
    model.fit(normal_data, error_data)

    scores = model.get_score()
    pred = model.predict(normal_data, error_data)

    plt.plot(scores)
    plt.show()

    plt.plot(pred)
    plt.show()

スクリーンショット 2019-10-13 16.09.54.png
スクリーンショット 2019-10-13 16.09.59.png

27
23
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
27
23