LoginSignup
noname20220504
@noname20220504

Are you sure you want to delete the question?

Leaving a resolved question undeleted may help others!

オートエンコーダで異常検知を行う際に、loss・val_lossが発散してしまいます

解決したいこと

図1のような波形データを対象に、オートエンコーダによる教師なし学習で異常検知したいと思っています。
具体的には、
1.時刻歴波形のcsv(500次元のデータセット(0~0.05secの時刻歴データを0.0001secおきにプロットしたもの))を学習データとして読み取る。
2.1で読み取った学習データをオートエンコーダを用いて75次元まで圧縮する。
3.テストデータを1と同様の手順で読み取る。
4.3.で読み取った学習データを2.で生成したモデルを介して、75次元まで圧縮する。
5.4.をさらに逆変換し、元のデータと同次元のデータに復元する。
6.復元前後の残差平方和から異常の有無を判定する
という流れになります。
スクリーンショット 2022-05-05 224818.png
図1

発生している問題・エラー

loss, val_lossともに図2のように発散してしまいます。収束させるにはどうすれば良いか教えてください。

loss-epoch.png
図2

該当するソースコード

from tensorflow.keras.layers import Input,Dense
from tensorflow.keras.models import Model
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler

# AutoEncoder
input_layer=Input(shape=(500,))
encoded=Dense(250,activation="relu")(input_layer)
encoded=Dense(125,activation="relu")(encoded)
encoded=Dense(60,activation="relu")(encoded)
decoded=Dense(125,activation="relu")(encoded)
decoded=Dense(250,activation="relu")(encoded)
output_layer=Dense(500,activation="sigmoid")(decoded)

model=Model(inputs=input_layer,outputs=output_layer)
model.compile(loss='binary_crossentropy',optimizer="adam")
#model.compile(loss='mse',optimizer="adam")

dftrain=pd.read_csv('train.csv') #学習データ:計30
print("dftrain", dftrain.shape)
dataset_xtr=dftrain.drop(["Case","Class"],axis=1)
#dataset_xtr=np.abs(dataset_xtr)
#sc = StandardScaler()
#dataset_xtr=sc.fit_transform(dataset_xtr)
print("dataset_xtr", dataset_xtr.shape)
dataset_ytr=dftrain["Class"]

x_train,x_val,y_train,y_val=train_test_split(dataset_xtr,dataset_ytr,test_size=0.5,random_state=0)

dftest=pd.read_csv('test.csv') #テストデータ:計150
x_test=dftest.drop(["Case","Class"],axis=1)
#x_test=np.abs(x_test)
#x_test=sc.fit_transform(x_test)
y_test=dftest["Class"]

history = model.fit(x_train,x_train,epochs=1000, batch_size=4, validation_split=0.2)

nnn=1.96
x_val_inv=model.predict(x_val)
diff_tr=np.sum((np.array(x_val)-np.array(x_val_inv))**2,axis=1)
thresholder=(diff_tr.mean()+nnn*(diff_tr.std()))
preds=[]

#異常の有無を判定
x_test_inv=model.predict(x_test)
diff_test=np.sum((np.array(x_test)-np.array(x_test_inv))**2,axis=1)
for j in range(len(diff_test)):
    if diff_test[j]>thresholder:
        preds.append(1) #1:異常 0:正常
    elif diff_test[j]<=thresholder:
        preds.append(0)

print(classification_report(y_test,preds))
print(confusion_matrix(y_test,preds))
np.savetxt("out.txt",preds)

#ROC curve
cnt=0
point=[0,0]
roc=[]
roc.append([0,0])
rank=list(reversed(np.argsort(diff_test)))
error=np.sum((y_test.astype("int")==1))
for ridx in rank:
    if y_test[ridx]==1:
        point[1]=point[1]+1
        cnt=cnt+1
    else:
        point[0]=point[0]+1
    roc.append([point[0],point[1]])
    if cnt==error:
        roc.append([y_test.shape[0],point[1]])
        break
roc=np.array(roc)
np.savetxt("roc.txt",roc)
plt.plot(roc[:,0]/y_test.shape,roc[:,1]/error)
plt.plot([0,1],color='black',linestyle='dashed')
plt.savefig("ROC_curve.png")
plt.show()

#loss and val_loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.savefig("loss-epoch.png")
plt.show()

自分で試したこと

活性化関数をreluからsigmoidに変える、lossの評価関数をbinary_crossentropyからmseに変えましたが解決に至っておりません。なお、評価関数をmseに変えた際は、一応収束しました(図3参照。ただし、train/testで乖離がみられること、epoch=700付近で一時的にlossが増大するといった事象が確認されました。)
また、元データについても、絶対値をとって負値をなくす・標準化を行うなどしましたが、効果がみられませんでした。ちなみに、波形をcsvではなく画像として認識した場合には、loss, val_lossともにほぼ同値で収束しました。

loss-epoch.png
図3

ROC曲線自体は、図4のようになっており、異常検知自体は概ね正しく行われていると思われます。
ROC_curve.png
図4

0

6Answer

初めまして,減衰振動を見かけて居てもたってもいられず解答する形になります.参考になれば幸いです.

アプローチの良かった点と悪かった点,謎な点を述べたうえで,改善案を示したいと思います.

  • 良かったと思う点
    • MSEを使ったこと
  • 悪かったと思う点
    • Binary CrossEntropyを使ったこと
    • ReLUを使ったこと
    • Sigmoidを使ったこと
  • 謎な点
    • BCEからMSEへの変更,標準化によるデータの加工で改善しなかったこと

異常検知の原理からの観点

オートエンコーダを用いた波形の異常検知では,オートエンコーダに特徴量抽出を行わせて,元の波形を復元するという手法をとっています.

そこで問題になるのがReLU関数を用いた入出力層です.この関数は

$$
\mathrm{ReLU}(x) = \mathrm{max}(0, x)
$$

であることからもわかる通り,負の値を0に置き換えて無視します.その上,負の値を出力することはありません.

今回対象としている波形は負の値をとることもある減衰振動となっています.
ReLUを入力層に用いた学習では,負の値を使わずに学習が進むことになってしまい,不適切な選択だと考えられます.また,負の値を出力することもないため,元の波形を復元するに至りません.

次に実践してみますので,そちらを参考に話を進めたいと思います.

データの生成

減衰振動の関数と,異常な波形の例としてただの三角関数を用意しました.
次のコードで生成を行います.

import csv
import numpy as np
from numpy.random import randn
from matplotlib import pyplot as plt

def DampedVibration(x):
    return (1 + randn(1) * 0.125) * np.exp(-(1 + randn(1) * 0.125) * x) * np.cos(8 * x + randn(1) * 0.25) + randn(len(x)) * 0.03125

def Trigonometric(x):
    return (1 + randn(1) * 0.125) * np.cos(8 * x + randn(1) * np.pi) + randn(len(x)) * 0.03125

class DataGenerator:
    def __init__(self):
        self.x = np.linspace(0, 2 * np.pi, 512)
        self.y = list()
    
    def append(self, f, num):
        for _ in range(num):
            self.y.append(f(self.x))
    
    def plot(self):
        for y in self.y:
            plt.plot(self.x, y)
        plt.grid()
        plt.show()
        plt.close()
    
    def save(self, file_name):
        with open(file_name, mode = "w", encoding = "utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(self.x)
            writer.writerows(self.y)

if __name__  == "__main__":
    for func, file_name in zip([DampedVibration, Trigonometric], ["train.csv", "trigonometric.csv"]):
        datagen = DataGenerator()
        datagen.append(func, 30)
        datagen.plot()
        datagen.save(file_name)

得られる波形は次のようになります.各波形につき30個出力しています.(nonameさんのコードでは教師データが30とありますが300の間違いでしょうか.一応信じて30個にしています.)
dv_wave.png
tri_wave.png

実験

今回の問題は異常検知で,元の波形をオートエンコーダで出力させることを目的としているので,二値分類に用いるBinary CrossEntropyは不適です.損失関数をMSEにして話を進めます.

import csv
import numpy as np

from keras.layers import Dense, Input
from keras.models import Model

from matplotlib import pyplot as plt

def read_csv(file_name):
    ret = list()
    with open(file_name, "r") as f:
        reader = csv.reader(f)
        header = list(map(float, next(reader)))
        for row in reader:
            ret.append(np.array(list(map(float, row))))
    return header, np.array(ret)

header, x_train = read_csv("train.csv")

def AutoEncoder():
    kwargs = {
        "activation": "relu",
        "kernel_initializer": "he_normal"
    }
    inputs = Input(shape = (512,))
    encoded = Dense(256, **kwargs)(inputs)
    encoded = Dense(128, **kwargs)(encoded)
    encoded = Dense(64, **kwargs)(encoded)
    decoded = Dense(128, **kwargs)(encoded)
    decoded = Dense(256, **kwargs)(decoded)
    outputs = Dense(512, **kwargs)(decoded)
    return Model(inputs = inputs, outputs = outputs)

model = AutoEncoder()
model.compile(
    loss = 'mse',
    optimizer = 'adam',
    metrics = ['mae', 'mse']
)

epochs = 128
batch_size = 4

history = model.fit(
    x_train, x_train,
    epochs = epochs,
    batch_size = batch_size,
    validation_split = 0.2
)

epoch_arr = range(epochs)
plt.figure(figsize = (16, 5))
for i, param in enumerate(["loss", "mae", "mse"]):
    plt.subplot(1, 3, i + 1)
    plt.plot(epoch_arr, history.history[param], label = param)
    plt.plot(epoch_arr, history.history["val_" + param], label = "val_" + param)
    plt.ylim(bottom = 0)
    plt.legend()
    plt.grid()
plt.show()

_, x_test = read_csv("trigonometric.csv")
x_train_flipped = np.fliplr(x_train).copy()

err = list()
plt.figure(figsize = (16, 5))
titles = ["DampedVibration", "Trigonometric", "DV-Flipped"]
for i, (x, t) in enumerate(zip([x_train, x_test, x_train_flipped], titles)):
    x_pred = model.predict(x, batch_size = batch_size)
    err.append(np.sum((np.array(x) - np.array(x_pred)) ** 2, axis = 1))
    plt.subplot(1, 3, i + 1)
    plt.plot(header, x[0], label = "Input")
    plt.plot(header, x_pred[0], label = "Output")
    plt.title(t)
    plt.legend()
    plt.grid()
plt.show()

plt.figure(figsize = (7, 4))
plt.hist(err, bins = 64, label = titles, stacked = True)
plt.legend()
plt.grid()
plt.show()

上記のコードを用いて得られた結果が次になります.

loss.png

損失関数は収束しており,

io.png

予想した通り,波形は正の部分しか出力できておりません.

result.png

残差平方和からは,減衰振動と発散振動の区別が難しいようです.

Sigmoid関数に変更してみる

以下のようにオートエンコーダの定義だけ変更してみます.

def AutoEncoder():
    kwargs = {
        "activation": "sigmoid",
        "kernel_initializer": "he_normal"
    }
    inputs = Input(shape = (512,))
    encoded = Dense(256, **kwargs)(inputs)
    encoded = Dense(128, **kwargs)(encoded)
    encoded = Dense(64, **kwargs)(encoded)
    decoded = Dense(128, **kwargs)(encoded)
    decoded = Dense(256, **kwargs)(decoded)
    outputs = Dense(512, **kwargs)(decoded)
    return Model(inputs = inputs, outputs = outputs)

次の結果が得られました.

sigmoid_only_loss.png

先ほどと同様,正の値しか出力できていません.

sigmoid_only_io.png

しかし,少しだけ減衰振動と発散振動の区別がつきやすくなったように感じます.

sigmoid_only.png

tanh関数に変更してみる

以下のようにオートエンコーダの定義だけ変更してみます.

def AutoEncoder():
    kwargs = {
        "activation": "tanh",
        "kernel_initializer": "he_normal"
    }
    inputs = Input(shape = (512,))
    encoded = Dense(256, **kwargs)(inputs)
    encoded = Dense(128, **kwargs)(encoded)
    encoded = Dense(64, **kwargs)(encoded)
    decoded = Dense(128, **kwargs)(encoded)
    decoded = Dense(256, **kwargs)(decoded)
    outputs = Dense(512, **kwargs)(decoded)
    return Model(inputs = inputs, outputs = outputs)

次の結果が得られました.
損失は先ほどの2パターンと比べて0に収束しています.

loss.png

元の波形が減衰振動でない場合は,ただのノイズのような波形が出力されることがわかりました.

io.png

残差平方和からも,先ほどと比べて区別がつきやすくなっております.

result.png

出力層のみLinear,他はすべてtanh

tanhのみの場合と比較して良い結果になりました.

def AutoEncoder():
    kwargs = {
        "activation": "tanh",
        "kernel_initializer": "he_normal"
    }
    inputs = Input(shape = (512,))
    encoded = Dense(256, **kwargs)(inputs)
    encoded = Dense(128, **kwargs)(encoded)
    encoded = Dense(64, **kwargs)(encoded)
    decoded = Dense(128, **kwargs)(encoded)
    decoded = Dense(256, **kwargs)(decoded)
    kwargs["activation"] = "linear"
    outputs = Dense(512, **kwargs)(decoded)
    return Model(inputs = inputs, outputs = outputs)

i_tanh_o_linear_mid_tanh.png

tanhは$\pm 1$の値を出力するのが限界ですので,それを上回る教師データもあることも鑑みた結果,こちらの方がうまくいったと考えられます.

結論

おそらく,nonameさんが「うまくいかなかった」というのは,設定した閾値thresholderがうまく分離するように働かなかった,すなわちReLU関数やSigmoid関数のときのような負値を出力していない状態であったことや,標準化前ではSigmoidを使った場合において$\pm 1$の範囲でしか出力できないデコーダの状態で異常検知をしようとしていたからなのではないでしょうか.

問題設定に応じて,適切な活性化関数や損失関数を選ぶべきだと考えます.
今回の問題であれば,出力層は負の値を出力できるtanhが適していると考えることができます.また標準化を行わないで学習するのであれば入出力層はLinearにするべきだと考えます.

本解答では示していませんが,中間層にReLUファミリーの活性化関数を用いたケースを検討しましたが,あまりうまくいきませんでした.中間層のサイズを増大させた場合に,tanhほどではありませんがある程度の分離ができましたこと,報告いたします.

0

ご教示いただきありがとうございます。自分の代わりに検討いただいて恐縮です。
ネットに転がってるサンプルコードを見よう見まねで書いたという感じで、活性化関数の目的や使い分けについて不勉強だったと思います。
頂いたご助言をもとに、損失関数をmseまたはmaeに, 入出力層にlinear, 中間層にtanhを用いたところ、発散することなく、0に向かって収束する傾向が捉えられました。
ただ、学習データ数を30から200程度に増やしてもlossとval_lossで差があること、0.2~0.5と比較的高めの値で収束するといった事象は変わりませんでした。時刻歴データをFFTして周波数応答スペクトルに変換するなどして、時刻歴データの処理方法についても今後検討できればと思います。

loss-epoch.png

0

なるほど,コードに意図がわからないものがあった点の理由がわかりました.
いくつか疑問とその改善案を提案しておきます.
また,この改善によってlossとval_lossが依然大きいと思われている状態が良くなれば幸いです.

学習(train)データと検証(validation)データの切り分けについて

普通,testデータとも併せて次のように考えてデータを扱う必要があります.

  • train
    • 実際にニューラルネットワークの重みを更新させる学習データ
  • validation
    • ハイパーパラメータの良し悪しを確かめる検証データ.学習には使わない.
  • test
    • 学習後に汎化性能を確かめるテストデータ.学習には使わない.

その中で,nonameさんのコードでは,次のような記述がありました.

x_train,x_val,y_train,y_val=train_test_split(dataset_xtr,dataset_ytr,test_size=0.5,random_state=0)

# 中略

history = model.fit(x_train,x_train,epochs=1000, batch_size=4, validation_split=0.2)

ここで,train_test_split()ではtest_size = 0.5として元データdataset_xtr, dataset_ytrの半分をtrainデータとしており,さらにmodel.fit()ではvalidation_split = 0.2としており,渡されたtrainデータx_trainの20%をvalidationデータとするようにしています.

元のデータサイズが30でしたが,これの半分15個をx_trainとし,さらにmodel.fit()の際にはこれの20%であるたった3個をvalidationデータとして扱うように指定しています.

testデータはtest.csvに格納されているため,2度分ける必要はありません.

train_test_split()でvalidationデータを作成した場合は,

x_train, x_val, y_train, y_val = train_test_split(
    dataset_xtr, dataset_ytr,
    test_size = 0.2,
    random_state = 0
)

# 中略

history = model.fit(
    x_train, x_train,
    epochs = 1000,
    batch_size = 4,
    validation_data = (x_val, x_val) # ここを変更
)

としてあげる必要があります.このコードは,後々のクロスバリデーションの際に有効です.

また,train_test_split()を用いなくとも,model.fit()の際にvalidation_split = 0.2を指定してあげることでvalidationデータとしてtrainデータの後半20%を切り分けてくれます.

# x_train, x_val, y_train, y_val = train_test_split(
#     dataset_xtr, dataset_ytr,
#     test_size = 0.2,
#     random_state = 0
# )

# 中略

history = model.fit(
    dataset_xtr, dataset_xtr,
    epochs = 1000,
    batch_size = 4,
    validation_split = 0.2
)

このコードは,クロスバリデーションに応用しづらいですが,train_test_split()を必要としないので記述が楽です.

本件の場合,上で示したいずれかのコードにする必要があると考えます.
元のコードのままでは,教師データdftrainの半分を使わないまま,4割をtrainデータ,1割をvalidationデータとして使っている状態になります.非常に勿体無いです.

与えるデータの性質

データセットの列にClassがあるのをお見受けします.また,それをy_trainy_testとしていることから,異常か否かを示すラベルだと考えましたが当たっていますでしょうか.その前提で話を進めます.

まず,オートエンコーダを用いた異常検知では,異常データが少ないことを前提に,異常データを学習に使わず(少ないから使えない),正常データのみをオートエンコーダに学習させ,異常データの波形を入力したときに異常な波形を再現することができず入出力の残差平方和が増大するという性質を用いて異常検知を行います.先に私が示したようなものと同様のことが起きるはずです.

nonameさんのコードを見る限りでは,train.csvに正常データも異常データも両方とも入っていることから,そのまま両方ともオートエンコーダに学習させてしまっていないでしょうか.

これでは,どちらの波形も学習することで,正常な波形も異常な波形もオートエンコーダが再現できてしまい,区別することができません.

したがって,教師データであるtrain.csvから,正常な波形と異常な波形を切り分ける必要があると考えます.

逆に,今回のようなラベルが割り振られているケースではオートエンコーダを使うという希望を捨てて,ただの分類器を作成して使うという方針も可能です.

今回作成したオートエンコーダはちょっとサイズ感を縮小していますが,次のような形状をしています.

スクリーンショット 2022-05-10 13.39.50.png

(図では32->16->8->16->32のユニット数になっていますが)実際に作ったモデルでは512次元のデータを64次元に削減して,また512次元に復元するという感じです.

分類器にするために次のようなネットワークを構築すると考えましょう.
スクリーンショット 2022-05-10 13.41.00.png

512次元のデータを1次元にまで削減してしまうという感じです.
結果,1つの出力ができ,この出力値が0か1かの2値で正常か異常かを識別できる分類器になります.
このときには誤差関数をbinary_crossentropyにする必要があります.名前の通り2値用です.
モデルは次のように書くことができます.

def Classifier():
    kwargs = {
        "activation": "relu",
        "kernel_initializer": "glorot_normal"
    }
    inputs = Input(shape = (512,))
    x = Dense(256, **kwargs)(inputs)
    x = Dense(128, **kwargs)(x)
    x = Dense(64, **kwargs)(x)
    x = Dense(32, **kwargs)(x)
    x = Dense(16, **kwargs)(x)
    x = Dense(8, **kwargs)(x)
    x = Dense(4, **kwargs)(x)
    kwargs["activation"] = "sigmoid"
    outputs = Dense(1, **kwargs)(x)
    return Model(inputs = inputs, outputs = outputs)

1出力で2値分類で分類器を作る場合の出力は基本的にsigmoid関数で作成します.

このモデルを学習するときは

x_train, x_val, y_train, y_val = train_test_split(
    dataset_xtr, dataset_ytr,
    test_size = 0.2,
    random_state = 0
)

# 中略

history = model.fit(
    x_train, y_train,
    epochs = 1000,
    batch_size = 4,
    validation_data = (x_val, y_val)
)

または

history = model.fit(
    dataset_xtr, dataset_ytr,
    epochs = 1000,
    batch_size = 4,
    validation_split = 0.2
)

ですね.

とにかく,

  • 正常データと異常データを切り分けて,正常データのみをオートエンコーダに学習させる.
  • 正常データと異常データを区別する分類器を作成して学習させる.

の2択,いずれか取る必要があると考えました.異常データが少ないときは前者を,異常データも正常データと同じぐらいあるときは後者を選ぶことができます.

終わりに

波形をFFTして周波数応答スペクトルを見るという考えは非常に良いアプローチだと思います.
もっとPythonで機械学習のコードを作成する能力が高まれば,元の波形とFFT波形の両方を入力として受け取り,それぞれ学習できるように作れるはずです.

質問になりますが,先に提示していただいた「lossとval_lossで差があること,0.2~0.5と比較的高めの値で収束する」といった事象があったときには,データの正規化・標準化は行ったのでしょうか.

  • 行っていない場合
    • 元のデータの範囲が$\pm 6$ぐらいであることから,MSEが$0.2\sim 0.5$であることに対して比較的小さい値である.
    • MSEから単位を合わせてRMSEにしても$0.45\sim 0.71$であり,元データの範囲から$7.45\sim 11.8$%程度の差しかない.
    • なのでノイズの強度次第では気にしなくて良いと考えることもできる.
  • StandardScaler()で標準化を行っている場合
    • データが各列で標準化されるので上のコメントアウトされているコードでは絶対にうまくいかない
    • すなわち0秒地点同士,1秒地点同士といった各列で標準化を行うので,時系列データに対して不適.
    • 実際にやってみると,私が最初に示した減衰振動の波形が次のようになる.

std_ed.png

いずれにせよ,まだまだ改善の余地はありそうです.

0

何度もご教示いただきありがとうございます。
異常検知については、

正常データのみをオートエンコーダに学習させる
方になります。

こちらが考えていた異常検知の流れは以下のような感じです。
① dataset_xtrのうち半分のデータ(x_train)を使って、AEを生成
② ①で生成したAEに対して、dataset_xtrの残り半分(x_val)を適用し、時刻歴波形を再現する(再現後の波形データがx_val_invに相当します)
③ 正常データの再現前後で取りうる差分の範囲から、正常範囲を推定(≒thresholdを計算)
④ テストデータx_testを①で生成したAEで、②と同様に再現波形(x_test_inv)を生成し、差分を求める。(差分が③で推定した正常範囲を超過したら異常とみなす)

上記の②③の関係で、データを二度も分割していました。ただ、仰るように、現状のままだと学習で使わないデータのほうが多くなってしまうと認識しています。

また、後半の質問ですが、当該事象には正規化や標準化は行っていません。
最初の質問で標準化について述べておりましたが、最初の質問であるlossの発散と関わりがあるとおもいとりあえずやってみたという感じでした。

0

なるほど,そういうことだったのですね.

であればx_testに対して用いるthresholdを計算するときには,x_valを使うべきではないと考えます.ちゃんとx_testの結果に対してthresholdを求め,汎化性能を確かめるべきです.

あくまでx_valは学習曲線の経過を見るのに使うのがセオリーな気がします.あとvalidationデータをtrainデータの半分も割り当てるのはあまり見たことないです...

testデータを用いたthresholdの算出及びそこから異常だと考えられる波形のプロットをしてみました.

import csv
import numpy as np

from keras.layers import Dense, Input
from keras.models import Model

from matplotlib import pyplot as plt

def read_csv(file_name):
    ret = list()
    with open(file_name, "r") as f:
        reader = csv.reader(f)
        header = list(map(float, next(reader)))
        for row in reader:
            ret.append(np.array(list(map(float, row))))
    return header, np.array(ret)

def SquaredError(y, y_pred):
    return np.sum((np.array(y) - np.array(y_pred)) ** 2, axis = 1)

header, x_train = read_csv("train.csv")
_, x_test = read_csv("test.csv")

def AutoEncoder():
    kwargs = {
        "activation": "tanh",
        "kernel_initializer": "glorot_normal"
    }
    inputs = Input(shape = (512,))
    encoded = Dense(512, **kwargs)(inputs)
    encoded = Dense(256, **kwargs)(encoded)
    encoded = Dense(128, **kwargs)(encoded)
    encoded = Dense(64, **kwargs)(encoded)
    decoded = Dense(128, **kwargs)(encoded)
    decoded = Dense(256, **kwargs)(decoded)
    decoded = Dense(512, **kwargs)(decoded)
    kwargs["activation"] = 'linear'
    outputs = Dense(512, **kwargs)(decoded)
    return Model(inputs = inputs, outputs = outputs)

model = AutoEncoder()
model.compile(
    loss = 'mse',
    optimizer = 'adam',
    metrics = ['mae', 'mse']
)

epochs = 128
batch_size = 8

history = model.fit(
    x_train, x_train,
    epochs = epochs,
    batch_size = batch_size,
    validation_split = 0.2
)

epoch_arr = range(epochs)
plt.figure(figsize = (16, 5))
for i, param in enumerate(["loss", "mae", "mse"]):
    plt.subplot(1, 3, i + 1)
    plt.plot(epoch_arr, history.history[param], label = param)
    plt.plot(epoch_arr, history.history["val_" + param], label = "val_" + param)
    plt.title(param)
    plt.ylim(bottom = 0)
    plt.legend()
    plt.grid()
plt.show()

plt.figure(figsize = (7, 4))
x_pred = model.predict(x_test, batch_size = batch_size)
err = SquaredError(x_test, x_pred)
thresholder = err.mean() + 1.96 * err.std()
preds = np.array([1 if e > thresholder else 0 for e in err])

for i in np.argsort(err):
    plt.plot(header, x_test[i], color = ["blue", "red"][preds[i]])
plt.xlabel("time")
plt.ylabel("Amplitude")
plt.grid()
plt.show()

n, bins, _ = plt.hist([err[preds == 0], err[preds == 1]], bins = 64, label = ["Normal", "AbNormal"], stacked = True)
plt.vlines(thresholder, ymin = 0, ymax = n.max(), colors = "black",
    linestyles = "dashdot", label = f"threshold: {thresholder:.3f}")
plt.legend()
plt.xlabel("Squared Error")
plt.ylabel("Histogram")
plt.grid()
plt.show()

適当に生成したデータなのでこれといった異常は直感的にはなくとも,thresholderで定義された閾値を超えたデータを赤色で示すことができました.ですがやはり三角関数を入れたときのような100を超える二乗和誤差は出ず,ほとんど元の波形を再現することがわかりました.

loss.png

青波: 閾値未満,赤波: 閾値超えした入力波形のプロットになります.

wave.png

histogram.png

0

丁寧なご返信いただき感謝申し上げます。追記いただいたコードで、
err = SquaredError(x_test, x_pred)
thresholder = err.mean() + 1.96 * err.std()
とありますが、x_testという正常か異常か不明なデータからthresholderを計算しているという理解で宜しいでしょうか?
素人考えですが、仮にx_testがすべて異常なデータだった場合、正常なデータが含まれる場合と比較してthresholderが大きなることで、異常なのに正常と誤判定するのではないかと気になりました。

また、x_test, x_valについて以下補足させてください。
x_test:判定したいデータ(正常or異常が不明)
dataset_xtr:正常であることがあきらかなデータ
→ x_trainをtrain_test_splitでAEのモデル生成用(x_train, 80%):異常判定の閾値設定用(x_val, 20%)に分割
→ AEのモデル生成用データ(x_train)を、model.fit中のvalidation_dataでさらに学習用(80%)と検証用(20%)に分割
x_test, x_valという変数名のせいで分かりづらくなってしまい申し訳ありませんが、いわゆる機械学習のテストデータ、検証用データとは性格が異なるものになります。
(イメージとしては、正常であることが明確なデータx_trainをもとにAEを生成し、AEの生成に用いなかった正常データx_valから正常値の上限thresholderを求めるという感じになります。)

ちなみに、本件のコードは下記サイトを参考に作成しました。
https://www.medi-08-data-06.work/entry/cae_cnn

0

Your answer might help someone💌