今回は前回つくったニューラルネットワークを、Chainerに変換していきます。
今回もお世話になったのは、このページです。ありがとうございます。
Chainの導入
本格的にChainerのトピックであるChainを使っていきます.
これによってニューラルネットワークをクラスとして定義します.
また,順方向伝搬をCallを使って呼び出し実行します.
まずは,ニューラルネットワーク層と隠れ層の活性化関数を定義します.
Numpy表示
for t in range(EPOCHS):
# 順方向伝搬計算
h = F.matmul(x, w1)
h_r = F.relu(h)
y_p = F.matmul(h_r, w2)
Chainer表示
#順方向伝搬計算
class TwoLayerNet(chainer.Chain):
def __init__(self, d_in, h, d_out):
super(TwoLayerNet, self).__init__(
linear1=L.Linear(d_in, h, initialW=W1.transpose()),
linear2=L.Linear(h, d_out, initialW=W2.transpose())
)
def __call__(self, x):
g = self.linear1(x)
h_r = F.relu(g)
y_p = self.linear2(h_r)
return y_p
Classで定義する関数名をTwoLayerNetと名付け, (chainer.Chain)をいれることで,
DeepLearningの層であることを述べます.
その中は2つで構成されており,
はじめに,initで,初期設定であるDeepLearningの層の構成を定義します.
ここでは2層であること,重み付け層である各層のwが何であることを定義します.
そのあと,この関数を呼び出すcallを述べて,
パラメータは入力xは,numpy表示と同じ計算をしています.
その結果をy_p を出力として返しています.
def sample_3():
# 入力x,yを定義
x = Variable(X)
y = Variable(Y)
# ニューるるネットワークの定義
model = TwoLayerNet(3, 2, 3)
for t in range(EPOCHS):
# forward
y_p = model(x)
次に実際の計算のsample_3()を定義します.
x,yをVariable定義のパラメータとしてセットします.
次に,用いるニューラルネットワークとして,先に定義したTwoLayerNetを使いますよ,
とセネgんします.
次が計算の実行です.
Optimizerの導入
Numpy表示
## ロスの計算
loss = F.mean_squared_error(y_p, y)
print(loss.data)
# 重み付けwの傾き計算
w1.zerograd()
w2.zerograd()
# 逆伝搬計算
loss.backward()
# 重み付けWの更新
w1.data -= LEARNING_RATE * w1.grad
w2.data -= LEARNING_RATE * w2.grad
Chainer 表示
#出力層の関数を定義
optimizer = P.SGD(lr=LEARNING_RATE)
# 定義した出力層をニューラルネットワークのモデルに結合する
optimizer.setup(model)
# ロスの計算
loss = F.mean_squared_error(y_p, y)
print(loss.data)
# 重み付けwの傾き計算
model.cleargrads()
# 逆伝搬計算
loss.backward()
# 重み付けWの更新
optimizer.update()
これでわかるように,ニューラルモデルのモデルを作り,それを数式的にといて関数をつくり,
その関数をコードでひとつづつ記述していました.
それが,Chainerを使うことで,そのインスタンスとよばれるコードを呼び出しているだけです.
これで,自分で数式つかて解く必要もなく,呼び出したものをみるだけですむので間違いはないです.
直感的にわかりやすいです.
Trainerの導入
最後に,繰り返し分をもちいてパラメータの学習をしていたところを,Trainerを用いて処理します.
これfor文によるループ処理をなくせます.
これによって,コードを読むだけでどういったニューラルネットワーク構成になっており,またそれを計算させているのかが理解しやすくなります.
Numpy表示
class TwoLayerNet(chainer.Chain):
def __init__(self, d_in, h, d_out):
super(TwoLayerNet, self).__init__(
linear1=L.Linear(d_in, h, initialW=W1.transpose()),
linear2=L.Linear(h, d_out, initialW=W2.transpose())
)
def __call__(self, x):
g = self.linear1(x)
h_r = F.relu(g)
y_p = self.linear2(h_r)
return y_p
class LossCalculator(chainer.Chain): #ロス関数の定義
def __init__(self, model):
super(LossCalculator, self).__init__() #ロス関数の初期化
with self.init_scope(): #
self.model = model
def __call__(self, x, y):
y_p = self.model(x)
loss = F.mean_squared_error(y_p, y)
reporter.report({'loss': loss}, self)
return loss
Chainer表示
class TwoLayerNet(chainer.Chain):
def __init__(self, d_in, h, d_out):
super(TwoLayerNet, self).__init__(
linear1=L.Linear(d_in, h, initialW=W1.transpose()),
linear2=L.Linear(h, d_out, initialW=W2.transpose())
)
def __call__(self, x):
g = self.linear1(x)
h_r = F.relu(g)
y_p = self.linear2(h_r)
return y_p
class LossCalculator(chainer.Chain): #ロス関数の定義
def __init__(self, model):
super(LossCalculator, self).__init__() #ロス関数の初期化
with self.init_scope(): #
self.model = model
def __call__(self, x, y):
y_p = self.model(x)
loss = F.mean_squared_error(y_p, y)
reporter.report({'loss': loss}, self)
return loss
def sample_5():
# イタレータの作成
dataset = D.TupleDataset(X, Y)
train_iter = Iter.SerialIterator(dataset, batch_size=M, shuffle=False)
# ネットワークの構成 ここは変わらず
model = TwoLayerNet(N_I, N_H, N_O)
loss_calculator = LossCalculator(model)
# 出力層の関数を定義 ここは変わらず
optimizer = P.SGD(lr=LEARNING_RATE)
# 先に定義したロス計算でロスを求める.
optimizer.setup(loss_calculator)
# 計算したロスをアップデート
updater = training.StandardUpdater(train_iter, optimizer)
# Trainerを作成し,パラメータの更新を行う.
trainer = training.Trainer(updater, (EPOCHS, 'epoch'), out='result')
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'elapsed_time']))
# Trainerの実行(繰り返し計算)
trainer.run()
まずは,Iterlatorの作成.
初学者の私はイテレータってなに?から始まったのですが,以下が説明にありました.
”プログラミング言語において配列やそれに類似するデータ構造の各要素に対する繰返し処理の抽象化である”
これではよくわからないのですが,for文のような繰り返し処理をする場合,
繰り返しに用いるパラメータ(入力x, 計算の都度 出力と比較されるy), 一度の計算されるパラメータ数のbachを表現しておきます.
shuffle=False
としています.これをTrueにすると,エポックごとに呼び出されるBachがランダムに選ばられるようになるそうです.作成するモデルに汎用性をもたせるためにはTrueのほうがよいと思います.
このあたりは,モデルの精度,汎用性,計算時間との兼ね合いにおもうので,
まずはFalseで簡単に定義しておき,そのあとTrueにしてその効果をしる,というのがよいのかと理解しました.
ここで,ロス計算時のreporter.report({'loss': loss}, self)にて,計算したロスをその都度表示させていると思いました.本当にそうであるかどうか,これがあるときとないときの計算を実行し,確かめてみます.
その他は,ChaierでTrainerを書くときの決まった作法(おまじない)という理解です.
いくつかの実行をやめたときにどうするのかを確かめると,そのコードの意味がより理解できますね.
こんかいは,report.report()関数の意味を確かめてみます.
reporter.report({'loss': loss}, self) あり
epoch main/loss elapsed_time
1 15.1976 0.00607594
2 15.1322 0.0111498
3 15.0672 0.0143228
4 15.0027 0.0192569
5 14.9386 0.0227788
〜〜〜〜〜
294 5.68451 2.23332
295 5.66928 2.24488
296 5.65412 2.25483
297 5.63901 2.26516
298 5.62396 2.27733
299 5.60897 2.28802
300 5.59404 2.29986
reporter.report({'loss': loss}, self) なし
epoch main/loss elapsed_time
1 0.00508996
2 0.00868747
3 0.0118534
4 0.0155854
5 0.0192324
〜〜〜〜〜
295 1.9987
296 2.00716
297 2.01486
298 2.02375
299 2.03164
300 2.03996
これをみるとわかりましたが,lossを結果として保存しないため,
そのあとのmain/lossのためデータが欠落しています.
またreport.report(self)があるほうが,計算時間がかかっているのもわかりました.
これで上から下まで呼んでいくだけで,どういうニューラルネットワーク構成になっているのか,
わかりやすくなっています.
次はよくあるmninstをニューラルネットワークのモデルを作って分類処理をしてみます.
# -*- coding: utf-8 -*-
import numpy as np
import chainer
import chainer.functions as F
import chainer.optimizers as P
import chainer.links as L
import chainer.datasets as D
import chainer.iterators as Iter
from chainer import training
from chainer.training import extensions
from chainer import reporter
EPOCHS = 300
M = 2
N_I = 3
N_H = 3
N_O = 2
LEARNING_RATE = 1.0e-04
# 乱数をここでセット.プログラムをおこなうときは,同じ乱数が選ばれる.
np.random.seed(1)
X = np.random.randn(M, N_I).astype(np.float32)
Y = np.random.randn(M, N_O).astype(np.float32)
W1 = np.random.randn(N_I, N_H).astype(np.float32)
W2 = np.random.randn(N_H, N_O).astype(np.float32)
class TwoLayerNet(chainer.Chain):
def __init__(self, d_in, h, d_out):
super(TwoLayerNet, self).__init__(
linear1=L.Linear(d_in, h, initialW=W1.transpose()),
linear2=L.Linear(h, d_out, initialW=W2.transpose())
)
def __call__(self, x):
g = self.linear1(x)
h_r = F.relu(g)
y_p = self.linear2(h_r)
return y_p
class LossCalculator(chainer.Chain):
def __init__(self, model):
super(LossCalculator, self).__init__()
with self.init_scope():
self.model = model
def __call__(self, x, y):
y_p = self.model(x)
loss = F.mean_squared_error(y_p, y)
reporter.report({'loss': loss}, self)
return loss
def sample_5():
# イタレータの作成
dataset = D.TupleDataset(X, Y)
train_iter = Iter.SerialIterator(dataset, batch_size=M, shuffle=False)
# いたレータをネットワークと結合
model = TwoLayerNet(N_I, N_H, N_O)
loss_calculator = LossCalculator(model)
# optimizerをセット.出力層はSGD
optimizer = P.SGD(lr=LEARNING_RATE)
# Optimizerをネットワークに結合し,ロスを計算
optimizer.setup(loss_calculator)
# 計算結果をアップデート
updater = training.StandardUpdater(train_iter, optimizer)
# トレーニングを繰り返し行う
trainer = training.Trainer(updater, (EPOCHS, 'epoch'), out='result')
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'elapsed_time']))
trainer.run()
if __name__ == '__main__':
sample_5()