1. はじめに
本記事はエンジニア向けに ChatGPT の活用例を紹介する「ChatGPTと始める」シリーズです。今回は、以下で公開されている機械の稼働データを使用して、機械の異常検知に挑戦します!
MIMII Dataset: 複数の種類の産業機械(ポンプ、ファン、バルブ、スライダー)の正常時と故障時の動作音データが公開されています。
今回はファンの動作音データを使って、正常な動作音のデータのみを学習用データとして使用してモデルを作成した場合に、異常を正しく検出できるかどうかを検証します。
本記事では、いろいろと検証していく過程で「異常検知」という問題に関する難しさを感じたので、その内容についても紹介したいと思います。
「うまくいった」という記事はよく見ますが、「ここに困った」という報告はあまりされないので、ご参考になれば幸いです。
2. 音声データを確認する
2.1. データセットの構造
データセットは以下の構造になっています。
データセットについての補足
- SN比ごとに3種類のデータが用意されています。「-6_db_fan」がノイズが多く、「6_dB_fan」がノイズが最も少ないです。
- id_00, ..., id_06 は対象となる機械Noのことです。4台の機械が用意されていて、機械ごとにそれぞれ特性(基本周波数)も異なります
- 「normal」, 「abnormal」は正常、異常データのことです。機械Noによってデータ数が異なりますが、正常データは約1000、異常データは約400用意されています。
2.2. 可視化してみる
まずはどんな音声データなのか、可視化して確認してみましょう。
可視化のためのコードの生成を、ChatGPT に依頼しました。
返ってきたコードはこちらです。
import matplotlib.pyplot as plt
import librosa
import librosa.display
import numpy as np
def load_audio_file(file_path):
'''音声ファイルを読み込む
Args:
file_path (str): 音声ファイルへのパス
Returns:
audio (np.array): オーディオデータ
sr (int): サンプリング周波数
'''
audio, sr = librosa.load(file_path, sr=16000) # 16kHzのサンプリング周波数で読み込む
return audio, sr
def plot_waveform(audio, sr):
'''振幅時間プロットを描画する
Args:
audio (np.array): オーディオデータ
sr (int): サンプリング周波数
'''
plt.figure(figsize=(12, 4))
librosa.display.waveshow(audio, sr=sr)
plt.title('Waveform')
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.show()
def plot_spectrogram(audio, sr):
'''スペクトログラムを描画する
Args:
audio (np.array): オーディオデータ
sr (int): サンプリング周波数
'''
plt.figure(figsize=(12, 4))
D = librosa.amplitude_to_db(np.abs(librosa.stft(audio)), ref=np.max)
librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')
plt.title('Spectrogram')
plt.colorbar(format='%+2.0f dB')
plt.xlabel('Time (seconds)')
plt.ylabel('Frequency (Hz)')
plt.show()
def main():
id = "id_00"
wav_file = "00000000"
file_path = f'data/6_dB_fan/fan/{id}/normal/{wav_file}.wav'
audio, sr = load_audio_file(file_path)
plot_waveform(audio, sr)
plot_spectrogram(audio, sr)
if __name__ == "__main__":
main()
filepath
だけ環境に合うように書き換えました。
普通は関数の使い方から調べないといけないので、このレベルのコードが10秒くらいで出てくるのは本当にありがたいです。
例として、ノイズ成分が一番少ない「6_dB_fan」で、「id_00」と「id_06」の0000000.wavファイルを対象にプログラムを実行してみました。
id_00
id_06
あとで予測結果を紹介しますが、「id_06」の方が、特性がはっきりとしているため、異常の検知も精度が高くなりました。
3. 音の異常検知をやってみよう!
3.1. ChatGPT に作業プロセスを提案してもらう
音声データ解析の専門家ではないため、まずは ChatGPT にどういったプロセスで実施すればよいのか相談してみます。
ChatGPT の回答がこちらです。
プロンプトが曖昧なので、実行するたびに微妙に結果が異なります。
一番わかりやすそうで、かつ具体的な処理内容が書いているもの(内容が書いていないと、長いスレッドでは段々と実施内容が、冒頭の提案とずれることがあります)を選びました。
ここからは、目次に沿って「1. データの準備 について教えてください。」のように順番に手順を聞く形で進めていきますが、本記事では(長くなるため)細かい点は省略して、結果を中心に紹介します。
どのように実施しているのか、詳細を知りたい場合は、以下のシリーズ記事をご確認ください。
3.2. 使用した特徴量
今回使用した特徴量(MFCC、Chroma features、Spectral Contrast)について紹介します。
データ分析で重要なのが特徴量だと思いますが、この特徴量が適切なのか、十分に評価ができていないです。
ChatGPT に聞いてみると色々と教えてくれるのですが、「対象によって適切なものを選ぶべき」とのことで、このあたりは、専門性や経験があった方がもちろんよいと思います。
4. One-Class SVM による異常検知
4.1. One-Class SVM の概要とコード
目次に提案があった、One-Class SVM から試していきます。
機械学習でよく使う SVM(サポートベクターマシン)を異常検知のために応用したという感じでしょうか。
ChatGPTが生成したコードは以下になります。
長いので折りたたみます
from sklearn.model_selection import train_test_split
import numpy as np
import librosa
from sklearn import svm
from sklearn import metrics
import pickle
import os
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve
def extract_features(data, sr):
"""
音声データから特徴量を抽出します。
Parameters
----------
data : np.array
音声データ
sr : int
サンプリングレート
Returns
-------
features : np.array
音声データから抽出された特徴量
"""
mfccs = librosa.feature.mfcc(y=data, sr=sr, n_mfcc=13)
chroma_stft = librosa.feature.chroma_stft(y=data, sr=sr)
spec_contrast = librosa.feature.spectral_contrast(y=data, sr=sr)
features = np.concatenate((np.mean(mfccs, axis=1), np.mean(chroma_stft, axis=1), np.mean(spec_contrast, axis=1)))
return features
def load_audio_file(file_path):
"""
音声ファイルを読み込みます。
Parameters
----------
file_path : str
音声ファイルのパス
Returns
-------
input_length : int
サンプリングレートと音声の長さに基づいて計算された結果
data : np.array
音声データ
sr : int
サンプリングレート
"""
data, sr = librosa.load(file_path)
input_length = sr * librosa.get_duration(y=data, sr=sr)
return input_length, data, sr
def split_data(features, labels, test_size=0.2, random_state=42):
"""
データを訓練用データとテストデータに分割します。
Parameters
----------
features : list
特徴量のリスト
labels : list
ラベルのリスト
test_size : float
テストデータの割合
random_state : int
乱数のシード
Returns
-------
X_train, X_test, y_train, y_test : list
分割後の訓練用データとテストデータ
"""
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=random_state)
return X_train, X_test, y_train, y_test
def create_dataset(id, normal_files, abnormal_files, dB_set, test_ratio=0.2):
"""
データセットを作成します。音声データから特徴量を抽出し、正常・異常データを分割します。
処理に時間がかかるため、pickleファイルとして保存します。
Parameters
----------
id : str
識別子
normal_files : list
正常な音声データファイルのリスト
abnormal_files : list
異常な音声データファイルのリスト
dB_set : str
データセットのdB設定
test_ratio : float
テストデータの割合 (default: 0.2)
Returns
-------
features_train, features_test, labels_train, labels_test : list
特徴量とラベルの訓練データとテストデータ
"""
# データを保存するためのリストを初期化
features_train = []
labels_train = []
features_test = []
labels_test = []
# pickleファイルの保存先を設定
pickle_dir = f'./pickle/{dB_set}/'
pickle_file = f'{pickle_dir}{id}_train_data.pickle'
# まだ該当のpickleファイルがない場合は、新たに作成
if not os.path.isfile(pickle_file):
# 正常な音声ファイルから特徴量を抽出
for i, file in enumerate(normal_files):
# 音声ファイルを読み込む
file_path = f'data/{dB_set}/fan/{id}/normal/{file}.wav'
input_length, data, sr = load_audio_file(file_path)
# 音声データから特徴量を抽出
feature = extract_features(data, sr)
# テスト用データと訓練用データに分割
if i < len(normal_files) * test_ratio:
features_test.append(feature)
labels_test.append('normal')
else:
features_train.append(feature)
labels_train.append('normal')
# 異常な音声ファイルから特徴量を抽出
for file in abnormal_files:
# 音声ファイルを読み込む
file_path = f'data/{dB_set}/fan/{id}/abnormal/{file}.wav'
input_length, data, sr = load_audio_file(file_path)
# 音声データから特徴量を抽出
feature = extract_features(data, sr)
# 異常データは全てテスト用データとして利用
features_test.append(feature)
labels_test.append('abnormal')
# データをpickleファイルとして保存
os.makedirs(pickle_dir, exist_ok=True)
with open(pickle_file, 'wb') as file:
data = {'features_train': features_train, 'features_test': features_test, 'labels_train': labels_train, 'labels_test': labels_test}
pickle.dump(data , file)
else:
# すでにpickleファイルが存在する場合は、そのデータを読み込む
with open(pickle_file, 'rb') as file:
data = pickle.load(file)
features_train = data['features_train']
features_test = data['features_test']
labels_train = data['labels_train']
labels_test = data['labels_test']
return features_train, features_test, labels_train, labels_test
def plot_roc_curve(labels, scores, auc):
"""
ROC曲線をプロットします。
Parameters
----------
labels : list
正解ラベル
scores : list
スコア
auc : float
AUCスコア
Returns
-------
None
"""
fpr, tpr, _ = roc_curve(labels, scores)
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic')
plt.legend(loc="lower right")
plt.show()
def train_and_evaluate(features_train, labels_train, features_test, labels_test):
"""
モデルを訓練し、評価します。
Parameters
----------
features_train : list
訓練データの特徴量
labels_train : list
訓練データのラベル
features_test : list
テストデータの特徴量
labels_test : list
テストデータのラベル
Returns
-------
auc : float
AUCスコア
cm : ndarray
混同行列
"""
# モデル(One-class SVM)を訓練
one_class_svm = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
one_class_svm.fit(features_train)
# 訓練データに対するモデルのスコアを取得
scores_train = one_class_svm.decision_function(features_train)
# テストデータに対するモデルのスコアを取得
scores_test = one_class_svm.decision_function(features_test)
# 異常検知において、スコアが低いほど異常とみなす
# そのため、スコアが0未満のものは-1(異常)、それ以外は1(正常)とする
labels_test_pred = [-1 if score < 0 else 1 for score in scores_test]
# 評価指標(AUCと混同行列)を計算
# ラベルは'normal'なら1、それ以外('abnormal')なら-1として数値に変換
auc = metrics.roc_auc_score(labels_test, scores_test)
labels_test_num = [1 if item == 'normal' else -1 for item in labels_test]
cm = metrics.confusion_matrix(labels_test_num, labels_test_pred)
# ROC曲線をプロット
# plot_roc_curve(labels_test_num, scores_test, auc)
return auc, cm
def main():
"""
メインの関数です。指定したdBのデータセットを用いて、異常検知の訓練と評価を行います。
"""
# dB設定と対象のIDリストを設定
dB_set = '6_dB_fan'
id_list = ["id_00", "id_02", "id_04", "id_06"]
# 正常音声データファイルと異常音声データファイルのリストを作成
normal_files = [str(s).zfill(8) for s in range(1, 1001)]
abnormal_files = [str(s).zfill(8) for s in range(1, 341)]
# 各IDごとにデータセットを作成し、モデルを訓練・評価
for id in id_list:
# データセットの作成
features_train, features_test, labels_train, labels_test = create_dataset(id, normal_files, abnormal_files, dB_set)
# モデルの訓練と評価
auc, cm = train_and_evaluate(features_train, labels_train, features_test, labels_test)
# 結果の表示
print("-" * 50)
print(f"{id} result")
print("AUC: ", auc)
print("Confusion Matrix: \n", cm)
if __name__ == "__main__":
main()
目次通りに進めていくと、各章で関数を生成してくれるので、それを ChatGPT の指示通り組み上げることでコードが完成します。
変更箇所
- コメントの書き方にばらつきがあったので、コメントの形式を整えるように依頼しました。
- 以下の機能は、自分で追加しました。
- wavファイルの数が多いので、特徴量を作成するのに時間がかかりました。処理時間を短くするために、一度作成した特徴量を pickleファイルに一時保存するようにしました。
4.2. 6_dB_fan での実行結果
ノイズ成分の一番少ない「6_dB_fan」データを使って、One-Class SVM の異常検知コードを実行した結果が以下になります。
ID | 6_dB_fan のAUC | 6_dB_fan の混同行列 |
---|---|---|
id_00 | 0.942 | [[340, 0], [165, 35]] |
id_02 | 0.999 | [[340, 0], [158, 42]] |
id_04 | 0.999 | [[340, 0], [135, 65]] |
id_06 | 1.000 | [[340, 0], [166, 34]] |
まず、AUC(Area Under the Curve)は、分類モデルのパフォーマンスを評価するための指標のことで、1に近いほど性能が良いです。詳細については、下記の記事が非常にわかりやすいです。
AUCだけを見ると、どの機械(id_00, id_02,...)も 0.9 を超える結果が得られており、高い精度を達成しています。
一方で、混同行列(Confusion Matrix) については、まず表での id_00 の結果
[[340 0]
[165 35]]
は、
予測
abnormal normal
----------------
実際 abnormal | 340 0
normal | 165 35
になります。
この id_00 の結果では、異常データ(340/340件)はすべて異常と正しく予測はできていますが、一方で、正常なデータを異常と誤検知する数も多くなっています(165/200件)。他の機械の結果も同様で、AUCの結果は非常に良いにも関わらず、正常なデータを異常と誤検知する数はどれも多いです。
4.3. (考察)AUCは良いのに、判定結果の精度が悪くなる理由
AUCの結果はよいのに、なぜ混同行列の結果が悪くなるのかという理由について、ChatGPT に相談しながら勉強してみた結果、以下の理由だと考えています。
-
AUCは「ある最適な閾値設定において、正しく予想できたか、できなかったか」でモデルを評価する指標になります。この評価は閾値全体にわたる性能を示すため、実際に選択された特定の閾値による結果(混合行列)とは必ずしも一致しません。実際に判定に使用されている閾値設定が最適でない場合に、AUCが良い値であったとしても、混同行列の結果が悪くなることがあります。
-
補足: 先ほど紹介したコード上では、以下の行の
score < 0
の部分が閾値になります。labels_test_pred = [-1 if score < 0 else 1 for score in scores_test]
-
-
One-Class SVMでは、訓練時に異常と判断される閾値が0となるように学習が行われます。そのためテスト時も閾値は0でよいはずですが、実際には最適な閾値は訓練データとテストデータで異なる値になることがあります(今回使用したデータでは、実際にはもう少し小さい値が最適値となっていました)。原因としては、以下が考えられます。
- 学習データに使用する正常データの分布と、テストデータに使用する正常データの分布が異なっている。
- データにノイズが含まれている。
- 特徴量の設定が不適切。
- One-Class SVM が今回の問題に対して、十分に正常・異常を分類できるものではなかった(もしくは、パラメータ調整が不十分)
2.について、いずれも可能性であり、どれが今回の結果の主原因かは見つけられていないです。
4.4. -6_dB_fan での実行結果
つぎに、データとしてノイズの多い「-6_dB_fan」を使用すると以下の結果が得られました。比較のために右2列に追加します。
ID | 6_dB_fan のAUC |
6_dB_fan の混同行列 |
-6_dB_fan のAUC |
-6_dB_fan の混同行列 |
---|---|---|---|---|
id_00 | 0.942 | [[340, 0], [165, 35]] |
0.697 | [[330, 10], [183, 17]] |
id_02 | 0.999 | [[340, 0], [158, 42]] |
0.880 | [[338, 2], [161, 39]] |
id_04 | 0.999 | [[340, 0], [135, 65]] |
0.788 | [[340, 0], [175, 25]] |
id_06 | 1.000 | [[340, 0], [166, 34]] |
0.880 | [[340, 0], [177, 23]] |
ノイズの割合の少ない「6_dB_fan」と比較して、AUCの値が大きく低下し、異常検知が難しくなったことがわかります。
特に、id_00 については 0.94 → 0.69 まで現象したので、id_00 は他の Noの機械と比べて、(冒頭でスペクトログラムで紹介したように)主の動作音がノイズに隠れやすいのかなと思います。
ノイズ成分の多い「-6_dB_fan」の結果を改良するために、ハイパスフィルタを通してみたりしてみたのですが、あまり改善されませんでした。
フィルタの選択が悪いという可能性も十分にありますが、ノイズはできるだけ混入しないように、音の取得方法を工夫するべきなのかなと思います。
5. Autoencoder による異常検知
5.1. Autoencoder の概要
つぎに、ニューラルネットワークを使用した異常検知手法の一つである Autoencoder を試していきます。
Autoencoderは以下のような構成になっています。
Autoencoder について(by ChatGPT)
- エンコーダとデコーダはそれぞれニューラルネットワークで構成されています。エンコーダは入力データをより低次元の"圧縮"表現に変換し、デコーダはその圧縮された表現を元の入力データの次元に戻します(再構築する)。
- Autoencoder は正常データのみを使用して学習を行います。その後、学習したモデルに対して、学習時のデータに含まれていない異常なデータを入力すると、正常なデータと比べて大きな誤差を含んだものを出力します。この再構築誤差が異常検知の指標となり、一定の閾値より大きければ、そのデータは異常であると判定されます。
5.2. Autoencoder のコードと実行結果
ChatGPT が生成してくれたコードは以下になります。実行のためのすべてのコードを貼っていますが,One-Class SVMのコードと中身が異なるのは train_and_evaluate
関数だけです。
長いので折りたたみます。
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
import torch
from sklearn.metrics import roc_curve
from sklearn.model_selection import train_test_split
from torch import nn, optim
from torch.autograd import Variable
from tqdm import tqdm
import librosa
from sklearn import metrics
def extract_features(data, sr):
"""
音声データから特徴量を抽出します。
Parameters
----------
data : np.array
音声データ
sr : int
サンプリングレート
Returns
-------
features : np.array
音声データから抽出された特徴量
"""
mfccs = librosa.feature.mfcc(y=data, sr=sr, n_mfcc=13)
chroma_stft = librosa.feature.chroma_stft(y=data, sr=sr)
spec_contrast = librosa.feature.spectral_contrast(y=data, sr=sr)
features = np.concatenate((np.mean(mfccs, axis=1), np.mean(chroma_stft, axis=1), np.mean(spec_contrast, axis=1)))
return features
def load_audio_file(file_path):
"""
音声ファイルを読み込みます。
Parameters
----------
file_path : str
音声ファイルのパス
Returns
-------
input_length : int
サンプリングレートと音声の長さに基づいて計算された結果
data : np.array
音声データ
sr : int
サンプリングレート
"""
data, sr = librosa.load(file_path)
input_length = sr * librosa.get_duration(y=data, sr=sr)
return input_length, data, sr
def split_data(features, labels, test_size=0.2, random_state=42):
"""
データを訓練用データとテストデータに分割します。
Parameters
----------
features : list
特徴量のリスト
labels : list
ラベルのリスト
test_size : float
テストデータの割合
random_state : int
乱数のシード
Returns
-------
X_train, X_test, y_train, y_test : list
分割後の訓練用データとテストデータ
"""
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=random_state)
return X_train, X_test, y_train, y_test
def create_dataset(id, normal_files, abnormal_files, dB_set, test_ratio=0.2):
"""
データセットを作成します。音声データから特徴量を抽出し、正常・異常データを分割します。
処理に時間がかかるため、pickleファイルとして保存します。
Parameters
----------
id : str
識別子
normal_files : list
正常な音声データファイルのリスト
abnormal_files : list
異常な音声データファイルのリスト
dB_set : str
データセットのdB設定
test_ratio : float
テストデータの割合 (default: 0.2)
Returns
-------
features_train, features_test, labels_train, labels_test : list
特徴量とラベルの訓練データとテストデータ
"""
# データを保存するためのリストを初期化
features_train = []
labels_train = []
features_test = []
labels_test = []
# pickleファイルの保存先を設定
pickle_dir = f'./pickle/{dB_set}/'
pickle_file = f'{pickle_dir}{id}_train_data.pickle'
# まだ該当のpickleファイルがない場合は、新たに作成
if not os.path.isfile(pickle_file):
# 正常な音声ファイルから特徴量を抽出
for i, file in enumerate(normal_files):
# 音声ファイルを読み込む
file_path = f'data/{dB_set}/fan/{id}/normal/{file}.wav'
input_length, data, sr = load_audio_file(file_path)
# 音声データから特徴量を抽出
feature = extract_features(data, sr)
# テスト用データと訓練用データに分割
if i < len(normal_files) * test_ratio:
features_test.append(feature)
labels_test.append('normal')
else:
features_train.append(feature)
labels_train.append('normal')
# 異常な音声ファイルから特徴量を抽出
for file in abnormal_files:
# 音声ファイルを読み込む
file_path = f'data/{dB_set}/fan/{id}/abnormal/{file}.wav'
input_length, data, sr = load_audio_file(file_path)
# 音声データから特徴量を抽出
feature = extract_features(data, sr)
# 異常データは全てテスト用データとして利用
features_test.append(feature)
labels_test.append('abnormal')
# データをpickleファイルとして保存
os.makedirs(pickle_dir, exist_ok=True)
with open(pickle_file, 'wb') as file:
data = {'features_train': features_train, 'features_test': features_test, 'labels_train': labels_train, 'labels_test': labels_test}
pickle.dump(data , file)
else:
# すでにpickleファイルが存在する場合は、そのデータを読み込む
with open(pickle_file, 'rb') as file:
data = pickle.load(file)
features_train = data['features_train']
features_test = data['features_test']
labels_train = data['labels_train']
labels_test = data['labels_test']
return features_train, features_test, labels_train, labels_test
def plot_roc_curve(labels, scores, auc):
"""
ROC曲線をプロットします。
Parameters
----------
labels : list
正解ラベル
scores : list
スコア
auc : float
AUCスコア
Returns
-------
None
"""
fpr, tpr, _ = roc_curve(labels, scores)
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic')
plt.legend(loc="lower right")
plt.show()
class DeepAutoencoder(nn.Module):
def __init__(self, input_dim, hidden_dim, latent_dim):
"""
深層オートエンコーダのクラスです。エンコーダとデコーダが含まれます。
Parameters
----------
input_dim : int
入力次元数
hidden_dim : int
隠れ層の次元数
latent_dim : int
潜在変数の次元数
"""
super(DeepAutoencoder, self).__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, latent_dim),
nn.ReLU()
)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, input_dim),
)
def forward(self, x):
"""
順伝播を計算します。
Parameters
----------
x : torch.Tensor
入力テンソル
Returns
-------
x : torch.Tensor
出力テンソル
"""
x = self.encoder(x)
x = self.decoder(x)
return x
def train_and_evaluate(features_train, labels_train, features_test, labels_test):
"""
モデルを訓練し、評価します。
Parameters
----------
features_train : list
訓練データの特徴量
labels_train : list
訓練データのラベル
features_test : list
テストデータの特徴量
labels_test : list
テストデータのラベル
Returns
-------
auc : float
AUCスコア
cm : ndarray
混同行列
"""
# 入力をnumpy配列に変換
features_train = np.array(features_train)
features_test = np.array(features_test)
# 各種パラメータを設定
input_dim = features_train.shape[1]
hidden_dim = 32
latent_dim = 20
num_epochs = 10000
learning_rate = 0.001
# データをPyTorchのテンソルに変換
features_train_tensor = torch.from_numpy(features_train).float()
features_test_tensor = torch.from_numpy(features_test).float()
# モデル、損失関数、最適化手法を定義
autoencoder = DeepAutoencoder(input_dim, hidden_dim, latent_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=learning_rate)
# モデルの訓練
for epoch in tqdm(range(num_epochs)):
autoencoder.train()
inputs = Variable(features_train_tensor)
outputs = autoencoder(inputs)
loss = criterion(outputs, inputs)
optimizer.zero_grad()
loss.backward()
optimizer.step()
autoencoder.eval()
# 訓練データに対する再構築誤差を計算
train_outputs = autoencoder(Variable(features_train_tensor))
reconstruction_errors_train = ((features_train_tensor - train_outputs)**2).sum(dim=1).detach().numpy()
# 再構築誤差の95パーセンタイルを異常の閾値とする
anomaly_threshold = np.percentile(reconstruction_errors_train, 95)
# テストデータに対する再構築誤差を計算
test_outputs = autoencoder(Variable(features_test_tensor))
reconstruction_errors_test = ((features_test_tensor - test_outputs)**2).sum(dim=1).detach().numpy()
# 再構築誤差を正規化
max_error = np.max(reconstruction_errors_test)
reconstruction_errors_test = max_error - reconstruction_errors_test
# 再構築誤差が閾値を超えるかどうかでラベルを予測
labels_test_pred = [-1 if error < (max_error - anomaly_threshold) else 1 for error in reconstruction_errors_test]
# ラベルを数値に変換
labels_test_num = [-1 if item == 'abnormal' else 1 for item in labels_test]
# AUCと混同行列を計算
auc = metrics.roc_auc_score(labels_test_num, reconstruction_errors_test)
cm = metrics.confusion_matrix(labels_test_num, labels_test_pred)
# ROC曲線をプロット
# plot_roc_curve(labels_test_num, reconstruction_errors_test, auc)
return auc, cm
def main():
"""
メインの関数です。指定したdBのデータセットを用いて、異常検知の訓練と評価を行います。
"""
# dB設定と対象のIDリストを設定
dB_set = '6_dB_fan'
id_list = ["id_00", "id_02", "id_04", "id_06"]
# 正常音声データファイルと異常音声データファイルのリストを作成
normal_files = [str(s).zfill(8) for s in range(1, 1001)]
abnormal_files = [str(s).zfill(8) for s in range(1, 341)]
# 各IDごとにデータセットを作成し、モデルを訓練・評価
for id in id_list:
# データセットの作成
features_train, features_test, labels_train, labels_test = create_dataset(id, normal_files, abnormal_files, dB_set)
# モデルの訓練と評価
auc, cm = train_and_evaluate(features_train, labels_train, features_test, labels_test)
# 結果の表示
print("-" * 50)
print(f"{id} result")
print("AUC: ", auc)
print("Confusion Matrix: \n", cm)
if __name__ == "__main__":
main()
ノイズ成分の一番少ない、「6_dB_fan」データを使って AutoencoderとOne-Class SVMの結果と比較したのが以下になります。
ID | One-Class SVM のAUC |
One-Class SVM の混同行列 |
Autoencoder のAUC |
Autoencoder の混同行列 |
---|---|---|---|---|
id_00 | 0.942 | [[340, 0], [165, 35]] |
0.911 | [[223, 117], [16, 184]] |
id_02 | 0.999 | [[340, 0], [158, 42]] |
0.991 | [[321, 19], [7, 193]] |
id_04 | 0.999 | [[340, 0], [135, 65]] |
0.983 | [[319, 21], [11, 189]] |
id_06 | 1.000 | [[340, 0], [166, 34]] |
0.995 | [[340, 0], [18, 182]] |
結果から読み取れること
- Autoencoderの方が、One-Class SVMに比べてAUCはやや低いです(結果が悪い)。
- 一方で、混同行列を見ると、Autoencoderの方が異常データの一部の見逃しが生じていますが、正常なデータを異常と予測する数は大きく減少し,正常データを正常と判断することが多くなりました(例えば id_06の場合、異常データ 340/340 件を正しく異常と判定できており、正常データを異常と誤検知したのは 18/200 件のみになります)。
5.3. 実運用における懸念点
前節の結果を見ると、Autoencoderの方がやAUCの結果が悪いものの、この程度の差であれば、混同行列の結果から Autoencoder の方が使いやすいのではないかという印象を受けます。
ただし、実際に手を動かしていて、運用上で気になる点があったので記載します。
-
エンコーダとデコーダとしてどのようなモデルを使えばいいのか? またそのパラメータ調整はどう行えばよいのか。
今回使用したAutoencoderで使用したモデルは以下のものであり、
hidden_dim=32
(隠れ層),latent_dim=20
(圧縮後の次元)としました。self.encoder = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, latent_dim), nn.ReLU() ) self.decoder = nn.Sequential( nn.Linear(latent_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, input_dim), )
このモデルの構造や次元数は、何度かテストデータで評価を行った結果なのですが、そもそも異常検知を行いたい場合は、今回のような検証用のテストデータが事前に得られない場合がほとんどだと思います。そのため、事前に十分なパラメータ調整を行うことができません。
そのような中で、(特にAutoencoderの方は調整項目が多いので)どのようにモデルの構造を決定するか、また、パラメータを調整するかは、かなり難しい問題だと思います。 -
正常、異常の閾値の評価
One-Class SVM でも同様の問題がありましたが、正常・異常の閾値をどうやって決めればいいのかという課題があります。Autoencoder の場合,以下の95と書いた部分が閾値になります。
anomaly_threshold = np.percentile(reconstruction_errors_train, 95)
実問題に運用する場合は、この閾値は始めは仮の値で決定し、誤検知などの様子を見つつ、調整しながら最適な値を探っていく流れになると思います。
ただし、そんな誤検知ありきのシステムを、実環境で使用することはできるのでしょうか? また、実際の現場で、そんなに異常は発生するのでしょうか?
5.4. 異常検知に挑戦してみた感想
今回の異常検知の理論自体は機械学習を触ったことがある方ならそこまで難しくなく、(ChatGPT がコードを書いてくれますし)試してみることに関しては、ハードルが高いものではないと思います。
一方で、実際の問題に対して どうやって運用していくのか (手法の選定や、正常・異常の閾値の設定など...)、そのプロセスが非常に難しい問題だと感じました。
(過去の実績データが、正常・異常含めてものすごい数あれば問題ないと思いますが...)
またデータの取り方についても、そもそものデータがノイズだらけだと、異常の検出が非常に難しくなると思います。
他の異常検出モデルとして、Isolation ForestとLOF もやってみたのですが、あまり大きな変化はなかったので割愛します。
6. おわりに
あとから「これを使っていれば検知できました」と言うのは簡単なのですが、実際に、知る前にやってみるのは大変難しい問題なんだなと感じました。
実際の企業では、これをどうやって運用しているのでしょうか...
コツコツと続けていて、ちょくちょく過去記事も見てくださっているのかアクセスがあって嬉しいです。ありがとうございます。
そろそろ「ChatGPTと始める」は取った方がいいのか悩んでいます...