0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【時系列波形異常検知】 DRAGによる時系列ディスコードの探索(Python実装例付)

Last updated at Posted at 2024-07-27

[TL;DR] 以下の論文紹介とpython実装: Yankov, et al. "Disk Aware Discord Discovery: Finding Unusual Time Series in Terabyte Sized Datasets." ICDM 2007.

はじめに

時系列異常検知手法は無数に提案されていますが、単変量の時系列異常検知で最強の方法とはなんでしょう?そうですね、時系列ディスコード(Time Series Discords)ですね。
(Deep Learning系の何かだろ!と思われた方はこの辺りの文献を参照されたし[1][2]。)

時系列ディスコードとは、部分時系列であって、自身と類似したパターンが存在しない部分時系列と定義されます。↓こんなやつです。

ECGのサブシーケンス.png

定義的なもので説明すると(文献によって多少定義が揺れたりするんですが)、長さ$n$の時系列データを$T$として、$i$番目の値を$T_i$と表します。
$$T=[T_1,T_2,...,T_n]$$
部分時系列の窓幅を$L$とし、時系列の$i$番目から始まる部分時系列を$S_i$と表します。
$$S_i=[T_i,T_{i+1},...,T_{i+L-1}]$$
時系列の$1→n$番目までスライドさせていけば部分時系列は全部で$n-L+1$個あると思いますが、部分時系列の集合は$\mathcal{S}$と表記します。

さて、$S_i$と$S_j$の間の距離を$d(i,j)$とします。距離の定義は色々設定出来そうですが、単純なユークリッド距離が主流です。$d(i,j)$について、$1 \leqq j \leqq n-L+1$の範囲で全て調べて、$S_i$の最近傍を探します。それを$d_{NN}(i)$とします。

時系列ディスコード(Time Series Discords)とは、$d_{NN}(i)$が、ある大きな閾値$r$を超える場合の$S_i$のことです。これを時系列全体で力任せに調べようとすると$O(n^2)$の計算量になります。正直n=5000とかの時系列データなら、今どきのCPUならすぐ答え出るだろうという話ですが、時系列波形分析界隈では100万とかそのくらいの長さの時系列を想定していて、Discordsを高速に見つけられないかという問題が取り組まれてきました。

今回紹介するのは、効率的なDiscords探索として長きに渡り支持されてきたDRAG(Discord Range Aware Gathering)[3][4]と呼ばれるアルゴリズムです。出てきたのが2007年頃でして、決して最先端ではないことを述べておきます。余談ですが、DRAGという呼び名は当時の文献では命名されてませんでしたが、最近のDiscords探索の文献 [2] (地味に国産の論文)で命名されました。

DRAG

早速DRAGの仕組みについて説明します。DRAGは①候補選択フェーズと②改良フェーズの2段階に分かれます。②の改良フェーズは、絞り込まれた候補の集合に対してほぼ力任せに探索してるだけなので、①で候補を絞り込むトリックが重要という感じです。

STEP1 候補選択フェーズ

このフェーズでは、誤検出(最近傍をよくよく調べたらディスコードではないもの)があってもいいので、部分時系列集合$\mathcal{S}$の中からディスコードの候補を絞り込もうといったフェーズです。

具体的な手続きとしては、現在の候補部分時系列の集合(論文では$C$と表記されてます)と、i番目の部分時系列$S_i$について距離を全て調べて、1つでも$r$を下回るものがあればそのままスルー。そうでなければ、$S_i$はディスコードかもしれないってことで、候補集合$C$に加えます。どういうことかって言うと、ディスコードは最終的には$S_i$の最近傍の距離(距離最小の奴)が$r$を超えるかどうかって問題な訳ですが、最近傍ですらない奴が$r$未満なら最近傍も$r$未満確定なのでディスコードでないことが確定する訳です。

iを1からnまで順に動かして上記の手続きを繰り返し、いい塩梅の候補集合を作るって感じです。以下Pythonの実装例の説明です。

import numpy as np
from numba import njit
from typing import Tuple

@njit
def generate_subsequences(TS: np.ndarray, window_size: int, z_normalize: bool = True) -> np.ndarray:
    """
    Function to create a set of subsequences from time series data.

    Parameters
    TS : 1D time series data of length n
    window_size : length of the subsequences 
    Returns
    S : a set of subsequences with shape (n - window_size + 1, window_size)
    """
    n: int = TS.shape[0]
    output_shape: Tuple[int, int] = (n - window_size + 1, window_size)
    S: np.ndarray = np.empty(output_shape, dtype=TS.dtype)
    for i in range(output_shape[0]):
        subsequence: np.ndarray = TS[i : i + window_size]
        if z_normalize:
            mean: float = np.mean(subsequence)
            std: float = np.std(subsequence)
            S[i] = (subsequence - mean) / std
        else:
            S[i] = subsequence
    return S

@njit
def candidates_selection_phase(S: np.ndarray, r: float) -> np.ndarray:
    """
    Phase 1 of DRAG.
    
    Parameters
    S : a set of subsequences with shape (n - window_size + 1, window_size)
    r : discords threshold
    Returns
    C : flags whether S[i] is a candidate or not. (n - window_size + 1)
    """
    discords_candidate_size: int
    window_size: int
    discords_candidate_size, window_size = S.shape
    C: np.ndarray = np.zeros(discords_candidate_size, dtype=np.bool_)
    for i in range(discords_candidate_size):
        iscandidate: bool = True
        j_indexes: np.ndarray = np.where(C == True)[0]
        for j in j_indexes:
            if abs(j - i) >= window_size: # Is (i,j) trivial match?
                d: float = np.linalg.norm(S[i] - S[j])
                if d < r:
                    C[j] = False
                    iscandidate = False
        if iscandidate:
            C[i] = True
    return C

最初に、関数generate_subsequencesによって、1次元時系列データTSから長さwindow_sizeの部分時系列集合を作成してます。なお、部分時系列の正規化オプションがついてます。多くは語りませんが最近の流行です。

candidates_selection_phaseでは上述した手続きでiを最後まで動かす感じです。Cはiが候補か否かを意味する2値の1次元リストです。候補ならTrueが入ってます。最初のイテレーションで自動的に$S_1$(python内だと$S_0$)が自動的に候補集合に入り、残りのイテレーションで候補として棄却するか迎え入れるかを前述した感じで判定します。$r$がいい塩梅だと候補集合はせいぜい数個なので、STEP2の計算が爆速になるって感じです。

trivial matchについても言及しなければいけません。

if abs(j - i) >= window_size: # Is (i,j) trivial match?

の部分です。trivial matchは$i$と$j$が部分時系列同士で重なり合うほど近すぎると必然的に$d(i,j)$が小さくなる訳ですが、本来意図した最近傍じゃないよねってことで全ての計算から無視されます。このプログラムでは1点でも重なってればtrivial match判定してますが、どれくらい近ければtrivial match扱いするかは諸説あります。

STEP2 改良フェーズ

改良フェーズで行うことは単純で、候補集合$C$がDiscordsなのかどうかを調べます。STEP1ではDiscordsは絶対あり得ないものを除外しただけなので、残った候補集合の最近傍の値が$r$より上か下かは未知な訳です。

手続きとしては、各候補集合を取り出し($C$の中でTrueだったindex)、$\mathcal{S}$の全ての部分時系列との距離を求めて最小値を決め、それが$r$を下回ってればDiscordsじゃないので候補から除外($C$のTrueをFalseに変える)する感じです。

とまぁ考え方はこんな感じなのですが、元論文のアルゴリズムだとiとjの処理が逆で、それに従って以下のPythonプログラムもiとjは逆の順番で実装してます。つまり、$\mathcal{S}$の部分時系列についてi番目を取り出して、各候補集合jとの距離を求めるといった順になってるということです。まぁやってることは同じです。

@njit
def discord_refinement_phase(S: np.ndarray, C: np.ndarray, r: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    Phase 2 of DRAG.
    
    Parameters
    S : a set of subsequences with shape (n - window_size + 1, window_size)
    C : flags whether S[i] is a candidate or not. (n - window_size + 1)
    r : discords threshold
    Returns
    C : flags whether S[i] is a candidate or not. (n - window_size + 1)
    C_dist : NN distance if C[i] == True, else inf (n - window_size + 1)
    """
    discords_candidate_size: int
    window_size: int
    discords_candidate_size, window_size = S.shape
    C_dist: np.ndarray = np.ones(discords_candidate_size) * np.inf
    for i in range(discords_candidate_size):
        j_indexes: np.ndarray = np.where(C == True)[0]
        for j in j_indexes:
            if abs(j - i) >= window_size: # Is (i,j) trivial match?
                d: float = np.linalg.norm(S[i] - S[j])
                if d < r:
                    C[j] = False
                else:
                    C_dist[j] = min(d, C_dist[j])
    return C, C_dist

def DRAG(TS: np.ndarray, window_size: int, r: float, z_normalize: bool = True) -> Tuple[np.ndarray, np.ndarray]:
    """
    DRAG algorithm for time series discords discovery.
    
    Parameters
    TS : 1D time series data of length n
    window_size : length of the subsequences 
    r : discords threshold
    Returns
    candidate_flags : flags whether S[i] is a candidate or not. (n - window_size + 1)
    NN_distances : NN distance if C[i] == True, else inf (n - window_size + 1)
    """
    subsequences: np.ndarray = generate_subsequences(TS, window_size, z_normalize)
    candidate_flags: np.ndarray = candidates_selection_phase(subsequences, r)
    candidate_flags, NN_distances = discord_refinement_phase(subsequences, candidate_flags, r)
    return candidate_flags, NN_distances

STEP1の出力である候補集合flag $C$をSTEP2で再度チェックして、Trueとなっている部分のうち誤検出だったものをFalseに塗り替える感じです。

Pythonでの異常検知実施例

お馴染み(?)のECG時系列データで実験してみます。
https://www.cs.ucr.edu/~eamonn/discords/qtdbsel102.txt

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from drag import DRAG

#データの読み込み
ECG = pd.read_csv('qtdbsel102.txt', header=None, delimiter='\t')    
X = ECG.values[:5000,2]

#ハイパラ
w_size = 200
r = 15

#DRAGアルゴリズムの適用
C, C_dist = DRAG(X,w_size,r)
top_1_discord_start_point = np.where(C)[0][C_dist[np.where(C)[0]].argmax()]

#ECG dataとdiscord可視化
plt.figure(figsize=(10, 3))
plt.grid()
plt.plot(X)
plt.axvspan(top_1_discord_start_point,
             top_1_discord_start_point+w_size, facecolor='r', alpha=0.2)
plt.xlim(0,5000)

ECGとDRAG.png

本データおよびr=15の閾値設定だと、異常波形付近の複数の部分時系列が候補として出力されるので、最も距離$d$が大きかったものを赤領域でハイライトしてる感じです。top_1_discord_start_pointのところです。

Discussion

  • 適切な$r$ってどうやって決めればいいの?

DRAGでは、$r$が小さすぎると力任せ探索と同等に遅くなり、大きすぎるとディスコードの探索に失敗して空の$C$を出力します。じゃあどうやって$r$を適切に設定すればいいかって話ですが、元論文だと最近傍値を部分的に計算しておおよその最近傍値の分布を計算し、$r$を比較的大きく設定して、見つからなかったら徐々に小さくしていくといったヒューリスティックな感じの方法が紹介されてました。主観ですがあまりエレガントな方法はないようです。少し文脈が変わりますが、DRAGを拡張した文献[2]ではもう少し決定論的方法が紹介されています。

  • 適切な窓幅$L$ってどうやって決めればいいの?

これに関しては元論文には言及がありませんし、実は良いサブシーケンス長を見つけることはそれ自体が研究トピックになってたりします。文献[2]はこの動機で、DRAGを拡張することで任意長のディスコードを探索する効率的な方法を提案してます。

おわりに

本記事では、日本コミュニティだと大変マイナーな、時系列異常検知に役立つ時系列ディスコードと呼ばれる概念と、その探索アルゴリズムで主要な手法の1つ(主観)であるDRAGを紹介しました。

DRAGのPythonでの実装公開はgithub上でも初だと思いますが、文献[2]の技術に関するmatlabで書かれたプログラム(↓)の一部を参考にさせて頂きました。
https://sites.google.com/view/merlin-find-anomalies/MERLIN
いくつかの機能追加や改善がありましたが、元論文で書かれたアルゴリズムを忠実に再現するため必要最低限の機能だけ実装しました。紹介したプログラムをまとめたものはこちら(↓)です。
https://github.com/k-kotera/DRAG-python

参考文献

[1] Wu, Renjie, and Eamonn J. Keogh. "Current time series anomaly detection benchmarks are flawed and are creating the illusion of progress." IEEE transactions on knowledge and data engineering 35.3 (2021): 2421-2429.
[2] Nakamura, Takaaki, et al. "Merlin: Parameter-free discovery of arbitrary length anomalies in massive time series archives." 2020 IEEE international conference on data mining (ICDM). IEEE, 2020.
[3] Yankov, Dragomir, Eamonn Keogh, and Umaa Rebbapragada. "Disk Aware Discord Discovery: Finding Unusual Time Series in Terabyte Sized Datasets." Seventh IEEE International Conference on Data Mining (ICDM 2007). IEEE, 2007.
[4] Yankov, Dragomir, Eamonn Keogh, and Umaa Rebbapragada. "Disk aware discord discovery: Finding unusual time series in terabyte sized datasets." Knowledge and Information Systems 17 (2008): 241-262.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?