8
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

お題は不問!Qiita Engineer Festa 2023で記事投稿!

ChatGPT-4 と始める音の異常検知:One-Class SVM と AutoEncoder で機械の稼働音データから異常を検出しよう!

Posted at

1. はじめに

本記事はエンジニア向けに ChatGPT の活用例を紹介する「ChatGPTと始める」シリーズです。今回は、以下で公開されている機械の稼働データを使用して、機械の異常検知に挑戦します!

MIMII Dataset: 複数の種類の産業機械(ポンプ、ファン、バルブ、スライダー)の正常時と故障時の動作音データが公開されています。

今回はファンの動作音データを使って、正常な動作音のデータのみを学習用データとして使用してモデルを作成した場合に、異常を正しく検出できるかどうかを検証します。

本記事では、いろいろと検証していく過程で「異常検知」という問題に関する難しさを感じたので、その内容についても紹介したいと思います。

「うまくいった」という記事はよく見ますが、「ここに困った」という報告はあまりされないので、ご参考になれば幸いです。

2. 音声データを確認する

2.1. データセットの構造

データセットは以下の構造になっています。

データセットについての補足

  • SN比ごとに3種類のデータが用意されています。「-6_db_fan」がノイズが多く、「6_dB_fan」がノイズが最も少ないです。
  • id_00, ..., id_06 は対象となる機械Noのことです。4台の機械が用意されていて、機械ごとにそれぞれ特性(基本周波数)も異なります
  • 「normal」, 「abnormal」は正常、異常データのことです。機械Noによってデータ数が異なりますが、正常データは約1000、異常データは約400用意されています。

2.2. 可視化してみる

まずはどんな音声データなのか、可視化して確認してみましょう。
可視化のためのコードの生成を、ChatGPT に依頼しました。

スクリーンショット 2023-06-24 165904.png

返ってきたコードはこちらです。

import matplotlib.pyplot as plt
import librosa
import librosa.display
import numpy as np

def load_audio_file(file_path):
    '''音声ファイルを読み込む

    Args:
        file_path (str): 音声ファイルへのパス

    Returns:
        audio (np.array): オーディオデータ
        sr (int): サンプリング周波数
    '''
    audio, sr = librosa.load(file_path, sr=16000) # 16kHzのサンプリング周波数で読み込む
    return audio, sr

def plot_waveform(audio, sr):
    '''振幅時間プロットを描画する

    Args:
        audio (np.array): オーディオデータ
        sr (int): サンプリング周波数
    '''
    plt.figure(figsize=(12, 4))
    librosa.display.waveshow(audio, sr=sr)
    plt.title('Waveform')
    plt.xlabel('Time (seconds)')
    plt.ylabel('Amplitude')
    plt.show()

def plot_spectrogram(audio, sr):
    '''スペクトログラムを描画する

    Args:
        audio (np.array): オーディオデータ
        sr (int): サンプリング周波数
    '''
    plt.figure(figsize=(12, 4))
    D = librosa.amplitude_to_db(np.abs(librosa.stft(audio)), ref=np.max)
    librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')
    plt.title('Spectrogram')
    plt.colorbar(format='%+2.0f dB')
    plt.xlabel('Time (seconds)')
    plt.ylabel('Frequency (Hz)')
    plt.show()

def main():
    id = "id_00"
    wav_file = "00000000"
    file_path = f'data/6_dB_fan/fan/{id}/normal/{wav_file}.wav'
    audio, sr = load_audio_file(file_path)
    plot_waveform(audio, sr)
    plot_spectrogram(audio, sr)

if __name__ == "__main__":
    main()

filepath だけ環境に合うように書き換えました。
普通は関数の使い方から調べないといけないので、このレベルのコードが10秒くらいで出てくるのは本当にありがたいです。

例として、ノイズ成分が一番少ない「6_dB_fan」で、「id_00」と「id_06」の0000000.wavファイルを対象にプログラムを実行してみました。

id_00

plot01_id00.png
plot02_id00.png

id_06

plot01_id06.png
plot02_id06.png

あとで予測結果を紹介しますが、「id_06」の方が、特性がはっきりとしているため、異常の検知も精度が高くなりました。

3. 音の異常検知をやってみよう!

3.1. ChatGPT に作業プロセスを提案してもらう

音声データ解析の専門家ではないため、まずは ChatGPT にどういったプロセスで実施すればよいのか相談してみます。

スクリーンショット 2023-06-24 173724.png

ChatGPT の回答がこちらです。

スクリーンショット 2023-06-24 173755.png

プロンプトが曖昧なので、実行するたびに微妙に結果が異なります。
一番わかりやすそうで、かつ具体的な処理内容が書いているもの(内容が書いていないと、長いスレッドでは段々と実施内容が、冒頭の提案とずれることがあります)を選びました。

ここからは、目次に沿って「1. データの準備 について教えてください。」のように順番に手順を聞く形で進めていきますが、本記事では(長くなるため)細かい点は省略して、結果を中心に紹介します。

どのように実施しているのか、詳細を知りたい場合は、以下のシリーズ記事をご確認ください。

3.2. 使用した特徴量

今回使用した特徴量(MFCC、Chroma features、Spectral Contrast)について紹介します。

スクリーンショット 2023-06-24 174913.png

データ分析で重要なのが特徴量だと思いますが、この特徴量が適切なのか、十分に評価ができていないです。
ChatGPT に聞いてみると色々と教えてくれるのですが、「対象によって適切なものを選ぶべき」とのことで、このあたりは、専門性や経験があった方がもちろんよいと思います。

4. One-Class SVM による異常検知

4.1. One-Class SVM の概要とコード

目次に提案があった、One-Class SVM から試していきます。

スクリーンショット 2023-06-25 071633.png

機械学習でよく使う SVM(サポートベクターマシン)を異常検知のために応用したという感じでしょうか。

ChatGPTが生成したコードは以下になります。

長いので折りたたみます
from sklearn.model_selection import train_test_split
import numpy as np
import librosa
from sklearn import svm
from sklearn import metrics
import pickle
import os
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve

def extract_features(data, sr):
    """
    音声データから特徴量を抽出します。

    Parameters
    ----------
    data : np.array
        音声データ
    sr : int
        サンプリングレート

    Returns
    -------
    features : np.array
        音声データから抽出された特徴量
    """
    mfccs = librosa.feature.mfcc(y=data, sr=sr, n_mfcc=13)
    chroma_stft = librosa.feature.chroma_stft(y=data, sr=sr)
    spec_contrast = librosa.feature.spectral_contrast(y=data, sr=sr)
    features = np.concatenate((np.mean(mfccs, axis=1), np.mean(chroma_stft, axis=1), np.mean(spec_contrast, axis=1)))
    return features

def load_audio_file(file_path):
    """
    音声ファイルを読み込みます。

    Parameters
    ----------
    file_path : str
        音声ファイルのパス

    Returns
    -------
    input_length : int
        サンプリングレートと音声の長さに基づいて計算された結果
    data : np.array
        音声データ
    sr : int
        サンプリングレート
    """
    data, sr = librosa.load(file_path)
    input_length = sr * librosa.get_duration(y=data, sr=sr)
    return input_length, data, sr

def split_data(features, labels, test_size=0.2, random_state=42):
    """
    データを訓練用データとテストデータに分割します。

    Parameters
    ----------
    features : list
        特徴量のリスト
    labels : list
        ラベルのリスト
    test_size : float
        テストデータの割合
    random_state : int
        乱数のシード

    Returns
    -------
    X_train, X_test, y_train, y_test : list
        分割後の訓練用データとテストデータ
    """
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test

def create_dataset(id, normal_files, abnormal_files, dB_set, test_ratio=0.2):
    """
    データセットを作成します。音声データから特徴量を抽出し、正常・異常データを分割します。
    処理に時間がかかるため、pickleファイルとして保存します。

    Parameters
    ----------
    id : str
        識別子
    normal_files : list
        正常な音声データファイルのリスト
    abnormal_files : list
        異常な音声データファイルのリスト
    dB_set : str
        データセットのdB設定
    test_ratio : float
        テストデータの割合 (default: 0.2)

    Returns
    -------
    features_train, features_test, labels_train, labels_test : list
        特徴量とラベルの訓練データとテストデータ
    """

    # データを保存するためのリストを初期化
    features_train = []
    labels_train = []
    features_test = []
    labels_test = []

    # pickleファイルの保存先を設定
    pickle_dir = f'./pickle/{dB_set}/'
    pickle_file = f'{pickle_dir}{id}_train_data.pickle'

    # まだ該当のpickleファイルがない場合は、新たに作成
    if not os.path.isfile(pickle_file):
        # 正常な音声ファイルから特徴量を抽出
        for i, file in enumerate(normal_files):
            # 音声ファイルを読み込む
            file_path = f'data/{dB_set}/fan/{id}/normal/{file}.wav'
            input_length, data, sr = load_audio_file(file_path)
            # 音声データから特徴量を抽出
            feature = extract_features(data, sr)
            # テスト用データと訓練用データに分割
            if i < len(normal_files) * test_ratio:
                features_test.append(feature)
                labels_test.append('normal')
            else:
                features_train.append(feature)
                labels_train.append('normal')

        # 異常な音声ファイルから特徴量を抽出
        for file in abnormal_files:
            # 音声ファイルを読み込む
            file_path = f'data/{dB_set}/fan/{id}/abnormal/{file}.wav'
            input_length, data, sr = load_audio_file(file_path)
            # 音声データから特徴量を抽出
            feature = extract_features(data, sr)
            # 異常データは全てテスト用データとして利用
            features_test.append(feature)
            labels_test.append('abnormal')

        # データをpickleファイルとして保存
        os.makedirs(pickle_dir, exist_ok=True)
        with open(pickle_file, 'wb') as file:
            data = {'features_train': features_train, 'features_test': features_test, 'labels_train': labels_train, 'labels_test': labels_test}
            pickle.dump(data , file)
    else:
        # すでにpickleファイルが存在する場合は、そのデータを読み込む
        with open(pickle_file, 'rb') as file:
            data = pickle.load(file)
            features_train = data['features_train']
            features_test = data['features_test']
            labels_train = data['labels_train']
            labels_test = data['labels_test']

    return features_train, features_test, labels_train, labels_test

def plot_roc_curve(labels, scores, auc):
    """
    ROC曲線をプロットします。

    Parameters
    ----------
    labels : list
        正解ラベル
    scores : list
        スコア
    auc : float
        AUCスコア

    Returns
    -------
    None
    """
    fpr, tpr, _ = roc_curve(labels, scores)
    plt.figure()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic')
    plt.legend(loc="lower right")
    plt.show()

def train_and_evaluate(features_train, labels_train, features_test, labels_test):
    """
    モデルを訓練し、評価します。

    Parameters
    ----------
    features_train : list
        訓練データの特徴量
    labels_train : list
        訓練データのラベル
    features_test : list
        テストデータの特徴量
    labels_test : list
        テストデータのラベル

    Returns
    -------
    auc : float
        AUCスコア
    cm : ndarray
        混同行列
    """

    # モデル(One-class SVM)を訓練
    one_class_svm = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
    one_class_svm.fit(features_train)

    # 訓練データに対するモデルのスコアを取得
    scores_train = one_class_svm.decision_function(features_train)

    # テストデータに対するモデルのスコアを取得
    scores_test = one_class_svm.decision_function(features_test)

    # 異常検知において、スコアが低いほど異常とみなす
    # そのため、スコアが0未満のものは-1(異常)、それ以外は1(正常)とする
    labels_test_pred = [-1 if score < 0 else 1 for score in scores_test]

    # 評価指標(AUCと混同行列)を計算
    # ラベルは'normal'なら1、それ以外('abnormal')なら-1として数値に変換
    auc = metrics.roc_auc_score(labels_test, scores_test)
    labels_test_num = [1 if item == 'normal' else -1 for item in labels_test]
    cm = metrics.confusion_matrix(labels_test_num, labels_test_pred)
    
    # ROC曲線をプロット
    # plot_roc_curve(labels_test_num, scores_test, auc)

    return auc, cm

def main():
    """
    メインの関数です。指定したdBのデータセットを用いて、異常検知の訓練と評価を行います。
    """

    # dB設定と対象のIDリストを設定
    dB_set = '6_dB_fan'
    id_list = ["id_00", "id_02", "id_04", "id_06"]
    # 正常音声データファイルと異常音声データファイルのリストを作成
    normal_files = [str(s).zfill(8) for s in range(1, 1001)]
    abnormal_files = [str(s).zfill(8) for s in range(1, 341)]

    # 各IDごとにデータセットを作成し、モデルを訓練・評価
    for id in id_list:
        # データセットの作成
        features_train, features_test, labels_train, labels_test = create_dataset(id, normal_files, abnormal_files, dB_set)
        # モデルの訓練と評価
        auc, cm = train_and_evaluate(features_train, labels_train, features_test, labels_test)

        # 結果の表示
        print("-" * 50)
        print(f"{id} result")
        print("AUC: ", auc)
        print("Confusion Matrix: \n", cm)

if __name__ == "__main__":
    main()

目次通りに進めていくと、各章で関数を生成してくれるので、それを ChatGPT の指示通り組み上げることでコードが完成します。

変更箇所

  • コメントの書き方にばらつきがあったので、コメントの形式を整えるように依頼しました。
  • 以下の機能は、自分で追加しました。
    • wavファイルの数が多いので、特徴量を作成するのに時間がかかりました。処理時間を短くするために、一度作成した特徴量を pickleファイルに一時保存するようにしました。

4.2. 6_dB_fan での実行結果

ノイズ成分の一番少ない「6_dB_fan」データを使って、One-Class SVM の異常検知コードを実行した結果が以下になります。

ID 6_dB_fan のAUC 6_dB_fan の混同行列
id_00 0.942 [[340, 0],
[165, 35]]
id_02 0.999 [[340, 0],
[158, 42]]
id_04 0.999 [[340, 0],
[135, 65]]
id_06 1.000 [[340, 0],
[166, 34]]

まず、AUC(Area Under the Curve)は、分類モデルのパフォーマンスを評価するための指標のことで、1に近いほど性能が良いです。詳細については、下記の記事が非常にわかりやすいです。

AUCだけを見ると、どの機械(id_00, id_02,...)も 0.9 を超える結果が得られており、高い精度を達成しています。

一方で、混同行列(Confusion Matrix) については、まず表での id_00 の結果

 [[340   0]
 [165  35]]

は、

                予測
              abnormal   normal
              ----------------
実際  abnormal |  340   0
        normal |  165    35

になります。
この id_00 の結果では、異常データ(340/340件)はすべて異常と正しく予測はできていますが、一方で、正常なデータを異常と誤検知する数も多くなっています(165/200件)。他の機械の結果も同様で、AUCの結果は非常に良いにも関わらず、正常なデータを異常と誤検知する数はどれも多いです。

4.3. (考察)AUCは良いのに、判定結果の精度が悪くなる理由

AUCの結果はよいのに、なぜ混同行列の結果が悪くなるのかという理由について、ChatGPT に相談しながら勉強してみた結果、以下の理由だと考えています。

  1. AUCは「ある最適な閾値設定において、正しく予想できたか、できなかったか」でモデルを評価する指標になります。この評価は閾値全体にわたる性能を示すため、実際に選択された特定の閾値による結果(混合行列)とは必ずしも一致しません。実際に判定に使用されている閾値設定が最適でない場合に、AUCが良い値であったとしても、混同行列の結果が悪くなることがあります。

    • 補足: 先ほど紹介したコード上では、以下の行の score < 0 の部分が閾値になります。

      labels_test_pred = [-1 if score < 0 else 1 for score in scores_test]
      
  2. One-Class SVMでは、訓練時に異常と判断される閾値が0となるように学習が行われます。そのためテスト時も閾値は0でよいはずですが、実際には最適な閾値は訓練データとテストデータで異なる値になることがあります(今回使用したデータでは、実際にはもう少し小さい値が最適値となっていました)。原因としては、以下が考えられます。

    • 学習データに使用する正常データの分布と、テストデータに使用する正常データの分布が異なっている。
    • データにノイズが含まれている。
    • 特徴量の設定が不適切。
    • One-Class SVM が今回の問題に対して、十分に正常・異常を分類できるものではなかった(もしくは、パラメータ調整が不十分)

2.について、いずれも可能性であり、どれが今回の結果の主原因かは見つけられていないです。

4.4. -6_dB_fan での実行結果

つぎに、データとしてノイズの多い「-6_dB_fan」を使用すると以下の結果が得られました。比較のために右2列に追加します。

ID 6_dB_fan
のAUC
6_dB_fan
の混同行列
-6_dB_fan
のAUC
-6_dB_fan
の混同行列
id_00 0.942 [[340, 0],
[165, 35]]
0.697 [[330, 10],
[183, 17]]
id_02 0.999 [[340, 0],
[158, 42]]
0.880 [[338, 2],
[161, 39]]
id_04 0.999 [[340, 0],
[135, 65]]
0.788 [[340, 0],
[175, 25]]
id_06 1.000 [[340, 0],
[166, 34]]
0.880 [[340, 0],
[177, 23]]

ノイズの割合の少ない「6_dB_fan」と比較して、AUCの値が大きく低下し、異常検知が難しくなったことがわかります。
特に、id_00 については 0.94 → 0.69 まで現象したので、id_00 は他の Noの機械と比べて、(冒頭でスペクトログラムで紹介したように)主の動作音がノイズに隠れやすいのかなと思います。

ノイズ成分の多い「-6_dB_fan」の結果を改良するために、ハイパスフィルタを通してみたりしてみたのですが、あまり改善されませんでした。
フィルタの選択が悪いという可能性も十分にありますが、ノイズはできるだけ混入しないように、音の取得方法を工夫するべきなのかなと思います。

5. Autoencoder による異常検知

5.1. Autoencoder の概要

つぎに、ニューラルネットワークを使用した異常検知手法の一つである Autoencoder を試していきます。

Autoencoderは以下のような構成になっています。

Autoencoder について(by ChatGPT)

  • エンコーダとデコーダはそれぞれニューラルネットワークで構成されています。エンコーダは入力データをより低次元の"圧縮"表現に変換し、デコーダはその圧縮された表現を元の入力データの次元に戻します(再構築する)。
  • Autoencoder は正常データのみを使用して学習を行います。その後、学習したモデルに対して、学習時のデータに含まれていない異常なデータを入力すると、正常なデータと比べて大きな誤差を含んだものを出力します。この再構築誤差が異常検知の指標となり、一定の閾値より大きければ、そのデータは異常であると判定されます。

5.2. Autoencoder のコードと実行結果

ChatGPT が生成してくれたコードは以下になります。実行のためのすべてのコードを貼っていますが,One-Class SVMのコードと中身が異なるのは train_and_evaluate 関数だけです。

長いので折りたたみます。
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
import torch
from sklearn.metrics import roc_curve
from sklearn.model_selection import train_test_split
from torch import nn, optim
from torch.autograd import Variable
from tqdm import tqdm
import librosa
from sklearn import metrics

def extract_features(data, sr):
    """
    音声データから特徴量を抽出します。

    Parameters
    ----------
    data : np.array
        音声データ
    sr : int
        サンプリングレート

    Returns
    -------
    features : np.array
        音声データから抽出された特徴量
    """
    mfccs = librosa.feature.mfcc(y=data, sr=sr, n_mfcc=13)
    chroma_stft = librosa.feature.chroma_stft(y=data, sr=sr)
    spec_contrast = librosa.feature.spectral_contrast(y=data, sr=sr)
    features = np.concatenate((np.mean(mfccs, axis=1), np.mean(chroma_stft, axis=1), np.mean(spec_contrast, axis=1)))
    return features

def load_audio_file(file_path):
    """
    音声ファイルを読み込みます。

    Parameters
    ----------
    file_path : str
        音声ファイルのパス

    Returns
    -------
    input_length : int
        サンプリングレートと音声の長さに基づいて計算された結果
    data : np.array
        音声データ
    sr : int
        サンプリングレート
    """
    data, sr = librosa.load(file_path)
    input_length = sr * librosa.get_duration(y=data, sr=sr)
    return input_length, data, sr

def split_data(features, labels, test_size=0.2, random_state=42):
    """
    データを訓練用データとテストデータに分割します。

    Parameters
    ----------
    features : list
        特徴量のリスト
    labels : list
        ラベルのリスト
    test_size : float
        テストデータの割合
    random_state : int
        乱数のシード

    Returns
    -------
    X_train, X_test, y_train, y_test : list
        分割後の訓練用データとテストデータ
    """
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test

def create_dataset(id, normal_files, abnormal_files, dB_set, test_ratio=0.2):
    """
    データセットを作成します。音声データから特徴量を抽出し、正常・異常データを分割します。
    処理に時間がかかるため、pickleファイルとして保存します。

    Parameters
    ----------
    id : str
        識別子
    normal_files : list
        正常な音声データファイルのリスト
    abnormal_files : list
        異常な音声データファイルのリスト
    dB_set : str
        データセットのdB設定
    test_ratio : float
        テストデータの割合 (default: 0.2)

    Returns
    -------
    features_train, features_test, labels_train, labels_test : list
        特徴量とラベルの訓練データとテストデータ
    """

    # データを保存するためのリストを初期化
    features_train = []
    labels_train = []
    features_test = []
    labels_test = []

    # pickleファイルの保存先を設定
    pickle_dir = f'./pickle/{dB_set}/'
    pickle_file = f'{pickle_dir}{id}_train_data.pickle'

    # まだ該当のpickleファイルがない場合は、新たに作成
    if not os.path.isfile(pickle_file):
        # 正常な音声ファイルから特徴量を抽出
        for i, file in enumerate(normal_files):
            # 音声ファイルを読み込む
            file_path = f'data/{dB_set}/fan/{id}/normal/{file}.wav'
            input_length, data, sr = load_audio_file(file_path)
            # 音声データから特徴量を抽出
            feature = extract_features(data, sr)
            # テスト用データと訓練用データに分割
            if i < len(normal_files) * test_ratio:
                features_test.append(feature)
                labels_test.append('normal')
            else:
                features_train.append(feature)
                labels_train.append('normal')

        # 異常な音声ファイルから特徴量を抽出
        for file in abnormal_files:
            # 音声ファイルを読み込む
            file_path = f'data/{dB_set}/fan/{id}/abnormal/{file}.wav'
            input_length, data, sr = load_audio_file(file_path)
            # 音声データから特徴量を抽出
            feature = extract_features(data, sr)
            # 異常データは全てテスト用データとして利用
            features_test.append(feature)
            labels_test.append('abnormal')

        # データをpickleファイルとして保存
        os.makedirs(pickle_dir, exist_ok=True)
        with open(pickle_file, 'wb') as file:
            data = {'features_train': features_train, 'features_test': features_test, 'labels_train': labels_train, 'labels_test': labels_test}
            pickle.dump(data , file)
    else:
        # すでにpickleファイルが存在する場合は、そのデータを読み込む
        with open(pickle_file, 'rb') as file:
            data = pickle.load(file)
            features_train = data['features_train']
            features_test = data['features_test']
            labels_train = data['labels_train']
            labels_test = data['labels_test']

    return features_train, features_test, labels_train, labels_test

def plot_roc_curve(labels, scores, auc):
    """
    ROC曲線をプロットします。

    Parameters
    ----------
    labels : list
        正解ラベル
    scores : list
        スコア
    auc : float
        AUCスコア

    Returns
    -------
    None
    """
    fpr, tpr, _ = roc_curve(labels, scores)
    plt.figure()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic')
    plt.legend(loc="lower right")
    plt.show()

class DeepAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        """
        深層オートエンコーダのクラスです。エンコーダとデコーダが含まれます。

        Parameters
        ----------
        input_dim : int
            入力次元数
        hidden_dim : int
            隠れ層の次元数
        latent_dim : int
            潜在変数の次元数
        """
        super(DeepAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x):
        """
        順伝播を計算します。

        Parameters
        ----------
        x : torch.Tensor
            入力テンソル

        Returns
        -------
        x : torch.Tensor
            出力テンソル
        """
        x = self.encoder(x)
        x = self.decoder(x)
        return x
    
def train_and_evaluate(features_train, labels_train, features_test, labels_test):
    """
    モデルを訓練し、評価します。

    Parameters
    ----------
    features_train : list
        訓練データの特徴量
    labels_train : list
        訓練データのラベル
    features_test : list
        テストデータの特徴量
    labels_test : list
        テストデータのラベル

    Returns
    -------
    auc : float
        AUCスコア
    cm : ndarray
        混同行列
    """
    # 入力をnumpy配列に変換
    features_train = np.array(features_train)
    features_test = np.array(features_test)

    # 各種パラメータを設定
    input_dim = features_train.shape[1]
    hidden_dim = 32
    latent_dim = 20
    num_epochs = 10000
    learning_rate = 0.001

    # データをPyTorchのテンソルに変換
    features_train_tensor = torch.from_numpy(features_train).float()
    features_test_tensor = torch.from_numpy(features_test).float()

    # モデル、損失関数、最適化手法を定義
    autoencoder = DeepAutoencoder(input_dim, hidden_dim, latent_dim)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(autoencoder.parameters(), lr=learning_rate)

    # モデルの訓練
    for epoch in tqdm(range(num_epochs)):
        autoencoder.train()
        inputs = Variable(features_train_tensor)
        outputs = autoencoder(inputs)
        loss = criterion(outputs, inputs)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    autoencoder.eval()

    # 訓練データに対する再構築誤差を計算
    train_outputs = autoencoder(Variable(features_train_tensor))
    reconstruction_errors_train = ((features_train_tensor - train_outputs)**2).sum(dim=1).detach().numpy()

    # 再構築誤差の95パーセンタイルを異常の閾値とする
    anomaly_threshold = np.percentile(reconstruction_errors_train, 95)

    # テストデータに対する再構築誤差を計算
    test_outputs = autoencoder(Variable(features_test_tensor))
    reconstruction_errors_test = ((features_test_tensor - test_outputs)**2).sum(dim=1).detach().numpy()

    # 再構築誤差を正規化
    max_error = np.max(reconstruction_errors_test)
    reconstruction_errors_test = max_error - reconstruction_errors_test

    # 再構築誤差が閾値を超えるかどうかでラベルを予測
    labels_test_pred = [-1 if error < (max_error - anomaly_threshold) else 1 for error in reconstruction_errors_test]

    # ラベルを数値に変換
    labels_test_num = [-1 if item == 'abnormal' else 1 for item in labels_test]

    # AUCと混同行列を計算
    auc = metrics.roc_auc_score(labels_test_num, reconstruction_errors_test)
    cm = metrics.confusion_matrix(labels_test_num, labels_test_pred)

    # ROC曲線をプロット
    # plot_roc_curve(labels_test_num, reconstruction_errors_test, auc)

    return auc, cm

def main():
    """
    メインの関数です。指定したdBのデータセットを用いて、異常検知の訓練と評価を行います。
    """

    # dB設定と対象のIDリストを設定
    dB_set = '6_dB_fan'
    id_list = ["id_00", "id_02", "id_04", "id_06"]
    # 正常音声データファイルと異常音声データファイルのリストを作成
    normal_files = [str(s).zfill(8) for s in range(1, 1001)]
    abnormal_files = [str(s).zfill(8) for s in range(1, 341)]

    # 各IDごとにデータセットを作成し、モデルを訓練・評価
    for id in id_list:
        # データセットの作成
        features_train, features_test, labels_train, labels_test = create_dataset(id, normal_files, abnormal_files, dB_set)
        # モデルの訓練と評価
        auc, cm = train_and_evaluate(features_train, labels_train, features_test, labels_test)

        # 結果の表示
        print("-" * 50)
        print(f"{id} result")
        print("AUC: ", auc)
        print("Confusion Matrix: \n", cm)

if __name__ == "__main__":
    main()

ノイズ成分の一番少ない、「6_dB_fan」データを使って AutoencoderとOne-Class SVMの結果と比較したのが以下になります。

ID One-Class SVM
のAUC
One-Class SVM
の混同行列
Autoencoder
のAUC
Autoencoder
の混同行列
id_00 0.942 [[340, 0],
[165, 35]]
0.911 [[223, 117],
[16, 184]]
id_02 0.999 [[340, 0],
[158, 42]]
0.991 [[321, 19],
[7, 193]]
id_04 0.999 [[340, 0],
[135, 65]]
0.983 [[319, 21],
[11, 189]]
id_06 1.000 [[340, 0],
[166, 34]]
0.995 [[340, 0],
[18, 182]]

結果から読み取れること

  • Autoencoderの方が、One-Class SVMに比べてAUCはやや低いです(結果が悪い)。
  • 一方で、混同行列を見ると、Autoencoderの方が異常データの一部の見逃しが生じていますが、正常なデータを異常と予測する数は大きく減少し,正常データを正常と判断することが多くなりました(例えば id_06の場合、異常データ 340/340 件を正しく異常と判定できており、正常データを異常と誤検知したのは 18/200 件のみになります)。

5.3. 実運用における懸念点

前節の結果を見ると、Autoencoderの方がやAUCの結果が悪いものの、この程度の差であれば、混同行列の結果から Autoencoder の方が使いやすいのではないかという印象を受けます。

ただし、実際に手を動かしていて、運用上で気になる点があったので記載します。

  1. エンコーダとデコーダとしてどのようなモデルを使えばいいのか? またそのパラメータ調整はどう行えばよいのか。

    今回使用したAutoencoderで使用したモデルは以下のものであり、hidden_dim=32 (隠れ層), latent_dim=20(圧縮後の次元)としました。

    self.encoder = nn.Sequential(
        nn.Linear(input_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, latent_dim),
        nn.ReLU()
    )
    self.decoder = nn.Sequential(
        nn.Linear(latent_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, input_dim),
    )
    

    このモデルの構造や次元数は、何度かテストデータで評価を行った結果なのですが、そもそも異常検知を行いたい場合は、今回のような検証用のテストデータが事前に得られない場合がほとんどだと思います。そのため、事前に十分なパラメータ調整を行うことができません。
    そのような中で、(特にAutoencoderの方は調整項目が多いので)どのようにモデルの構造を決定するか、また、パラメータを調整するかは、かなり難しい問題だと思います。

  2. 正常、異常の閾値の評価

    One-Class SVM でも同様の問題がありましたが、正常・異常の閾値をどうやって決めればいいのかという課題があります。Autoencoder の場合,以下の95と書いた部分が閾値になります。

    anomaly_threshold = np.percentile(reconstruction_errors_train, 95)
    

    実問題に運用する場合は、この閾値は始めは仮の値で決定し、誤検知などの様子を見つつ、調整しながら最適な値を探っていく流れになると思います。
    ただし、そんな誤検知ありきのシステムを、実環境で使用することはできるのでしょうか? また、実際の現場で、そんなに異常は発生するのでしょうか?

5.4. 異常検知に挑戦してみた感想

今回の異常検知の理論自体は機械学習を触ったことがある方ならそこまで難しくなく、(ChatGPT がコードを書いてくれますし)試してみることに関しては、ハードルが高いものではないと思います。

一方で、実際の問題に対して どうやって運用していくのか (手法の選定や、正常・異常の閾値の設定など...)、そのプロセスが非常に難しい問題だと感じました。

(過去の実績データが、正常・異常含めてものすごい数あれば問題ないと思いますが...)

またデータの取り方についても、そもそものデータがノイズだらけだと、異常の検出が非常に難しくなると思います。

他の異常検出モデルとして、Isolation ForestとLOF もやってみたのですが、あまり大きな変化はなかったので割愛します。

6. おわりに

あとから「これを使っていれば検知できました」と言うのは簡単なのですが、実際に、知る前にやってみるのは大変難しい問題なんだなと感じました。
実際の企業では、これをどうやって運用しているのでしょうか...

コツコツと続けていて、ちょくちょく過去記事も見てくださっているのかアクセスがあって嬉しいです。ありがとうございます。
そろそろ「ChatGPTと始める」は取った方がいいのか悩んでいます...

8
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?