2
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

LSTMによる異常検知の練習

Posted at
%matplotlib inline
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training
from chainer.training import extensions
from chainer.utils import type_check
import os
import logging
import copy
from chainer import reporter as reporter_module
import matplotlib.pyplot as plt
class Accuracy(chainer.function.Function):
    def __init__(self, eps=0.1):
        super(Accuracy, self).__init__()
        self.eps = eps
    
    def forward(self, inputs):
        xp = chainer.cuda.get_array_module(*inputs)
        y, t = inputs
        diff = np.abs(y - t)
        return xp.asarray((diff < self.eps).mean(dtype=y.dtype)),

def accuracy(y, t, eps=0.1):
    return Accuracy(eps=eps)(y, t)
class ForcePredictor(chainer.Chain):
    def __init__(self, n_input, n_units, n_layers=3):
        """次の瞬間の加圧力を推定するネットワーク

        :param int n_input: 入力要素数(電流・電圧・加圧力の3要素ないし
            シリ-ズの場合加圧力が2つなので4要素
        :param int n_units: 隠れ層の要素数
        :param n_layers: 隠れ層の数
        """
        super(ForcePredictor, self).__init__()
        with self.init_scope():
            self.lstm = L.NStepLSTM(n_layers, n_input, n_units,
                                    dropout=0.2)
            self.output_layer = L.Linear(n_units, 1)
        self.loss = None
        self.accuracy = None
        self.hx = None
        self.cx = None

    def clear(self):
        self.loss = None
        self.accuracy = None

    def reset_state(self):
        self.hx = None
        self.cx = None

    def chain(self, x):
        """推定する

        :param x: 過去のデータ
            複数の推定を同時に行うためリストを入力とする。
            リストの各項目は[n_input, t]の配列であり、
            tはリストの項目間で異なっていてもいい。
        :type x: list of :py:class:`chainer.variable.Variable`
        :return: 次の瞬間の推定値
            複数の推定結果を出力するため[n, 1]の配列。
            nは入力した項目数と等しい
        :rtype: :py:class:`chainer.variable.Variable`
        """
        assert isinstance(x, list), print(type(x))
        batch_size = len(x)
        hy, cy, ys = self.lstm(self.hx, self.cx, x)
        assert hy.shape[1] == batch_size, print(hy.shape)
        self.hx = hy
        self.cx = cy
        h = hy[-1]
        y = self.output_layer(h)
        assert y.shape[0] == batch_size, print(y.shape)
        return y

    def __call__(self, x, t):
        y = self.chain(x)
        if not isinstance(t, self.xp.ndarray):
            t = self.xp.asarray(t).reshape(y.shape)
        self.loss = F.mean_squared_error(y, t)
        self.accuracy = accuracy(y, t, 0.01)
        acc05 = accuracy(y, t, 0.05)
        acc10 = accuracy(y, t, 0.10)
        chainer.report({"loss": self.loss,
                        "accuracy": self.accuracy,
                        "acc05": acc05,
                        "acc10": acc10}, self)
        return self.loss
class SequencePredictIterator(chainer.dataset.Iterator):
    def __init__(self, dataset, batch_size, min_length=1,
                 repeat=True):
        """
        :param dataset: 2つ以上の要素を含んだタプルのデータセット、
            タプル内の最初の要素が予測対象
        :type dataset: :py.class:`chainer.datasets.TupleDataset`
        :param int batch_size: ミニバッチのサイズ
        :param int min_length: バッチ内のシーケンスの最小長さ
        :param bool repeat: 入力データセットを繰り返すかどうか
        """
        self.dataset = dataset
        self.batch_size = batch_size
        self.repeat = repeat
        self.epoch = 0
        self.is_new_epoch = False
        self._previous_epoch_detail = -1
        self.min_length = max(1, min_length)
        assert len(dataset) > self.min_length

    def __next__(self):
        if not self.repeat and self.epoch > 0:
            raise StopIteration

        self._previous_epoch_detail = self.epoch_detail
        self.epoch += 1

        if self.batch_size == 1:
            xs = self.dataset[:-1]
            t = self.dataset[-1][0]
            return xs, t

        total_length = len(self.dataset)
        min_length = self.min_length
        offsets = np.random.randint(
            0, min(int(total_length / 3 * 2),
                   total_length - min_length - 1),
            size=self.batch_size)
        lengths = [np.random.randint(self.min_length,
                                     total_length - x)
                   for x in offsets]
        indexes = list(zip(lengths, offsets))
        # indexes.sort()

        xs = [self.dataset[i:i + l] for l, i in indexes]
        t = [self.dataset[i + l][0] for l, i in indexes]
        return list(zip(xs, t))

    @property
    def epoch_detail(self):
        return self.epoch

    @property
    def previous_epoch_detail(self):
        if self._previous_epoch_detail < 0:
            return None
        return self._previous_epoch_detail

    def serialize(self, serializer):
        self.epoch = serializer('epoch', self.epoch)
        self.is_new_epoch = serializer('is_new_epoch',
                                       self.is_new_epoch)
        self._previous_epoch_detail = serializer(
            'previous_epoch_detail',
            self._previous_epoch_detail)

    def reset(self):
        self.epoch = 0
        self.is_new_epoch = False
        self._previous_epoch_detail = -1
def test_data_create(n_data=100, n_loop=3, ratio=0.8):
    """試験用に三相交流波形を模擬したデータセットを用意する

    :param int n_data: データ数
    :param int n_loop: 何周期分の波形にするか
    :param ratio: 訓練データとテストデータの比率
    :return: データセットのペア
    """
    t = np.linspace(0., 2 * np.pi * n_loop, num=n_data)
    x_train = np.array(np.sin(t)).astype(np.float32)
    y_train = np.array(np.sin(t + 2 / 3 * np.pi)).astype(
        np.float32)
    z_train = np.array(np.sin(t + 4 / 3 * np.pi)).astype(
        np.float32)
    n_train = int(n_data * ratio)
    train = chainer.datasets.TupleDataset(x_train[:n_train],
                                          y_train[:n_train],
                                          z_train[:n_train])
    test = chainer.datasets.TupleDataset(x_train[n_train:],
                                         y_train[n_train:],
                                         z_train[n_train:])
    return train, test
def convert(batch, device):
    if device is None:
        def to_device(x):
            return x
    elif device < 0:
        to_device = chainer.cuda.to_cpu
    else:
        def to_device(x):
            return chainer.cuda.to_gpu(x, device,
                                       chainer.cuda.Stream.null)

    def to_device_batch(batch_):
        if device is None:
            return batch_
        elif device < 0:
            return [to_device(np.asarray(x)) for x in batch_]
        else:
            xp = chainer.cuda.cupy.get_array_module(*batch_)
            concat = xp.concatenate(batch_, axis=0)
            sections = np.cumsum([len(x) for x in batch_[:-1]],
                                 dtype='i')
            concat_dev = to_device(concat)
            batch_dev = chainer.cuda.cupy.split(concat_dev,
                                                sections)
            return batch_dev

    x_out = to_device_batch([x for x, _ in batch])
    y_out = to_device_batch([y for _, y in batch])
    return tuple([x_out, y_out])


class Updater(training.StandardUpdater):

    def __init__(self, train_iter, optimizer, device=-1):
        super(Updater, self).__init__(
            train_iter, optimizer, converter=convert,
            device=device)

    def update_core(self):
        train_iter = self.get_iterator('main')
        optimizer = self.get_optimizer('main')

        batch = train_iter.__next__()

        xs, ts = self.converter(batch, self.device)

        x_in = [chainer.Variable(x) for x in xs]
        t_in = np.asarray(ts).reshape((-1, 1))
        loss = optimizer.target(x_in, t_in)

        optimizer.target.cleargrads()
        loss.backward()
        loss.unchain_backward()
        optimizer.update()
class Evaluator(training.extensions.Evaluator):
    def __init__(self, iterator, target, device=-1, **kwargs):
        super(Evaluator, self).__init__(iterator, target,
                                        converter=convert,
                                        device=device, **kwargs)

    def evaluate(self):
        iterator = self._iterators['main']
        target = self._targets['main']
        eval_func = self.eval_func or target

        if self.eval_hook:
            self.eval_hook(self)
        it = copy.copy(iterator)
        summary = reporter_module.DictSummary()

        for batch in it:
            observation = {}
            with reporter_module.report_scope(observation):
                xs, ts = self.converter(batch, self.device)
                x_in = [chainer.Variable(x) for x in xs]
                t_in = np.asarray(ts).reshape((-1, 1))
                eval_func(x_in, t_in)

            summary.add(observation)

        return summary.compute_mean()
# 3つの要素を入力し、100個のパラメータを持つ隠れ層を3つ重ねたネットワーク
model = ForcePredictor(n_input=3, n_units=100, n_layers=3)
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)
optimizer.add_hook(chainer.optimizer.GradientClipping(5))
# 訓練データの準備
t_dataset, v_dataset = test_data_create(500)
train_iter = SequencePredictIterator(t_dataset, batch_size=20,
                                     min_length=50)
val_iter = SequencePredictIterator(v_dataset, batch_size=10,
                                   min_length=50, repeat=False)
# 訓練の設定
updater = Updater(train_iter, optimizer)
trainer = training.Trainer(updater, (100, "epoch"))
eval_model = model.copy()
log_interval = (10, "epoch")
trainer.extend(Evaluator(val_iter, eval_model,
        eval_hook=lambda _: eval_model.reset_state()),
               trigger=log_interval)
trainer.extend(chainer.training.extensions.LogReport(
    trigger=log_interval))
trainer.extend(extensions.PrintReport(
    ['epoch', 'main/loss', "validation/main/loss"]),
               trigger=log_interval)
# 訓練
trainer.run()
epoch       main/loss   validation/main/loss
10          0.134788    0.0844831             
20          0.0223034   0.0225472             
30          0.0090264   0.0099782             
40          0.00516568  0.00232969            
50          0.00252769  0.000137683           
60          0.00143399  0.00344819            
70          0.00153117  8.24596e-06           
80          0.00122108  0.000140406           
90          0.000937449  5.31944e-05           
100         0.000845839  9.81018e-05           
# 推定精度確認
log = trainer.get_extension("LogReport").log.pop()
print(log["main/accuracy"], log["validation/main/accuracy"],
      log["validation/main/acc05"], log["validation/main/acc10"])
0.23500000312924385 0.6000000238418579 1.0 1.0
# 訓練結果確認
# 確認用データ作成
t = np.linspace(0., 2*np.pi*3, num=500)
X = np.sin(t, dtype=np.float32)
Y = np.sin(t + 2/3 * np.pi, dtype=np.float32)
Z = np.sin(t + 4/3 * np.pi, dtype=np.float32)
plt.plot(t, X)
plt.plot(t, Y)
plt.plot(t, Z)
plt.show()

output_12_0.png

pre_skip = 20 # 推定を始めるため最小の長さ
X_data = chainer.datasets.TupleDataset(X, Y, Z)
X_in = [np.asarray(X_data[:i+pre_skip], dtype=np.float32)
        for i in range(len(X) - pre_skip - 1)]
X_in = [chainer.variable.Variable(v) for v in X_in]
model.reset_state()
with chainer.configuration.using_config('train', False):
    X_pred = model.chain(X_in)
plt.plot(t[pre_skip + 1:], X_pred.data.reshape(-1), color="r")  # 推定結果(赤)
plt.plot(t, X, color="b")  # 正解データ(青)
plt.show()

output_13_0.png

# 正解データとの差を確認
# 完全一致は難しいので0.05以下の差は誤差として
# それ以上ずれたデータ個数の割合を算出
real_data = list()
input_data = list()
for i in range(pre_skip, len(X_data)):
    input_data.append(np.asarray(X_data[:i], dtype=np.float32))
    real_data.append(X_data[i][0])
real_data = np.asarray(real_data, dtype=np.float32)
input_data = list(map(chainer.variable.Variable, input_data))
model.reset_state()
with chainer.configuration.using_config('train', False):
    pred_data = model.chain(input_data)
diff = np.abs(pred_data.data.ravel() - real_data, dtype=np.float32)
(diff > 0.05).mean(dtype=np.float32)
0.0
# 異常データの入力を模擬して波形の一部を0に
X_err = X.copy()
X_err[100:120]=0
X_data_err = chainer.datasets.TupleDataset(X_err, Y, Z)
X_in_err = [np.asarray(X_data_err[:i+pre_skip], dtype=np.float32)
        for i in range(len(X) - pre_skip - 1)]
X_in_err = [chainer.variable.Variable(v) for v in X_in_err]
model.reset_state()
with chainer.configuration.using_config('train', False):
    X_pred_err = model.chain(X_in_err)
plt.plot(t[pre_skip + 1:], X_pred_err.data.reshape(-1), color="r")  # 推定結果(赤)
plt.plot(t, X_err, color="b")  #異常模擬データ(青)
plt.show()

output_15_0.png

# 異常データが入力されたので推定結果と入力データに差異が出ていることを確認
real_data = list()
input_data = list()
for i in range(pre_skip, len(X_data_err)):
    input_data.append(np.asarray(X_data_err[:i], dtype=np.float32))
    real_data.append(X_data_err[i][0])
real_data = np.asarray(real_data, dtype=np.float32)
input_data = list(map(chainer.variable.Variable, input_data))
model.reset_state()
with chainer.configuration.using_config('train', False):
    pred_data = model.chain(input_data)
diff = np.abs(pred_data.data.ravel() - real_data, dtype=np.float32)
(diff > 0.05).mean(dtype=np.float32)
0.058333334
2
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?