%matplotlib inline
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training
from chainer.training import extensions
from chainer.utils import type_check
import os
import logging
import copy
from chainer import reporter as reporter_module
import matplotlib.pyplot as plt
class Accuracy(chainer.function.Function):
    """Fraction of predictions that fall within ``eps`` of the target.

    A prediction counts as a hit when ``|y - t| < eps``; ``forward``
    returns the hit rate as a scalar array in ``y``'s dtype.
    """

    def __init__(self, eps=0.1):
        super(Accuracy, self).__init__()
        self.eps = eps  # absolute tolerance for counting a prediction as correct

    def forward(self, inputs):
        # Pick the array backend (numpy or cupy) that matches the inputs
        # so the computation stays on the same device.
        xp = chainer.cuda.get_array_module(*inputs)
        y, t = inputs
        # Bug fix: the original used np.abs here, ignoring the xp
        # dispatch above and breaking the GPU path.
        diff = xp.abs(y - t)
        # Function.forward must return a tuple of arrays, hence the comma.
        return xp.asarray((diff < self.eps).mean(dtype=y.dtype)),
def accuracy(y, t, eps=0.1):
    """Return the fraction of predictions within ``eps`` of ``t``."""
    metric = Accuracy(eps=eps)
    return metric(y, t)
class ForcePredictor(chainer.Chain):
    """Network that estimates the applied force at the next time step.

    Stacked NStepLSTM layers over past readings, followed by a linear
    readout that emits one scalar per input sequence.
    """

    def __init__(self, n_input, n_units, n_layers=3):
        """
        :param int n_input: number of input features (3 for
            current / voltage / force, or 4 in the series setup where
            there are two force channels)
        :param int n_units: number of units in each hidden layer
        :param n_layers: number of stacked LSTM layers
        """
        super(ForcePredictor, self).__init__()
        with self.init_scope():
            self.lstm = L.NStepLSTM(n_layers, n_input, n_units,
                                    dropout=0.2)
            self.output_layer = L.Linear(n_units, 1)
        self.loss = None      # most recent MSE loss (set in __call__)
        self.accuracy = None  # most recent accuracy at eps=0.01
        self.hx = None        # LSTM hidden state carried across calls
        self.cx = None        # LSTM cell state carried across calls

    def clear(self):
        # Drop cached metrics only; recurrent state is kept
        # (use reset_state for that).
        self.loss = None
        self.accuracy = None

    def reset_state(self):
        # Forget the recurrent state so the next call starts fresh.
        self.hx = None
        self.cx = None

    def chain(self, x):
        """Predict the next value for each sequence in *x*.

        :param x: list of past-data arrays, one per sequence; each item
            is shaped [t, n_input] and t may differ between items
            (the original docstring said [n_input, t], but callers build
            [t, n_input] arrays from TupleDataset slices)
        :type x: list of :py:class:`chainer.variable.Variable`
        :return: predictions shaped [n, 1] where n == len(x)
        :rtype: :py:class:`chainer.variable.Variable`
        """
        # Bug fix: the original wrote `assert cond, print(...)`, which
        # makes the assertion message always None (print returns None).
        assert isinstance(x, list), \
            "expected a list of sequences, got {}".format(type(x))
        batch_size = len(x)
        hy, cy, ys = self.lstm(self.hx, self.cx, x)
        assert hy.shape[1] == batch_size, \
            "unexpected hidden-state shape {}".format(hy.shape)
        self.hx = hy
        self.cx = cy
        # Final hidden state of the last layer: [batch_size, n_units].
        h = hy[-1]
        y = self.output_layer(h)
        assert y.shape[0] == batch_size, \
            "unexpected output shape {}".format(y.shape)
        return y

    def __call__(self, x, t):
        """Compute the loss against targets *t* and report metrics."""
        y = self.chain(x)
        if not isinstance(t, self.xp.ndarray):
            t = self.xp.asarray(t).reshape(y.shape)
        self.loss = F.mean_squared_error(y, t)
        # Report accuracy at three tolerances for monitoring.
        self.accuracy = accuracy(y, t, 0.01)
        acc05 = accuracy(y, t, 0.05)
        acc10 = accuracy(y, t, 0.10)
        chainer.report({"loss": self.loss,
                        "accuracy": self.accuracy,
                        "acc05": acc05,
                        "acc10": acc10}, self)
        return self.loss
class SequencePredictIterator(chainer.dataset.Iterator):
    """Iterator that samples random variable-length sub-sequences,
    pairing each with the sample that immediately follows it as the
    prediction target."""

    def __init__(self, dataset, batch_size, min_length=1,
                 repeat=True):
        """
        :param dataset: dataset of tuples with two or more elements;
            the first element of each tuple is the prediction target
        :type dataset: :py.class:`chainer.datasets.TupleDataset`
        :param int batch_size: mini-batch size
        :param int min_length: minimum sub-sequence length in a batch
        :param bool repeat: whether to repeat over the dataset
        """
        self.dataset = dataset
        self.batch_size = batch_size
        self.repeat = repeat
        self.epoch = 0
        self.is_new_epoch = False
        self._previous_epoch_detail = -1
        # Clamp nonsensical values; a sequence must be at least 1 long.
        self.min_length = max(1, min_length)
        assert len(dataset) > self.min_length

    def __next__(self):
        # Without repeat, a single pass (one "epoch") exhausts the iterator.
        if not self.repeat and self.epoch > 0:
            raise StopIteration
        self._previous_epoch_detail = self.epoch_detail
        self.epoch += 1
        if self.batch_size == 1:
            # Single-sequence mode: everything but the last sample is the
            # input; the first field of the last sample is the target.
            xs = self.dataset[:-1]
            t = self.dataset[-1][0]
            return xs, t
        total_length = len(self.dataset)
        min_length = self.min_length
        # Random start offsets, capped so at least min_length + 1 samples
        # remain after each offset (the +1 leaves room for the target).
        offsets = np.random.randint(
            0, min(int(total_length / 3 * 2),
                   total_length - min_length - 1),
            size=self.batch_size)
        # Random lengths per offset, bounded so i + l stays in range.
        lengths = [np.random.randint(self.min_length,
                                     total_length - x)
                   for x in offsets]
        indexes = list(zip(lengths, offsets))
        # indexes.sort()
        xs = [self.dataset[i:i + l] for l, i in indexes]
        # Target: first field of the sample just after each sub-sequence.
        t = [self.dataset[i + l][0] for l, i in indexes]
        return list(zip(xs, t))

    @property
    def epoch_detail(self):
        # NOTE(review): whole-epoch granularity only; the fractional
        # epoch detail chainer iterators usually expose is not tracked.
        return self.epoch

    @property
    def previous_epoch_detail(self):
        # None before the first batch has been drawn.
        if self._previous_epoch_detail < 0:
            return None
        return self._previous_epoch_detail

    def serialize(self, serializer):
        # Persist iteration progress so training can resume from a snapshot.
        self.epoch = serializer('epoch', self.epoch)
        self.is_new_epoch = serializer('is_new_epoch',
                                       self.is_new_epoch)
        self._previous_epoch_detail = serializer(
            'previous_epoch_detail',
            self._previous_epoch_detail)

    def reset(self):
        # Restore the freshly-constructed state.
        self.epoch = 0
        self.is_new_epoch = False
        self._previous_epoch_detail = -1
def test_data_create(n_data=100, n_loop=3, ratio=0.8):
    """Create a dataset that mimics a three-phase AC waveform for testing.

    :param int n_data: number of samples
    :param int n_loop: how many waveform periods to generate
    :param ratio: train/test split ratio
    :return: pair of (train, test) datasets
    """
    phase = np.linspace(0., 2 * np.pi * n_loop, num=n_data)
    # Three sinusoids shifted by 120 degrees each.
    waves = [np.sin(phase + k * 2 / 3 * np.pi).astype(np.float32)
             for k in range(3)]
    split = int(n_data * ratio)
    train = chainer.datasets.TupleDataset(*(w[:split] for w in waves))
    test = chainer.datasets.TupleDataset(*(w[split:] for w in waves))
    return train, test
def convert(batch, device):
    """Move a batch of (x, y) pairs to *device* and split into columns.

    :param batch: iterable of (x, y) pairs
    :param device: None to leave data untouched, a negative int for CPU,
        or a non-negative GPU id
    :return: tuple of (list of x items, list of y items)
    """
    if device is None:
        def to_device(data):
            return data
    elif device < 0:
        to_device = chainer.cuda.to_cpu
    else:
        def to_device(data):
            return chainer.cuda.to_gpu(data, device,
                                       chainer.cuda.Stream.null)

    def to_device_batch(seq):
        if device is None:
            return seq
        if device < 0:
            return [to_device(np.asarray(item)) for item in seq]
        # GPU path: transfer one concatenated array, then split it back
        # into the original variable-length pieces on the device.
        xp = chainer.cuda.cupy.get_array_module(*seq)
        concat = xp.concatenate(seq, axis=0)
        sections = np.cumsum([len(item) for item in seq[:-1]],
                             dtype='i')
        return chainer.cuda.cupy.split(to_device(concat), sections)

    xs = to_device_batch([x for x, _ in batch])
    ys = to_device_batch([y for _, y in batch])
    return (xs, ys)
class Updater(training.StandardUpdater):
    """StandardUpdater variant that feeds list-of-sequence batches
    through the `convert` converter."""

    def __init__(self, train_iter, optimizer, device=-1):
        super(Updater, self).__init__(
            train_iter, optimizer, converter=convert,
            device=device)

    def update_core(self):
        train_iter = self.get_iterator('main')
        optimizer = self.get_optimizer('main')
        batch = train_iter.__next__()
        # Split the batch into input sequences and scalar targets.
        xs, ts = self.converter(batch, self.device)
        x_in = [chainer.Variable(x) for x in xs]
        # Targets reshaped to [n, 1] to match the model's output.
        t_in = np.asarray(ts).reshape((-1, 1))
        # optimizer.target is the model; calling it returns the loss.
        loss = optimizer.target(x_in, t_in)
        optimizer.target.cleargrads()
        loss.backward()
        # Cut the computational graph after backprop to release memory;
        # must happen before update, after backward.
        loss.unchain_backward()
        optimizer.update()
class Evaluator(training.extensions.Evaluator):
    """Evaluator that prepares batches the same way as Updater."""

    def __init__(self, iterator, target, device=-1, **kwargs):
        super(Evaluator, self).__init__(iterator, target,
                                        converter=convert,
                                        device=device, **kwargs)

    def evaluate(self):
        # Mirrors chainer's Evaluator.evaluate, with inputs wrapped as a
        # list of Variables and targets reshaped to [n, 1].
        iterator = self._iterators['main']
        target = self._targets['main']
        eval_func = self.eval_func or target
        if self.eval_hook:
            self.eval_hook(self)
        # Shallow copy so the original iterator's position is untouched.
        it = copy.copy(iterator)
        summary = reporter_module.DictSummary()
        for batch in it:
            observation = {}
            # Capture everything chainer.report()s during this batch.
            with reporter_module.report_scope(observation):
                xs, ts = self.converter(batch, self.device)
                x_in = [chainer.Variable(x) for x in xs]
                t_in = np.asarray(ts).reshape((-1, 1))
                eval_func(x_in, t_in)
            summary.add(observation)
        # Per-key means over all evaluation batches.
        return summary.compute_mean()
# Network: 3 input features, 3 stacked hidden layers of 100 units each.
model = ForcePredictor(n_input=3, n_units=100, n_layers=3)
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)
# Clip gradient norm at 5 to keep LSTM training stable.
optimizer.add_hook(chainer.optimizer.GradientClipping(5))
# Prepare training data: 500 samples of the synthetic 3-phase waveform.
t_dataset, v_dataset = test_data_create(500)
train_iter = SequencePredictIterator(t_dataset, batch_size=20,
                                     min_length=50)
val_iter = SequencePredictIterator(v_dataset, batch_size=10,
                                   min_length=50, repeat=False)
# Training configuration: 100 epochs, evaluate/log every 10.
updater = Updater(train_iter, optimizer)
trainer = training.Trainer(updater, (100, "epoch"))
# Evaluate on a copy; its recurrent state is reset before each run
# so evaluation does not disturb (or depend on) training state.
eval_model = model.copy()
log_interval = (10, "epoch")
trainer.extend(Evaluator(val_iter, eval_model,
                         eval_hook=lambda _: eval_model.reset_state()),
               trigger=log_interval)
trainer.extend(chainer.training.extensions.LogReport(
    trigger=log_interval))
trainer.extend(extensions.PrintReport(
    ['epoch', 'main/loss', "validation/main/loss"]),
    trigger=log_interval)
# Run the training loop.
trainer.run()
# epoch       main/loss   validation/main/loss
# 10          0.134788    0.0844831
# 20          0.0223034   0.0225472
# 30          0.0090264   0.0099782
# 40          0.00516568  0.00232969
# 50          0.00252769  0.000137683
# 60          0.00143399  0.00344819
# 70          0.00153117  8.24596e-06
# 80          0.00122108  0.000140406
# 90          0.000937449 5.31944e-05
# 100         0.000845839 9.81018e-05
# Check estimation accuracy from the final log entry.
log = trainer.get_extension("LogReport").log.pop()
metric_keys = ("main/accuracy", "validation/main/accuracy",
               "validation/main/acc05", "validation/main/acc10")
print(*(log[key] for key in metric_keys))
# 0.23500000312924385 0.6000000238418579 1.0 1.0
# Inspect the training result.
# Rebuild the three-phase waveform for visual verification.
t = np.linspace(0., 2 * np.pi * 3, num=500)
X = np.sin(t, dtype=np.float32)
Y = np.sin(t + 2 / 3 * np.pi, dtype=np.float32)
Z = np.sin(t + 4 / 3 * np.pi, dtype=np.float32)
for phase in (X, Y, Z):
    plt.plot(t, phase)
plt.show()
pre_skip = 20  # minimum history length required before predicting
X_data = chainer.datasets.TupleDataset(X, Y, Z)
# One growing window per prediction point, wrapped as Variables.
X_in = [
    chainer.variable.Variable(
        np.asarray(X_data[:i + pre_skip], dtype=np.float32))
    for i in range(len(X) - pre_skip - 1)
]
model.reset_state()
with chainer.configuration.using_config('train', False):
    X_pred = model.chain(X_in)
plt.plot(t[pre_skip + 1:], X_pred.data.reshape(-1), color="r")  # prediction (red)
plt.plot(t, X, color="b")  # ground truth (blue)
plt.show()
# Compare predictions with the ground truth.
# Exact agreement is unrealistic, so deviations up to 0.05 count as
# noise; report the fraction of points that drift further than that.
window_ends = range(pre_skip, len(X_data))
input_data = [np.asarray(X_data[:end], dtype=np.float32)
              for end in window_ends]
real_data = np.asarray([X_data[end][0] for end in window_ends],
                       dtype=np.float32)
input_data = list(map(chainer.variable.Variable, input_data))
model.reset_state()
with chainer.configuration.using_config('train', False):
    pred_data = model.chain(input_data)
diff = np.abs(pred_data.data.ravel() - real_data, dtype=np.float32)
(diff > 0.05).mean(dtype=np.float32)
# 0.0
# Simulate anomalous input by zeroing part of the waveform.
X_err = X.copy()
X_err[100:120] = 0
X_data_err = chainer.datasets.TupleDataset(X_err, Y, Z)
# Same growing-window inputs as before, built from the corrupted data.
X_in_err = [
    chainer.variable.Variable(
        np.asarray(X_data_err[:end + pre_skip], dtype=np.float32))
    for end in range(len(X) - pre_skip - 1)
]
model.reset_state()
with chainer.configuration.using_config('train', False):
    X_pred_err = model.chain(X_in_err)
plt.plot(t[pre_skip + 1:], X_pred_err.data.reshape(-1), color="r")  # prediction (red)
plt.plot(t, X_err, color="b")  # simulated-fault input (blue)
plt.show()
# With anomalous input, predictions should diverge from the data,
# confirming the fault is detectable.
spans = range(pre_skip, len(X_data_err))
input_data = [np.asarray(X_data_err[:end], dtype=np.float32)
              for end in spans]
real_data = np.asarray([X_data_err[end][0] for end in spans],
                       dtype=np.float32)
input_data = list(map(chainer.variable.Variable, input_data))
model.reset_state()
with chainer.configuration.using_config('train', False):
    pred_data = model.chain(input_data)
diff = np.abs(pred_data.data.ravel() - real_data, dtype=np.float32)
(diff > 0.05).mean(dtype=np.float32)
# 0.058333334