AIに車を運転させたい。それは技術者の夢である。
流行りのニューラルネットで運転させたい。それはサラリーマンの夢である。
制御に出てくる関数
NNでの関数近似でよく見かけるのがsin関数など関数の即出力が決まり、出力履歴が残らない内容。
これに対して車両等、運動を伴う現象には履歴が付きまとう。
ステアリング操作に対する車両の応答ですら、ゆっくり後からしかも履歴を伴ってやってくる。
画像引用元:2012-076477号 車両用舵角制御装置 - astamuse
もし、こういった応答性が鈍く履歴が残る関数を近似表現するNNを定式化できれば、
Deep Q learningなどのネットワーク設計に役立つのではないかと思う。
1次遅れ系の離散化
先の例は2次遅れ系で車両の運動を表現していたが、もっと簡素な例として1次遅れ系を取り上げる。
G(s) = \frac{1}{\tau s + 1}
これをΔT[sec]のサンプリングレートでもって離散化すると1
G(z) = \frac{(1-e^{-dT/\tau})z^{-1}}{1-e^{-dT/\tau}z^{-1}}
であって、x[n]を入力、y[n]を出力とした漸化式で表現すると
y[n] = (1-e^{-dT/\tau}) \cdot x[n-1] + e^{-dT/\tau} \cdot y[n-1]
となる。
入力と出力の組み合わせと、さらにその性質が1次遅れだと知っていれば、
このτを求めるのは、さほど骨ではないが、
こういった前提知識をゼロで運転を学習してほしいというのがDQNという魔法への期待である。
NN教師信号と卒業試験の内容
サンプリングレートは10[msec], 伝達関数は1[sec]の1次遅れ系としてしまって計算。
教師信号
入力(青色)をチャープ信号としたτ=1[sec]の1次遅れ伝達関数の出力(緑色)
from scipy.signal import chirp, lfilter
dT = 0.01
def Plant1order(dT, x):
Tau = 1.0
a = [1, -np.exp(-dT/Tau)]
b = [0.0, (1.0 -np.exp(-dT/Tau))]
return lfilter(b, a, x)
def Data_Set(N):
Nyquist_F = (1/dT)/20
t = np.linspace(0, N*dT, N)
x = chirp(t, f0 = 0.01, f1 = Nyquist_F, t1 = N*dT, method = 'linear')
y = Plant1order(dT, x)
return x.astype(np.float32), y.astype(np.float32), t
NNへの入力
答えを知っているからという理由もあるのだが、うまく学習できれば、
y[n] = A \cdot x[n-1] + B \cdot y[n-1]
という結果になるはず(A, Bは定数)
したがって入力に現在の入力x, 1ステップ前の(x, y)を入れた3入力を入れることにする。
→ (入力, 1ステップ前の入力, 1ステップ前の出力の正解)を入力とする
tmp_DataSet_X, tmp_DataSet_Y, ttt = Data_Set(N_train)
x_train = np.vstack((tmp_DataSet_X, shift(tmp_DataSet_X, 1, cval=0.0), shift(tmp_DataSet_Y, 1, cval=0.0))).reshape(N_train,-1)
y_train = np.matrix(tmp_DataSet_Y).reshape(N_train,1)
卒業試験
ステップ応答(インデンシャル応答)させてみる(入力:青色、出力:緑色)
NNへの入力
NNが自分で予測した出力を使って計算すると誤差が累積して厳しいことになりそうなので、まずは甘やかし路線で教師信号と同じように、(入力, 1ステップ前の入力, 1ステップ前の出力の正解)を入力とする
NNの構成
全結合で
(入力, 1ステップ前の入力, 1ステップ前の出力) --> 60ユニット --> 60ユニット --> 出力
class MyChain(Chain):
# ニューラルネットワーク定義
n_input = 3
n_output = 1
n_units = 60
def __init__(self):
super(MyChain, self).__init__(
l1 = F.Linear(self.n_input, self.n_units),
l2 = F.Linear(self.n_units, self.n_units),
l3 = F.Linear(self.n_units, self.n_units),
l4 = F.Linear(self.n_units, self.n_output)
)
def __call__(self, x, t):
return F.mean_squared_error(self.predict(Variable(x)), Variable(t))
def predict(self, x):
h1 = F.leaky_relu(model.l1(x))
h2 = F.leaky_relu(model.l2(h1))
h3 = F.leaky_relu(model.l3(h2))
return model.l4(h3)
def get(self, x):
return self.predict(Variable(np.array([x]).astype(np.float32).reshape(len(x),3))).data
教育結果
うーん
Epoch: 1000
Loss: 0.191947328299 CV: 0.202642564848
うーん…
教師信号のバリエーションや教育数がいまひとつだったかなぁ…
参考
Appendix:ソース
# -*- coding: utf-8 -*-
import numpy as np
from scipy.signal import chirp, lfilter
from scipy.ndimage.interpolation import shift
# Chainerのライブラリ群
import chainer # 本体
import chainer.functions as F # ニューラルネットワークのシナプス関数群
from chainer import optimizers # 機械学習用関数
from chainer import Chain, Variable
import matplotlib.pyplot as plt
import argparse
# ランダムシードを固定することで学習モデルの違いによる差を評価しやすくする
np.random.seed(0)
parser = argparse.ArgumentParser(description='TF approximation')
parser.add_argument('--dT', '-t', default=0.1, type=float,
help='Sampling Time Step')
parser.add_argument('--second', '-s', default=False, action='store_true',
help='Use 2nd Lang TF')
args = parser.parse_args()
dT = args.dT
def random_range(a, b, N):
return (b - a)*np.random.rand(N) + a
def Plant1order(dT, x):
Tau = 1.0
a = [1, -np.exp(-dT/Tau)]
b = [0.0, (1.0 -np.exp(-dT/Tau))]
return lfilter(b, a, x)
def Plant2order(dT, x):
Omega0 = 1.0 * 2 * np.pi
Zeta = 0.7
OZ = Omega0 * Zeta
Omega = Omega0 * np.sqrt(1 - Zeta**2)
Phi = np.arccos(Zeta)
a = [1.,
-2.*np.exp(-OZ*dT)*np.cos(Omega*dT),
np.exp(-2.*OZ*dT)]
b = [0.,
1. - np.exp(-OZ*dT)/np.sqrt(1.-Zeta**2)*np.sin(Omega*dT + Phi),
np.exp(-2.*OZ*dT)+np.exp(-OZ*dT)/np.sqrt(1.-Zeta**2)*np.sin(Omega*dT - Phi)]
return lfilter(b, a, x)
def Data_Set(N):
Nyquist_F = (1/dT)/20
t = np.linspace(0, N*dT, N)
x = chirp(t, f0 = 0.01, f1 = Nyquist_F, t1 = N*dT, method = 'linear')
# random_range(-0.5, 0.5, N).astype(np.float32)
if not args.second:
y = Plant1order(dT, x)
else:
y = Plant2order(dT, x)
return x.astype(np.float32), y.astype(np.float32), t
class MyChain(Chain):
# ニューラルネットワーク定義
n_input = 3
n_output = 1
n_units = 60
def __init__(self):
super(MyChain, self).__init__(
l1 = F.Linear(self.n_input, self.n_units),
l2 = F.Linear(self.n_units, self.n_units),
l3 = F.Linear(self.n_units, self.n_units),
l4 = F.Linear(self.n_units, self.n_output)
)
def __call__(self, x, t):
return F.mean_squared_error(self.predict(Variable(x)), Variable(t))
def predict(self, x):
h1 = F.leaky_relu(model.l1(x))
h2 = F.leaky_relu(model.l2(h1))
h3 = F.leaky_relu(model.l3(h2))
return model.l4(h3)
def get(self, x):
return self.predict(Variable(np.array([x]).astype(np.float32).reshape(len(x),3))).data
if __name__ == "__main__":
# データセット
N_train = 4000
N_CV = 1000
tmp_DataSet_X, tmp_DataSet_Y, ttt = Data_Set(N_train)
x_train = np.vstack((tmp_DataSet_X, shift(tmp_DataSet_X, 1, cval=0.0), shift(tmp_DataSet_Y, 1, cval=0.0))).reshape(N_train,-1)
y_train = np.matrix(tmp_DataSet_Y).reshape(N_train,1)
print x_train.shape
fig = plt.figure(2)
plt.plot(ttt, tmp_DataSet_X)
plt.plot(ttt, tmp_DataSet_Y)
fig.patch.set_facecolor('white')
tmp_DataSet_X, tmp_DataSet_Y, ttt = Data_Set(N_CV)
x_CV = np.vstack((tmp_DataSet_X, shift(tmp_DataSet_X, 1, cval=0.0), shift(tmp_DataSet_Y, 1, cval=0.0))).reshape(N_CV, -1)
y_CV = np.matrix(tmp_DataSet_Y).reshape(N_CV,1)
# 学習モデル作成からのオプティマイザーにセット
model = MyChain()
optimizer = optimizers.Adam()
optimizer.setup(model)
# 学習の進みについてログを取るための変数
train_loss_means = []
CV_loss_means = []
# 学習のパラメータ
n_epoch = 1000
batch_size = 20
for epoch in xrange(1, n_epoch + 1):
print 'Epoch:', epoch
# Training
perm = np.random.permutation(N_train)
sum_loss = 0
for i in xrange(0, N_train, batch_size):
x_batch = np.asarray(x_train[perm[i: i + batch_size]])
y_batch = np.asarray(y_train[perm[i: i + batch_size]])
model.zerograds()
loss = model(x_batch, y_batch)
loss.backward()
optimizer.update()
sum_loss += loss.data * len(y_batch)
train_loss_means.append(sum_loss/N_train)
# Cross Validation
sum_loss = 0
for i in xrange(0, N_CV, batch_size):
x_batch = np.asarray(x_CV[i:i+batch_size])
y_batch = np.asarray(y_CV[i:i+batch_size])
loss = model(x_batch, y_batch)
sum_loss += loss.data * len(y_batch)
CV_loss_means.append(sum_loss/N_CV)
print "\tLoss:",train_loss_means[-1],"CV:",CV_loss_means[-1]
# Loss関数のプロット
plt.figure(1)
epolist = range(0, n_epoch)
plt.plot(epolist,train_loss_means)
plt.plot(epolist,CV_loss_means)
plt.yscale('log')
fig = plt.figure(3)
fig.patch.set_facecolor('white')
# 修業した成果を見せる
N_test = 200
# tmp_DataSet_X, tmp_DataSet_Y, t = Data_Set(N_test)
tmp_DataSet_X = np.ones(N_test).astype(np.float32)
tmp_DataSet_Y = Plant1order(dT, tmp_DataSet_X).astype(np.float32) if not args.second \
else Plant2order(dT, tmp_DataSet_X).astype(np.float32)
t = np.linspace(0, N_test*dT, N_test)
x = np.vstack((tmp_DataSet_X, shift(tmp_DataSet_X, 1, cval=0.0), shift(tmp_DataSet_Y, 1, cval=0.0))).reshape(N_test,-1)
y1 = model.get(x)
t *= dT
plt.plot(t, tmp_DataSet_X)
plt.plot(t, tmp_DataSet_Y)
plt.ylim(0.0,1.1)
plt.plot(t, y1)
plt.show()