前略
Chaninerのチュートリアルをやってみました。公式はノートブック形式だったのでスクリプトですぐ動かせるように書き直したものをメモとして残しておきます。GPUは使わない設定にしました。
手順
- 14. Chainer の基礎 (https://tutorials.chainer.org/ja/src/14_Basics_of_Chainer_ja.html)
- 15. Chainer の応用 (https://tutorials.chainer.org/ja/src/15_Advanced_Usage_of_Chainer_ja.html#Chainer-%E3%81%AE%E5%BF%9C%E7%94%A8)
- 16. トレーナとエクステンション (https://tutorials.chainer.org/ja/src/16_Trainer_and_Extension_ja.html)
Chainer の基礎
Chainerのコードの流れ。
- データ準備
- ネットワークを定義
- 勾配法選択
- トレーニング
- forwardで予測値を計算し目的関数を計算
- backwardで勾配を計算
- パラメータ更新
- テストデータで予測値を計算
import chainer
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import chainer.links as L
import chainer.functions as F
from chainer import Sequential
import numpy as np
import matplotlib.pyplot as plt
from chainer.iterators import SerialIterator
# Iris データセットの読み込み
x, t = load_iris(return_X_y=True)
# それぞれデータ型を変換
x = x.astype('float32')
t = t.astype('int32')
# データセットを分割
x_train_val, x_test, t_train_val, t_test = train_test_split(x, t, test_size=0.3, random_state=0)
# 訓練データセットと検証データセットに分割
x_train, x_val, t_train, t_val = train_test_split(x_train_val, t_train_val, test_size=0.3, random_state=0)
# ネットワークを定義
# net としてインスタンス化
n_input = 4
n_hidden = 6
n_output = 3
net = Sequential(
L.Linear(n_input, n_hidden), F.relu,
L.Linear(n_hidden, n_hidden), F.relu,
L.Linear(n_hidden, n_output)
)
# optimizerを決める
# optimizerとしてインスタンス化
optimizer = chainer.optimizers.SGD(lr=0.05)
# optimizerにnetをセットする
optimizer.setup(net)
# ネットワークを訓練する
# 1. 訓練用のバッチを準備
# 2. 予測値を計算し、目的関数を適用 (順伝播)
# 3. 勾配を計算 (逆伝播)
# 4. パラメータを更新
n_epoch = 30
n_batchsize = 16
iteration = 0
# ログの保存用
results_train = {
'loss': [],
'accuracy': []
}
results_valid = {
'loss': [],
'accuracy': []
}
for epoch in range(n_epoch):
# データセット並べ替えた順番を取得
order = np.random.permutation(range(len(x_train)))
# 各バッチ毎の目的関数の出力と分類精度の保存用
loss_list = []
accuracy_list = []
for i in range(0, len(order), n_batchsize):
# バッチを準備
index = order[i:i+n_batchsize]
x_train_batch = x_train[index,:]
t_train_batch = t_train[index]
# 予測値を出力
y_train_batch = net(x_train_batch)
# 目的関数を適用し、分類精度を計算
loss_train_batch = F.softmax_cross_entropy(y_train_batch, t_train_batch)
accuracy_train_batch = F.accuracy(y_train_batch, t_train_batch)
loss_list.append(loss_train_batch.array)
accuracy_list.append(accuracy_train_batch.array)
# 勾配のリセットと勾配の計算
net.cleargrads()
loss_train_batch.backward()
# パラメータの更新
optimizer.update()
# カウントアップ
iteration += 1
# 訓練データに対する目的関数の出力と分類精度を集計
loss_train = np.mean(loss_list)
accuracy_train = np.mean(accuracy_list)
# 1エポック終えたら、検証データで評価
# 検証データで予測値を出力
with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
y_val = net(x_val)
# 目的関数を適用し、分類精度を計算
loss_val = F.softmax_cross_entropy(y_val, t_val)
accuracy_val = F.accuracy(y_val, t_val)
# 結果の表示
print('epoch: {}, iteration: {}, loss (train): {:.4f}, loss (valid): {:.4f}'.format(
epoch, iteration, loss_train, loss_val.array))
# ログを保存
results_train['loss'] .append(loss_train)
results_train['accuracy'] .append(accuracy_train)
results_valid['loss'].append(loss_val.array)
results_valid['accuracy'].append(accuracy_val.array)
# 目的関数の出力 (loss)
plt.plot(results_train['loss'], label='train') # label で凡例の設定
plt.plot(results_valid['loss'], label='valid') # label で凡例の設定
plt.legend() # 凡例の表示
plt.show()
# 分類精度 (accuracy)
plt.plot(results_train['accuracy'], label='train') # label で凡例の設定
plt.plot(results_valid['accuracy'], label='valid') # label で凡例の設定
plt.legend() # 凡例の表示
plt.show()
# テストデータで予測値を計算
with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
y_test = net(x_test)
accuracy_test = F.accuracy(x_test, t_test)
print("test", accuracy_test.array)
chainer.serializers.save_npz('my_iris.net', net)
保存したモデルをロードする場合。Sequential
から書き換える。
loaded_net = Sequential(
L.Linear(n_input, n_hidden), F.relu,
L.Linear(n_hidden, n_hidden), F.relu,
L.Linear(n_hidden, n_output)
)
chainer.serializers.load_npz('my_iris.net', loaded_net)
with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
y_test = loaded_net(x_test)
print(np.argmax(y_test[0, :].array))
Chainer の応用
データの分割、バッチの利用、正則化および MomentumSGD を使用します。
import matplotlib.pyplot as plt
from chainer.optimizer_hooks import WeightDecay
from chainer import optimizers
from sklearn.datasets import load_iris
from chainer.datasets import TupleDataset
from chainer.datasets import split_dataset_random
from chainer.iterators import SerialIterator
import chainer
import chainer.links as L
import chainer.functions as F
# データ読み込み
x, t = load_iris(return_X_y=True)
x = x.astype('float32')
t = t.astype('int32')
# 説明変数と目的変数をパックする
dataset = TupleDataset(x, t)
# データをトレーニング・バリデーション・テストにスプリット
train_val, test = split_dataset_random(
dataset, int(len(dataset) * 0.7), seed=0)
train, valid = split_dataset_random(
train_val, int(len(train_val) * 0.7), seed=0)
# バッチで出力するgeneratorをつくる
train_iter = SerialIterator(train, batch_size=4, repeat=True, shuffle=True)
# ネットワークのクラスを定義する
class Net(chainer.Chain):
def __init__(self, n_in=4, n_hidden=2, n_out=3):
super().__init__()
with self.init_scope():
self.li = L.Linear(n_in, n_hidden)
self.l1 = L.Linear(n_hidden, n_hidden)
self.lf = L.Linear(n_hidden, n_out)
def forward(self, x):
h = F.relu(self.li(x))
h = F.relu(self.l1(h))
h = self.lf(h)
return h
# Netをインスタンス化
net = Net()
# optimizerを選択
# 学習率を 0.01, モーメンタムを0.9 に設定
optimizer = optimizers.SGD(lr=0.01)
optimizer.setup(net)
# パラメータに処理をフックする
for param in net.params():
if param.name != 'b': # バイアス以外だったら
param.update_rule.add_hook(WeightDecay(0.0001)) # 重み減衰を適用
# 学習
n_batch = 64 # バッチサイズ
n_epoch = 100 # エポック数
# ログ
results_train, results_valid = {}, {}
results_train['loss'], results_train['accuracy'] = [], []
results_valid['loss'], results_valid['accuracy'] = [], []
count = 1
for epoch in range(n_epoch):
while True:
# ミニバッチの取得
train_batch = train_iter.next()
# x と t に分割
x_train, t_train = chainer.dataset.concat_examples(train_batch)
# 予測値と目的関数の計算
y_train = net(x_train)
loss_train = F.softmax_cross_entropy(y_train, t_train)
acc_train = F.accuracy(y_train, t_train)
# 勾配の初期化と勾配の計算
net.cleargrads()
loss_train.backward()
# パラメータの更新
optimizer.update()
# カウントアップ
count += 1
# 1エポック終えたら、valid データで評価する
if train_iter.is_new_epoch:
# 検証用データに対する結果の確認
with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
x_valid, t_valid = chainer.dataset.concat_examples(valid)
y_valid = net(x_valid)
loss_valid = F.softmax_cross_entropy(y_valid, t_valid)
acc_valid = F.accuracy(y_valid, t_valid)
# 結果の表示
print('epoch: {}, iteration: {}, loss (train): {:.4f}, loss (valid): {:.4f}'
'acc (train): {:.4f}, acc (valid): {:.4f}'.format(
epoch, count, loss_train.array.mean(), loss_valid.array.mean(),
acc_train.array.mean(), acc_valid.array.mean()))
# 可視化用に保存
results_train['loss'] .append(loss_train.array)
results_train['accuracy'] .append(acc_train.array)
results_valid['loss'].append(loss_valid.array)
results_valid['accuracy'].append(acc_valid.array)
break
# 損失 (loss)
plt.plot(results_train['loss'], label='train') # label で凡例の設定
plt.plot(results_valid['loss'], label='valid') # label で凡例の設定
plt.legend() # 凡例の表示
plt.show()
# 精度 (accuracy)
plt.plot(results_train['accuracy'], label='train') # label で凡例の設定
plt.plot(results_valid['accuracy'], label='valid') # label で凡例の設定
plt.legend() # 凡例の表示
plt.show()
# テストデータに対する損失と精度を計算
x_test, t_test = chainer.dataset.concat_examples(test)
with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
y_test = net(x_test)
loss_test = F.softmax_cross_entropy(y_test, t_test)
acc_test = F.accuracy(y_test, t_test)
print('test loss: {:.4f}'.format(loss_test.array))
print('test accuracy: {:.4f}'.format(acc_test.array))
トレーナとエクステンション
トレーナを使う場合の流れ。
- ネットワークを定義
- chainer.links.Classifierで損失関数をモデルに含める
- 勾配法を選択する
- chainer.training.StandardUpdaterにoptimizerとバッチのgeneratorをセット
- (chainer.training.triggers.EarlyStoppingTriggerでEarly Stoppingを実施)
- エクステンションをセット
- トレーニング実施
- テストデータで予測値を計算
from chainer.training.triggers import EarlyStoppingTrigger
from chainer import reporter
from chainer.training import extensions
from chainer import training
from chainer import optimizers
from chainer import iterators
from chainer.datasets import split_dataset_random
from chainer.datasets import TupleDataset
import numpy as np
import matplotlib.pyplot as plt
import chainer
import chainer.links as L
import chainer.functions as F
from sklearn.datasets import load_iris
# Iris データセットの読み込み
dataset = load_iris()
# 入力値と目標値を別々の変数へ格納
x = dataset.data
t = dataset.target
# Chainer がデフォルトで用いる float32 型へ変換
x = np.array(x, np.float32)
t = np.array(t, np.int32)
# 入力値と目標値を引数に与え、`TupleDataset` オブジェクトを作成
dataset = TupleDataset(x, t)
n_train = int(len(dataset) * 0.7)
n_valid = int(len(dataset) * 0.1)
train, valid_test = split_dataset_random(dataset, n_train, seed=0)
valid, test = split_dataset_random(valid_test, n_valid, seed=0)
print('Training dataset size:', len(train))
print('Validation dataset size:', len(valid))
print('Test dataset size:', len(test))
# イテレータ準備
batchsize = 32
train_iter = iterators.SerialIterator(
train, batchsize, shuffle=True, repeat=True)
valid_iter = iterators.SerialIterator(
valid, batchsize, shuffle=False, repeat=False)
# ネットワークを定義
class MLP(chainer.Chain):
def __init__(self, n_mid_units=100, n_out=3):
super().__init__()
with self.init_scope():
self.fc1 = L.Linear(None, n_mid_units)
self.fc2 = L.Linear(n_mid_units, n_mid_units)
self.fc3 = L.Linear(n_mid_units, n_out)
def forward(self, x):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(h))
h = self.fc3(h)
return h
# ネットワークを作成
predictor = MLP()
# L.Classifier でラップし、損失の計算などをモデルに含める
net = L.Classifier(predictor)
# 最適化手法を選択してオプティマイザを作成し、最適化対象のネットワークを持たせる
optimizer = optimizers.MomentumSGD(lr=0.1).setup(net)
# アップデータにイテレータとオプティマイザを渡す
updater = training.StandardUpdater(
train_iter, optimizer, device=-1) # device=-1でCPUでの計算実行を指定
# トレーナを作成
# 繰り返しの終了タイミングを第2引数に(整数, 単位)というタプルを渡して指定
# 単位には'iteration'もしくは'epoch'のいずれかの文字列を指定
# 1イテレーション(iteration)とはミニバッチ 1 個分を処理することを表し、
# 1 エポック(epoch)とはイテレーションを繰り返してデータセット全体を1周することを表す
# trainer = training.Trainer(updater=updater, stop_trigger=(30, 'epoch'), out='results/iris_result1', extensions=None)
# early stoppingを使用する場合
trigger = EarlyStoppingTrigger(monitor='val/main/loss', check_trigger=(1, 'epoch'),
patients=5, max_trigger=(30, 'epoch'))
trainer = training.Trainer(
updater=updater, stop_trigger=trigger, out='results/iris_result1', extensions=None)
# 以下でエクステンションを追加(https://docs.chainer.org/en/stable/reference/training.html#extensions)
# レポートした値を自動的に集計し、Trainer オブジェクト作成時に
# out引数で指定したディレクトリに、log_name引数に指定されたファイル名で
# それらの集計された情報を JSON 形式で保存
# 値の集計を行う周期は、trigger という引数に (数, 単位) を表すタプルを与えて指定
# 単位には'epoch'または'iteration'を指定
trainer.extend(extensions.LogReport(trigger=(1, 'epoch'), log_name='log'))
# トレーナオブジェクトを指定されたタイミング(デフォルトでは 1 エポックごと)で保存
# {.updater.iteration} はイテレーション数を表す数値に、
# {.updater.epoch} はエポック数を表す数値に置き換えられる
trainer.extend(extensions.snapshot(
filename='snapshot_epoch-{.updater.epoch}'))
# 指定されたVariableオブジェクトからたどることができる計算グラフを
# Graphvizで描画可能なDOT形式で保存
trainer.extend(extensions.dump_graph('main/loss'))
# 訓練中に指定されたタイミングで検証用データセットを用いたネットワークの評価を行う
trainer.extend(extensions.Evaluator(valid_iter, net, device=-1), name='val')
# LogReport で集計した値を標準出力に出力
# どの値を出力するかをリストの形で与える
trainer.extend(extensions.PrintReport(['epoch', 'iteration', 'main/loss', 'main/accuracy',
'val/main/loss', 'val/main/accuracy', 'fc1/W/data/mean', 'elapsed_time']))
# 第 1 引数に与えられるリストで指定された値の時間変化をグラフに描画し、
# 出力ディレクトリにfile_name引数に指定されたファイル名で画像として保存
# 1層目の全結合層のパラメータが持つ勾配の平均値の変遷を描画したグラフをmean.pngに保存
trainer.extend(extensions.PlotReport(
['fc1/W/grad/mean'], x_key='epoch', file_name='mean.png'))
# 訓練用データセットと検証用データセットのそれぞれで計算した損失の値の変遷を
# 1つのグラフにまとめ、loss.pngという画像ファイルに保存
trainer.extend(extensions.PlotReport(
['main/loss', 'val/main/loss'], x_key='epoch', file_name='loss.png'))
# 訓練用・検証用データセットのそれぞれで計算した正解率の値の変遷を
# 1つのグラフにまとめ、accuracy.pngに保存
trainer.extend(extensions.PlotReport(
['main/accuracy', 'val/main/accuracy'], x_key='epoch', file_name='accuracy.png'))
# 指定したLinkが持つパラメータの平均・分散・最小値・最大値などの統計値を計算し、レポート
# パラメータが発散していないかなどをチェックするのに便利
trainer.extend(extensions.ParameterStatistics(
net.predictor.fc1, {'mean': np.mean}, report_grads=True))
# 訓練を実施
trainer.run()
# テストデータに対する損失と精度を計算
x_test, t_test = chainer.dataset.concat_examples(test)
with chainer.using_config('train', False), chainer.using_config('enable_backprop', False):
y_test = net.predictor(x_test)
loss_test = F.softmax_cross_entropy(y_test, t_test)
acc_test = F.accuracy(y_test, t_test)
print('test loss: {:.4f}'.format(loss_test.array.mean()))
print('test accuracy: {:.4f}'.format(acc_test.array.mean()))
chainer.Reporterで集計したい値を記録する。
from chainer import reporter
from chainer.training import extensions
from chainer import training
from chainer import optimizers
from chainer import iterators
from chainer.datasets import split_dataset_random
from chainer.datasets import TupleDataset
import numpy as np
import matplotlib.pyplot as plt
import chainer
import chainer.links as L
import chainer.functions as F
from sklearn.datasets import load_iris
# Iris データセットの読み込み
dataset = load_iris()
# 入力値と目標値を別々の変数へ格納
x = dataset.data
t = dataset.target
# Chainer がデフォルトで用いる float32 型へ変換
x = np.array(x, np.float32)
t = np.array(t, np.int32)
# 入力値と目標値を引数に与え、`TupleDataset` オブジェクトを作成
dataset = TupleDataset(x, t)
n_train = int(len(dataset) * 0.7)
n_valid = int(len(dataset) * 0.1)
train, valid_test = split_dataset_random(dataset, n_train, seed=0)
valid, test = split_dataset_random(valid_test, n_valid, seed=0)
print('Training dataset size:', len(train))
print('Validation dataset size:', len(valid))
print('Test dataset size:', len(test))
# イテレータ準備
batchsize = 32
train_iter = iterators.SerialIterator(train, batchsize)
valid_iter = iterators.SerialIterator(
valid, batchsize, shuffle=False, repeat=False)
# 別のネットワークを試す
class MLP2(chainer.Chain):
def __init__(self, n_mid_units=100, n_out=3):
super().__init__()
with self.init_scope():
self.fc1 = L.Linear(None, n_mid_units)
self.fc2 = L.Linear(n_mid_units, n_mid_units)
self.fc3 = L.Linear(n_mid_units, n_out)
def forward(self, x):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(h))
reporter.report(
{'avg_y': F.average(h), 'var_y': F.cross_covariance(h, h)}, self)
h = self.fc3(h)
return h
# ネットワーク (+ Classifier)
net = L.Classifier(MLP2())
# オプティマイザ
optimizer = optimizers.MomentumSGD(lr=0.1).setup(net)
# イテレータ
train_iter = iterators.SerialIterator(train, 32)
# アップデータ
updater = training.StandardUpdater(
train_iter, optimizer, device=-1) # device=-1でCPUでの計算実行を指定
# トレーナ
trainer = training.Trainer(updater, (30, 'epoch'))
"""
次に、 LogReport と PrintReport を設定します。
このとき他のエクステンションも LogReport が集計した値を用いるため、
LogReport の追加は必須です。
PrintReport には、表示したい値の名前を設定します。
- epoch、iteration、elapsed_time: トレーナオブジェクトがデフォルトでレポートする値で、
それぞれエポック数、イテレーション数、経過時間を表します。
- main は、オプティマイザが保持するモデル(今回は L.Classifier)を表します。
- main/accuracy は、L.Classifier がデフォルトでレポートする正解率です。
- main/loss は、L.Classifier がデフォルトでレポートする損失の値です。
- main/predictor は、L.Classifier に渡されたネットワーク(今回は MLP2)を表します。
- main/predictor/avg_y は、net が保持している predictor の中でレポートされている
avg_y の値を指します。
- main/predictor/var_y は、同様に、net が保持している predictor の中で
レポートされている var_y の値を指します。
"""
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport([
'epoch', 'iteration',
'main/accuracy',
'main/predictor/avg_y',
'main/predictor/var_y',
]))
# 訓練を実施
trainer.run()
草々
画像処理のチュートリアルなども出るっぽい?ので楽しみですね!