はじめに
※この記事には2021年度更新版があります。こちらをぜひご確認ください。
この記事は機械系 Advent Calendar 2018の2日目の記事です。
Deep Learningに関することをやろうとする人にとって悩ましいのが、どのライブラリ使えばええねん問題だと思います。
日本発だしChainer?FacebookのPyTorch?GoogleのTensorFlow?
個人的にはChainerやPyTorchの書き方がスッキリしていて好きなのですが、世界的にはKerasやGoogle Colaboratoryの登場によってTensorFlowの勢いが増しているように感じます。(というか実際そうですね)
ここでは、N番煎じではありますが、それぞれの書き方を並べて紹介することで、そのおおまかな型のようなものをつかんでみようと思います。
まあ、執筆者の勉強のついでなので、たいしたものではないですが、ご了承ください。
注意
比較のしやすさのために冗長な書き方になっている箇所があります。
また、Deep Learning自体については説明なしです。コードだけです。
あと、これは時間をかけたくないという怠惰な理由からなのですが、全然層数がDeepじゃないです。Deepにしても使用できますので、ご了承ください。
MLP編
まずは基本的なMLPを見ていきましょう。有名なMNISTの分類を行います。
Chainer
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training, datasets, iterators, optimizers
from chainer.training import extensions
# 1.データセットを用意する
train, test = chainer.datasets.get_mnist(ndim=3)# ndim=width,height,color
batch_size = 10 # バッチサイズ
uses_device = 0 # GPU#0を使用(CPU=-1)
# 2.モデルの作成
class MLP(chainer.Chain):
def __init__(self):
super(MLP, self).__init__()
with self.init_scope():
self.l1 = L.Linear(None, 64)
self.l2 = L.Linear(None, 10)
def __call__(self, x, t=None, train=True):
h = F.relu(self.l1(x))
h = self.l2(h)
# 訓練時には損失を、テスト時には結果を返す
return F.softmax_cross_entropy(h, t) if train else F.softmax(h)
# 3.Updater(パラメータ更新手法)の作成
# ここではStandardUpdaterを使用するため、必要ない
# 4.実行する
# モデルを作成
model = MLP()
# GPUに対応させる
if uses_device >= 0:
chainer.cuda.get_device_from_id(uses_device).use()
chainer.cuda.check_cuda_available()
# GPU用データ形式に変換
model.to_gpu()
# iterator(繰り返し条件)を作成する
train_iter = iterators.SerialIterator(train, batch_size, shuffle=True)
test_iter = iterators.SerialIterator(test, batch_size, repeat=False, shuffle=False)
# 誤差逆伝播法アルゴリズムを選択する
optimizer = optimizers.Adam()
optimizer.setup(model)
# デバイスを選択してTrainer(訓練)を作成する
updater = training.StandardUpdater(train_iter, optimizer, device=uses_device)
trainer = training.Trainer(updater, (100, 'epoch'), out="result")
# Trainerにはextensionという便利なオプションがつけられる
trainer.extend(extensions.Evaluator(test_iter, model, device=uses_device)) # テストをTrainerに設定する
trainer.extend(extensions.ProgressBar()) # 学習の進展を表示するようにする
# 機械学習を実行する
trainer.run()
Chainerは大きく以下の4つの部分に分けられます。
- データセットを用意する部分
- モデルを作成する部分
- Updater(パラメータ更新手法)を作成する部分
- 実行する部分
何と言っても特徴的なのが、Updaterです。学習時のパラメータの更新の方法を記述することができるのですが、今回は特にいじらずStandardUpdaterをお手軽に使用します。これとTrainerというものを利用することで、何エポックも訓練させる部分をスッキリ書くことができます。普通であれば、次に紹介するPyTorchのようにfor文を使用しますが、この部分をうまく隠蔽しているのです。また、Trainerにはextensionという便利なオプションをつけることが可能です。
PyTorch
import torch
import torchvision
import torchvision.transforms as transforms
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
# 1.データセットを用意する
training_epochs = 5 # エポック数
batch_size = 10 # バッチサイズ
trainset = torchvision.datasets.MNIST(root='./data',
train=True,
download=True,
transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset,
batch_size=batch_size,
shuffle=True)
testset = torchvision.datasets.MNIST(root='./data',
train=False,
download=True,
transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset,
batch_size=batch_size,
shuffle=False)
# 2.モデルの作成
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.l1 = nn.Linear(784,64)
self.l2 = nn.Linear(64, 10)
def forward(self, x):
h = x.view(-1, 28 * 28) # 28*28の画像から1次元ベクトルにリサイズする
h = F.relu(self.l1(h))
return self.l2(h)
# 3.実行する
# GPUに対応させる
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = MLP().to(device)
# 誤差逆伝播法アルゴリズムを選択する
criterion = nn.CrossEntropyLoss() # 損失関数を選択
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 訓練する
for epoch in range(training_epochs):
running_loss = 0.0
for i, (inputs, labels) in enumerate(trainloader):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad() # 計算前に勾配をゼロにする
outputs = model(inputs) # 順伝播の計算をする
loss = criterion(outputs, labels) # 損失を計算する
loss.backward() # 誤差逆伝播の計算をする
optimizer.step() # 更新する
# 損失を定期的に出力する
running_loss += loss.item()
if i % 100 == 99:
print('[{:d}, {:5d}] loss: {:3f}'
.format(epoch + 1, i + 1, running_loss / 100))
running_loss = 0.0
print('Finished Training')
# テストする
correct = 0
total = 0
with torch.no_grad():
for (images, labels) in testloader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print('Accuracy: {:.2f} %%'.format(100 * float(correct/total)))
PyTorchは大きく以下の3つの部分に分けられます。
- データセットを用意する部分
- モデルを作成する部分
- 実行する部分
Chainerと書き方はほぼ同じなのですが、Updaterはなく、訓練時にはfor文を使用して書きます。ただ、for文の中に記述されている処理もCNN編で紹介するChainerのUpdaterの中の処理とほぼ書き方が同じですので、ご安心ください。(Trainerもあるようですが、あまり使われているのを見たことがありません。)ChainerとPyTorchは対応関係がわかりやすく、片方を知っていればもう片方の学習コストはかなり低くなると思います。
TensorFlow
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く
# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images # テスト用の画像
test_labels = mnist.test.labels # テスト用の正解ラベル
# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.5 # 学習率
training_epochs = 1500 # エポック数
batch_size = 50 # ミニバッチのサイズ
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
# 入力層
x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
# 画像データの部分を28×28の行列に変換
img = tf.reshape(x,[-1,28,28,1])
# 画像をログとして出力
tf.summary.image("input_data", img, 20)
# 隠れ層
with tf.name_scope("hidden"):
# 重み(変数)
w1 = tf.Variable(tf.truncated_normal([784, 64], stddev=0.1), name="w1")
# バイアス(変数)
b1 = tf.Variable(tf.zeros([64]), name="b1")
# 活性化関数
h1 = tf.nn.relu(tf.matmul(x, w1) + b1)
# 出力層
with tf.name_scope("output"):
# 重み(変数)
w2 = tf.Variable(tf.truncated_normal([64, 10], stddev=0.1), name="w2")
# バイアス(変数)
b2 = tf.Variable(tf.zeros([10]), name="b2")
# 活性化関数
out = tf.nn.softmax(tf.matmul(h1, w2) + b2)
# 誤差関数
with tf.name_scope("loss"):
# 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
t = tf.placeholder(tf.float32, [None, 10])
loss = tf.reduce_mean(tf.square(t - out))
# 誤差をログとして出力
tf.summary.scalar("loss", loss)
# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
train_step = tf.train.GradientDescentOptimizer(
learning_rate # 学習率
).minimize(loss) # 最小化問題を解く
# 評価
with tf.name_scope("accuracy"):
# (out=t)の最大値を比較
correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
# True(正解=1)の平均を取る
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# 精度をログとして出力
tf.summary.scalar("accuracy", accuracy)
# 変数を初期化するノード
init =tf.global_variables_initializer()
# ログをマージするノード
summary_op = tf.summary.merge_all()
# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
# FileWriterオブジェクトを生成する
summary_writer = tf.summary.FileWriter(
"logs", # イベントファイルの保存先
sess.graph # データフローグラフを視覚化する
)
sess.run(init) # initノードを実行して変数を初期化
for epoch in range(training_epochs):
# ミニバッチを抽出
train_images, train_labels = mnist.train.next_batch(batch_size)
sess.run(
train_step, # 訓練を実行
feed_dict={x:train_images, # プレースホルダーxには訓練データのミニバッチをセット
t:train_labels} # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
)
# 50回ごとにテストデータを使用して精度を出力
epoch += 1
if epoch % 50 == 0:
acc_val = sess.run(
accuracy, # 評価を実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
print('(%d) accuracy = %.2f' % (epoch, acc_val))
# イベントログをsummary_strに代入
summary_str = sess.run(
summary_op, # ログをマージするノードを実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
# イベントファイルにログを追加
summary_writer.add_summary(summary_str, epoch)
## $tensorboard --logdir="イベントファイルの入ったフォルダーのパス" でイベントログが確認出来るURLが与えられる
## 例:$tensorboard --logdir=./logs
## コマンド実行中にそのURL(例えばlocalhost:6006)にアクセスすれば良い
## Ctrl+Cで止められる
TensorFlowは大きく以下の3つの部分に分けられます。
- データセットを用意する部分
- データフローグラフ(設計図)の作成部分
- セッションで実行する部分
おおまかにはPyTorchと同じ流れに感じますが、耳慣れない単語が出てきましたね。
実はこれまで紹介したものとは少し考え方が違っているのです。
ニューラルネットワークの計算を表現したものを計算グラフというのですが、先ほどまで紹介してきたChainerなどでは、計算の実行時にこの計算グラフが定義される「Define-by-Run」という方式が取られています。それに対してTensorFlowでは、計算グラフを学習の前にあらかじめ構築する「Define-and-Run」という方式が取られています。(ただし、Eager Execution for TensorFlowなど一部の機能では「Define-by-Run」方式が取られている)
そこで、TensorFlowではデータフローグラフ(設計図だと思えば良いです)を作ってから、セッションで実行して学習する、という手順になっているのです。
その他の特徴としては、見ての通りChainerやPyTorchほど隠蔽されていないので、書くのが大変なかわりに細かい設定ができたりします。
また、tensorboardという便利な可視化ツールを利用することができます。
TensorFlow(tf.layers)
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く
# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images # テスト用の画像
test_labels = mnist.test.labels # テスト用の正解ラベル
# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.5 # 学習率
training_epochs = 1500 # エポック数
batch_size = 50 # ミニバッチのサイズ
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
# 入力層
x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
# 画像データの部分を28×28の行列に変換
img = tf.reshape(x,[-1,28,28,1])
# 画像をログとして出力
tf.summary.image("input_data", img, 20)
# 隠れ層
with tf.name_scope("hidden"):
h1 = tf.layers.dense(inputs=x,units=64,activation=tf.nn.relu)
# 出力層
with tf.name_scope("output"):
out = tf.layers.dense(inputs=h1,units=10,activation=tf.nn.softmax)
# 誤差関数
with tf.name_scope("loss"):
# 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
t = tf.placeholder(tf.float32, [None, 10])
loss = tf.reduce_mean(tf.square(t - out))
# 誤差をログとして出力
tf.summary.scalar("loss", loss)
# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
train_step = tf.train.GradientDescentOptimizer(
learning_rate # 学習率
).minimize(loss) # 最小化問題を解く
# 評価
with tf.name_scope("accuracy"):
# (out=t)の最大値を比較
correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
# True(正解=1)の平均を取る
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# 精度をログとして出力
tf.summary.scalar("accuracy", accuracy)
# 変数を初期化するノード
init =tf.global_variables_initializer()
# ログをマージするノード
summary_op = tf.summary.merge_all()
# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
# FileWriterオブジェクトを生成する
summary_writer = tf.summary.FileWriter(
"logs", # イベントファイルの保存先
sess.graph # データフローグラフを視覚化する
)
sess.run(init) # initノードを実行して変数を初期化
for epoch in range(training_epochs):
# ミニバッチを抽出
train_images, train_labels = mnist.train.next_batch(batch_size)
sess.run(
train_step, # 訓練を実行
feed_dict={x:train_images, # プレースホルダーxには訓練データのミニバッチをセット
t:train_labels} # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
)
# 50回ごとにテストデータを使用して精度を出力
epoch += 1
if epoch % 50 == 0:
acc_val = sess.run(
accuracy, # 評価を実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
print('(%d) accuracy = %.2f' % (epoch, acc_val))
# イベントログをsummary_strに代入
summary_str = sess.run(
summary_op, # ログをマージするノードを実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
# イベントファイルにログを追加
summary_writer.add_summary(summary_str, epoch)
先ほどTensorFlowの説明の中で、かる〜く「書くのが大変ですが」などと流しましたが、実際にやってみると本当に大変です。TensorFlowに苦手意識を持つ原因として、機能や高レベルAPIが多すぎて混乱する、いろいろな書き方ができてしまい人のコードが読みづらい、ということがあると僕は感じています。
そんな中、モデルの作成部分だけでもChainerやPyTorchのように書けるようになるのが、このtf.layersです。層をいくつも重ねなければならない時には助かりますね。モデルの作成部分以外は先ほどのTensorFlowのコードと同じです。
TensorFlow(tf.keras)
import tensorflow as tf
from tensorflow.keras.backend import set_session
# 1.データセットの準備部分
mnist = tf.keras.datasets.mnist
(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images, test_images = train_images / 255.0, test_images / 255.0
train_labels = tf.keras.utils.to_categorical(train_labels, 10)
test_labels = tf.keras.utils.to_categorical(test_labels, 10)
# 2.モデルの作成部分
learning_rate = 0.01 # 学習率
training_epochs = 5 # エポック数
batch_size = 50 # ミニバッチのサイズ
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
set_session(tf.Session(config=config))
# モデル作成(クラスを使わない場合)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation=tf.nn.relu),
tf.keras.layers.Dense(10, activation=tf.nn.softmax)
])
model.compile(optimizer=tf.train.RMSPropOptimizer(learning_rate),
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(train_images, train_labels, batch_size=batch_size, epochs=training_epochs)
model.evaluate(test_images, test_labels)
さて、まだTensorFlowに苦手意識のある方に朗報です。このtf.kerasを使ってみてください。この短さ、もはや別物ですね笑。model.fitというところで訓練しているのですが、元々のTensorFlowに比べてかなり隠蔽されていると思います。細かい設定がしにくくなるかもしれませんが、初学者はここから入るのが良いかもしれません。ネット上にも実装例が多く転がっている印象です。
CNN編
次はCNNです。畳み込みを用いてMNISTの分類を行います。
画像を扱う場合、必ずと言っていいほど使われる手法です。
Chainer
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training, datasets, iterators, optimizers
from chainer.training import extensions
# 1.データセットを用意する
train, test = chainer.datasets.get_mnist(ndim=3)# ndim=width,height,color
batch_size = 10 # バッチサイズ
uses_device = 0 # GPU#0を使用(CPU=-1)
# 2.モデルの作成
class CNN(chainer.Chain):
def __init__(self):
super(CNN, self).__init__()
with self.init_scope():
self.c1 = L.Convolution2D(1, 8, ksize=3) # フィルタサイズ=3で出力数8
self.l1 = L.Linear(None, 10)
def __call__(self, x, t=None, train=True):
h = F.relu(self.c1(x))
h = F.max_pooling_2d(h, 2)
h = self.l1(h)
# 損失か結果を返す
return F.softmax_cross_entropy(h, t) if train else F.softmax(h)
# 3.Updater(パラメータ更新手法)の作成
class MNISTUpdater(training.StandardUpdater):
def __init__(self, train_iter, optimizer, device):
super(MNISTUpdater, self).__init__(
train_iter,
optimizer,
device=device
)
def update_core(self):
# データを1バッチ分取得
batch = self.get_iterator('main').next()
x, t = chainer.dataset.convert.concat_examples(batch, self.device) # ここではx, t = self.converter(batch, self.device)でも良い
# Optimizerを取得
optimizer = self.get_optimizer('main')
# modelを取得
net = optimizer.target
# パラメータを更新(カッコの中には損失関数が入る、ここでは関数netにxとtを入力して計算された損失)
optimizer.update(net, x, t)
# 損失関数が複雑でUpdaterクラスのメンバーとして定義する場合
# optimizer.target.cleargrads() で計算前に勾配をクリアにする
# loss = self.loss_1(x,t) + self.loss_2(x,t) のように損失を計算する(例)
# loss.backward() で誤差逆伝播の計算をする
# optimizer.update() で更新する
# という書き方もできる
# 4.実行する
# モデルを作成
model = CNN()
# GPUに対応させる
if uses_device >= 0:
chainer.cuda.get_device_from_id(uses_device).use()
chainer.cuda.check_cuda_available()
# GPU用データ形式に変換
model.to_gpu()
# iterator(繰り返し条件)を作成する
train_iter = iterators.SerialIterator(train, batch_size, shuffle=True)
# 誤差逆伝播法アルゴリズムを選択する
optimizer = optimizers.Adam()
optimizer.setup(model)
# デバイスを選択してTrainer(訓練)を作成する
updater = MNISTUpdater(train_iter, optimizer, device=uses_device)
trainer = training.Trainer(updater, (100, 'epoch'), out="result")
# Trainerにはextensionという便利なオプションがつけられる
trainer.extend(extensions.ProgressBar()) # 学習の進展を表示するようにする
# 機械学習を実行する
trainer.run()
今回はUpdaterを自作してみました。中身がPyTorchとほぼ同じ書き方であることがわかると思います。
PyTorch
import torch
import torchvision
import torchvision.transforms as transforms
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
# 1.データセットを用意する
training_epochs = 5 # エポック数
batch_size = 10 # バッチサイズ
trainset = torchvision.datasets.MNIST(root='./data',
train=True,
download=True,
transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset,
batch_size=batch_size,
shuffle=True)
testset = torchvision.datasets.MNIST(root='./data',
train=False,
download=True,
transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset,
batch_size=batch_size,
shuffle=False)
# 2.モデルの作成
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
self.c1 = nn.Conv2d(1, 8, 3) # フィルタサイズ=3で出力数8
self.l1 = nn.Linear(8*13*13, 10)
def forward(self, x):
h = F.relu(self.c1(x))
h = F.max_pool2d(h, (2, 2))
h = h.view(-1, 8*13*13)
h = self.l1(h)
return h
# 3.実行する
# GPUに対応させる
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
# 誤差逆伝播法アルゴリズムを選択する
criterion = nn.CrossEntropyLoss() # 損失関数を選択
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 訓練する
for epoch in range(training_epochs):
running_loss = 0.0
for i, (inputs, labels) in enumerate(trainloader):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad() # 計算前に勾配をゼロにする
outputs = model(inputs) # 順伝播の計算をする
loss = criterion(outputs, labels) # 損失を計算する
loss.backward() # 誤差逆伝播の計算をする
optimizer.step() # 更新する
# 損失を定期的に出力する
running_loss += loss.item()
if i % 100 == 99:
print('[{:d}, {:5d}] loss: {:3f}'
.format(epoch + 1, i + 1, running_loss / 100))
running_loss = 0.0
print('Finished Training')
# テストする
correct = 0
total = 0
with torch.no_grad():
for (images, labels) in testloader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print('Accuracy: {:.2f} %%'.format(100 * float(correct/total)))
畳み込みに変えただけで、MLPとほぼ同じですね。
TensorFlow
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く
# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images # テスト用の画像
test_labels = mnist.test.labels # テスト用の正解ラベル
# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.01 # 学習率
training_epochs = 1500 # エポック数
batch_size = 100 # ミニバッチのサイズ
dropout = 0.5 # 非ドロップアウト率
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
# 入力層
with tf.name_scope("input"):
x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
# 画像データの部分を28×28の行列に変換
out_img = tf.reshape(x,[-1,28,28,1])
# 第1層:畳み込み層(ニューロン数=32)
with tf.name_scope("layer1_conv"):
# 2次元フィルター(3×3を32枚)
f1 = tf.Variable(
tf.truncated_normal([3, # フィルターの縦サイズ
3, # フィルターの横サイズ
1, # 画像のチャンネル数
32], # フィルターの枚数
stddev=0.1))
# 畳み込みを行うノード
conv1 = tf.nn.conv2d(out_img, # 28×28の画像
f1, # 2次元フィルター
strides=[1, # 入力データを1ずつ順送り
1, # 縦方向のストライドは1
1, # 横方向のストライドは1
1], # チャンネルのストライドは1
padding='SAME' # パディングを行う
)
# フィルターのバイアス
b1 = tf.Variable(
tf.constant(0.1, # 0.1で初期化
shape=[32] # フィルターの数だけ用意
))
# 活性化関数
out_conv1 = tf.nn.relu(conv1 + b1)
# 第2層:プーリング層(ニューロン数=32)
with tf.name_scope("layer2_pooling"):
# 最大プーリング
out_pool1 = tf.nn.max_pool(
out_conv1,
ksize=[1, # データに対するウィンドウサイズ
2, # タテ方向のウィンドウサイズ
2, # ヨコ方向のウィンドウサイズ
1 # チャンネル方向のウィンドウサイズ
],
strides=[1, # データのストライド量
2, # タテ方向のストライド量
2, # ヨコ方向のストライド量
1 # チャンネル方向のストライド量
],
padding='SAME' # フィルターに対するパディングを行う
)
# 第3層:畳み込み層(ニューロン数=64)
with tf.name_scope("layer3_conv"):
# 2次元フィルター(3×3を32枚)
f2 = tf.Variable(
tf.truncated_normal([3, # フィルターの縦サイズ
3, # フィルターの横サイズ
32, # 画像のチャンネル数
64], # フィルターの枚数
stddev=0.1))
# 畳み込みを行うノード
conv2 = tf.nn.conv2d(out_pool1, # 14×14の画像
f2, # 2次元フィルター
strides=[1, # 入力データを1ずつストライド
1, # 縦方向のストライドは1
1, # 横方向のストライドは1
1], # チャンネルのストライドは1
padding='SAME')
# フィルターのバイアス
b2 = tf.Variable(
tf.constant(0.1, # 0.1で初期化
shape=[64] # フィルターの数だけ用意
))
# 活性化関数
out_conv2 = tf.nn.relu(conv2 + b2)
# 第4層:プーリング層(ニューロン数=64)
with tf.name_scope("layer4_pooling"):
# 最大プーリング
out_pool2 = tf.nn.max_pool(
out_conv2, ksize=[1, # データに対するウィンドウサイズ
2, # タテ方向のウィンドウサイズ
2, # ヨコ方向のウィンドウサイズ
1], # チャンネル方向のウィンドウサイズ
strides=[1, # データのストライド量
2, # タテ方向のストライド量
2, # ヨコ方向のストライド量
1], # チャンネル方向のストライド量
padding='SAME')
# ドロップアウト
with tf.name_scope("dropout"):
# 非ドロップアウト率を保持するプレースホルダー
keep_prob = tf.placeholder(tf.float32)
# ドロップアウトを行うノード
out_drop = tf.nn.dropout(out_pool2, keep_prob)
# Flatten(ニューロン数=7×7×64)
# (画像の枚数, 7(タテ),7(ヨコ),64(チャンネル))を
# (画像の枚数, 7×7×64)の2次元に変換
with tf.name_scope("flatten"):
out_flat = tf.reshape(out_drop,
[-1, 7*7*64] # (画像の枚数, 画像データ)
)
# 第5層(全結合層):ニューロン数=1024
with tf.name_scope("layer5_binding"):
# 重み
w_comb1 = tf.Variable(tf.truncated_normal([7*7*64, 1024], stddev=0.1))
# バイアス
b_comb1 = tf.Variable(tf.constant(0.1, shape=[1024]))
# 活性化関数
out_comb1 = tf.nn.relu(tf.matmul(out_flat, w_comb1) + b_comb1)
# 第6層(出力層):ニューロン数=10
with tf.name_scope("layer7_output"):
# 重み
w_comb2 = tf.Variable(tf.truncated_normal([1024, 10], stddev=0.1))
# バイアス(
b_comb2 = tf.Variable(tf.constant(0.1, shape=[10]))
# 活性化関数
out = tf.nn.softmax(tf.matmul(out_comb1, w_comb2) + b_comb2)
#誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
# 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
t = tf.placeholder(tf.float32, [None, 10])
loss = tf.reduce_mean(
-tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
)
# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
train_step = tf.train.GradientDescentOptimizer(
learning_rate
).minimize(loss)
# 評価
with tf.name_scope("accuracy"):
# (out=t)の最大値を比較
correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
# True(正解=1)の平均を取る
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# 変数を初期化するノード
init =tf.global_variables_initializer()
# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
sess.run(init) # initノードを実行して変数を初期化
for step in range(training_epochs):
# ミニバッチを抽出
train_images, train_labels = mnist.train.next_batch(batch_size)
sess.run(
train_step, # 訓練を実行
feed_dict={x:train_images, # プレースホルダーxには訓練データのミニバッチをセット
t:train_labels, # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
keep_prob: dropout} # プレースホルダーkeep_probにはドロップアウトをセット
)
# 100回ごとにテストデータを使用して精度を出力
step += 1
if step % 100 == 0:
acc_val = sess.run(
accuracy, # 評価を実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels, # プレースホルダーtにはテストデータの正解ラベルをセット
keep_prob: 1.0} # 非ドロップアウト率を1.0にする
)
print('Step %d: accuracy = %.2f' % (step, acc_val))
畳み込みやプーリングの記述が長いですが、全体の流れは先ほどと同じです。
TensorFlow(tf.layers)
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く
# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images # テスト用の画像
test_labels = mnist.test.labels # テスト用の正解ラベル
# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.01 # 学習率
training_epochs = 1500 # エポック数
batch_size = 100 # ミニバッチのサイズ
dropout = 0.5 # 非ドロップアウト率
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
# 入力層
with tf.name_scope("input"):
x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
# 画像データの部分を28×28の行列に変換
out_img = tf.reshape(x,[-1,28,28,1])
# 第1層:畳み込み層(ニューロン数=32)
with tf.name_scope("layer1_conv"):
out_conv1 = tf.layers.conv2d(
inputs=out_img,
filters=32,
kernel_size=[3, 3],
padding="same",
activation=tf.nn.relu)
# 第2層:プーリング層(ニューロン数=32)
with tf.name_scope("layer2_pooling"):
out_pool1 = tf.layers.max_pooling2d(inputs=out_conv1, pool_size=[2, 2], strides=2, padding='same')
# 第3層:畳み込み層(ニューロン数=64)
with tf.name_scope("layer3_conv"):
out_conv2 = tf.layers.conv2d(
inputs=out_pool1,
filters=64,
kernel_size=[3, 3],
padding="same",
activation=tf.nn.relu)
# 第4層:プーリング層(ニューロン数=64)
with tf.name_scope("layer4_pooling"):
out_pool2 = tf.layers.max_pooling2d(inputs=out_conv2, pool_size=[2, 2], strides=2, padding='same')
# ドロップアウト
with tf.name_scope("dropout"):
# 非ドロップアウト率を保持するプレースホルダー
keep_prob = tf.placeholder(tf.float32)
# ドロップアウトを行うノード
out_drop = tf.layers.dropout(out_pool2, rate=keep_prob)
# Flatten(ニューロン数=7×7×64)
# (画像の枚数, 7(タテ),7(ヨコ),64(チャンネル))を
# (画像の枚数, 7×7×64)の2次元に変換
with tf.name_scope("flatten"):
out_flat = tf.layers.flatten(out_drop)
# 第5層(全結合層):ニューロン数=1024
with tf.name_scope("layer5_binding"):
out_comb1 = tf.layers.dense(inputs=out_flat, units=1024, activation=tf.nn.relu)
# 第6層(出力層):ニューロン数=10
with tf.name_scope("layer7_output"):
out = tf.layers.dense(inputs=out_comb1,units=10,activation=tf.nn.softmax)
#誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
# 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
t = tf.placeholder(tf.float32, [None, 10])
loss = tf.reduce_mean(
-tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
)
# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
train_step = tf.train.GradientDescentOptimizer(
learning_rate
).minimize(loss)
# 評価
with tf.name_scope("accuracy"):
# (out=t)の最大値を比較
correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
# True(正解=1)の平均を取る
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# 変数を初期化するノード
init =tf.global_variables_initializer()
# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
sess.run(init) # initノードを実行して変数を初期化
for step in range(training_epochs):
# ミニバッチを抽出
train_images, train_labels = mnist.train.next_batch(batch_size)
sess.run(
train_step, # 訓練を実行
feed_dict={x:train_images, # プレースホルダーxには訓練データのミニバッチをセット
t:train_labels, # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
keep_prob: dropout} # プレースホルダーkeep_probにはドロップアウトをセット
)
# 100回ごとにテストデータを使用して精度を出力
step += 1
if step % 100 == 0:
acc_val = sess.run(
accuracy, # 評価を実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels, # プレースホルダーtにはテストデータの正解ラベルをセット
keep_prob: 1.0} # 非ドロップアウト率を1.0にする
)
print('Step %d: accuracy = %.2f' % (step, acc_val))
畳み込みとプーリングが多少スッキリ書けましたね。
TensorFlow(tf.keras)
import tensorflow as tf
from tensorflow.keras.backend import set_session
# 1.データセットの準備部分
mnist = tf.keras.datasets.mnist
(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)
train_images, test_images = train_images / 255.0, test_images / 255.0
train_labels = tf.keras.utils.to_categorical(train_labels, 10)
test_labels = tf.keras.utils.to_categorical(test_labels, 10)
# 2.モデルの作成部分
learning_rate = 0.01 # 学習率
training_epochs = 5 # エポック数
batch_size = 50 # ミニバッチのサイズ
dropout = 0.5 # 非ドロップアウト率
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
set_session(tf.Session(config=config))
# モデル作成(クラスを使う場合)
class CNNModel(tf.keras.Model):
def __init__(self):
super(CNNModel, self).__init__()
self.conv1 = tf.keras.layers.Conv2D(filters=32, kernel_size=3, padding='same', activation='relu', input_shape=(28,28,1))
self.pool1 = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='same')
self.conv2 = tf.keras.layers.Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')
self.pool2 = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='same')
self.drop1 = tf.keras.layers.Dropout(dropout)
self.flat1 = tf.keras.layers.Flatten()
self.dense1 = tf.keras.layers.Dense(1024, activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(10, activation=tf.nn.softmax)
def call(self, inputs):
h = self.conv1(inputs)
h = self.pool1(h)
h = self.conv2(h)
h = self.pool2(h)
h = self.drop1(h)
h = self.flat1(h)
h = self.dense1(h)
return self.dense2(h)
model = CNNModel()
model.compile(optimizer=tf.train.RMSPropOptimizer(learning_rate),
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(train_images, train_labels, batch_size=batch_size, epochs=training_epochs)
model.evaluate(test_images, test_labels)
今回はモデルをクラスで書いてみました。こうするとChainerっぽく感じますね。
RNN編
最後はRNNです。やはりMNISTの分類を行います。
28x28の画像を、28画素が28時刻分並んだ時系列データとみなして無理やりRNNに入れています。
本来であれば、テキストの分類や機械翻訳、文章生成に使われる手法です。
Chainer
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training, datasets, iterators, optimizers
from chainer.training import extensions
# 1.データセットを用意する
train, test = chainer.datasets.get_mnist(ndim=3)# ndim=width,height,color
batch_size = 10 # バッチサイズ
uses_device = 0 # GPU#0を使用(CPU=-1)
n_in = 28 # 1時刻当たりのデータ数
n_time = 28 # 時系列の数
n_unit = 128 # 中間層のセルの数
# 2.モデルの作成
class RNN(chainer.Chain):
def __init__(self):
super(RNN, self).__init__()
with self.init_scope():
self.lstm1 = L.NStepLSTM(3, n_in,n_unit, dropout=0.) #3層構造にする
self.l1 = L.Linear(None, 10)
def __call__(self, x, t=None, train=True):
h = F.reshape(x, (-1,28,28))
h = [i for i in h]
_, _, h = self.lstm1(None, None, h)
h = F.concat([F.expand_dims(i, axis=0) for i in h], axis=0)
last_output = h[:,-1,:] # 最後の時系列情報を取得
h = self.l1(last_output)
# 損失か結果を返す
return F.softmax_cross_entropy(h, t) if train else F.softmax(h)
# 3.Updater(パラメータ更新手法)の作成
class MNISTUpdater(training.StandardUpdater):
def __init__(self, train_iter, optimizer, device):
super(MNISTUpdater, self).__init__(
train_iter,
optimizer,
device=device
)
def update_core(self):
# データを1バッチ分取得
batch = self.get_iterator('main').next()
x, t = chainer.dataset.convert.concat_examples(batch, self.device) # ここではx, t = self.converter(batch, self.device)でも良い
# Optimizerを取得
optimizer = self.get_optimizer('main')
# modelを取得
net = optimizer.target
# パラメータを更新(カッコの中には損失関数が入る、ここでは関数netにxとtを入力して計算された損失)
optimizer.update(net, x, t)
# 損失関数が複雑でUpdaterクラスのメンバーとして定義する場合
# optimizer.target.cleargrads() で計算前に勾配をクリアにする
# loss = self.loss_1(x,t) + self.loss_2(x,t) のように損失を計算する(例)
# loss.backward() で誤差逆伝播の計算をする
# optimizer.update() で更新する
# という書き方もできる
# 4.実行する
# モデルを作成
model = RNN()
# GPUに対応させる
if uses_device >= 0:
chainer.cuda.get_device_from_id(uses_device).use()
chainer.cuda.check_cuda_available()
# GPU用データ形式に変換
model.to_gpu()
# iterator(繰り返し条件)を作成する
train_iter = iterators.SerialIterator(train, batch_size, shuffle=True)
# 誤差逆伝播法アルゴリズムを選択する
optimizer = optimizers.Adam()
optimizer.setup(model)
# デバイスを選択してTrainer(訓練)を作成する
updater = MNISTUpdater(train_iter, optimizer, device=uses_device)
trainer = training.Trainer(updater, (100, 'epoch'), out="result")
# Trainerにはextensionという便利なオプションがつけられる
trainer.extend(extensions.ProgressBar()) # 学習の進展を表示するようにする
# 機械学習を実行する
trainer.run()
無理やり書き方を合わせたので、精度はそんなにです。
PyTorch
import torch
import torchvision
import torchvision.transforms as transforms
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
# 1.データセットを用意する
training_epochs = 1000 # エポック数
batch_size = 10 # バッチサイズ
n_in = 28 # 1時刻当たりのデータ数
n_time = 28 # 時系列の数
n_unit = 128 # 中間層のセルの数
trainset = torchvision.datasets.MNIST(root='./data',
train=True,
download=True,
transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset,
batch_size=batch_size,
shuffle=True)
testset = torchvision.datasets.MNIST(root='./data',
train=False,
download=True,
transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset,
batch_size=batch_size,
shuffle=False)
# 2.モデルの作成
class RNN(nn.Module):
def __init__(self):
super(RNN, self).__init__()
self.lstm = nn.LSTM(n_in, n_unit, 3) # 3層構造にする
self.l1 = nn.Linear(128, 10)
def forward(self, x):
h = x.view(-1, 28, 28)
h, _ = self.lstm(h, None)
last_output = h[:,-1,:] # 最後の時系列情報を取得
h = self.l1(last_output)
return h
# 3.実行する
# GPUに対応させる
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = RNN().to(device)
# 誤差逆伝播法アルゴリズムを選択する
criterion = nn.CrossEntropyLoss() # 損失関数を選択
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 訓練する
for epoch in range(training_epochs):
running_loss = 0.0
for i, (inputs, labels) in enumerate(trainloader):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad() # 計算前に勾配をゼロにする
outputs = model(inputs) # 順伝播の計算をする
loss = criterion(outputs, labels) # 損失を計算する
loss.backward() # 誤差逆伝播の計算をする
optimizer.step() # 更新する
# 損失を定期的に出力する
running_loss += loss.item()
if i % 100 == 99:
print('[{:d}, {:5d}] loss: {:3f}'
.format(epoch + 1, i + 1, running_loss / 100))
running_loss = 0.0
print('Finished Training')
# テストする
correct = 0
total = 0
with torch.no_grad():
for (images, labels) in testloader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print('Accuracy: {:.2f} %%'.format(100 * float(correct/total)))
同じく無理やり書き方を合わせたので、精度はそんなにです。
TensorFlow
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く
# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images # テスト用の画像
test_labels = mnist.test.labels # テスト用の正解ラベル
# 2.データフローグラフ(設計図)の作成部分
training_epochs = 1200 # エポック数
batch_size = 100 # ミニバッチのサイズ
n_in = 28 # 1時刻当たりのデータ数
n_time = 28 # 時系列の数
n_unit = 128 # 中間層のセルの数
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
# 入力層(ニューロン数=28)
with tf.name_scope("input"):
# 入力データなど、実行時に確定する値はプレースホルダーで扱う
x = tf.placeholder(tf.float32, [None, 784])
# [データ数, 28時刻, 28画素]の3次元配列に変換
input = tf.reshape(x, [-1, n_in, n_time])
# 中間層(3層構造)
with tf.name_scope("layer_lstm"):
# 中間層として1ユニットあたり128個のLSTMセルを配置し、これを3個生成
stacked_cells = [tf.nn.rnn_cell.LSTMCell(num_units=n_unit) for _ in range(3)]
# 3個の中間層をラッピングして3層構造にする
cell = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_cells)
# 出力:[バッチサイズ, 時間長(n_time), 出力長(n_unit)]
outputs, _ = tf.nn.dynamic_rnn(cell=cell, # LSTMのセルを指定
inputs=input, # 入力を指定
dtype=tf.float32 # 出力データの型を指定
)
# 出力層(ニューロン数=10)
with tf.name_scope("layer_output"):
# [データ数, 時間長(n_time), 出力長(n_unit)]から
# 最後の時系列情報を取得
last_output = outputs[:,-1,:]
# 重み
w = tf.Variable(tf.truncated_normal([n_unit,10], stddev=0.1))
# バイアス
b = tf.Variable(tf.zeros([10]))
# 活性化関数
out = tf.nn.softmax(tf.matmul(last_output, w ) + b)
# 誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
# 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
t = tf.placeholder(tf.float32, [None, 10])
loss = tf.reduce_mean(
-tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
)
# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
train_step = tf.train.AdamOptimizer().minimize(loss)
# 評価
with tf.name_scope("accuracy"):
# (out=t)の最大値を比較
correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
# True(正解=1)の平均を取る
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# 変数を初期化するノード
init = tf.global_variables_initializer()
# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
sess.run(init) # initノードを実行して変数を初期化
# 訓練(学習)の実行
for epoch in range(training_epochs):
# ミニバッチを抽出
train_images, train_labels = mnist.train.next_batch(batch_size)
sess.run(
train_step, # 訓練を実行
feed_dict={x:train_images, # プレースホルダーxには訓練データのミニバッチをセット
t:train_labels} # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
)
# 100エポックごとにテストデータを使用して精度を出力
epoch += 1
if epoch % 100 == 0:
acc_val = sess.run(
accuracy, # 評価を実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels} # プレースホルダーtにはテストデータの正解ラベルをセット
)
print('(%d): accuracy = %.2f' % (epoch, acc_val))
精度はそこそこ良いです。
TensorFlow(tf.layers)
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く
# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images # テスト用の画像
test_labels = mnist.test.labels # テスト用の正解ラベル
# 2.データフローグラフ(設計図)の作成部分
training_epochs = 1200 # エポック数
batch_size = 100 # ミニバッチのサイズ
n_in = 28 # 1時刻当たりのデータ数
n_time = 28 # 時系列の数
n_unit = 128 # 中間層のセルの数
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
# 入力層(ニューロン数=28)
with tf.name_scope("input"):
# 入力データなど、実行時に確定する値はプレースホルダーで扱う
x = tf.placeholder(tf.float32, [None, 784])
# [データ数, 28時刻, 28画素]の3次元配列に変換
input = tf.reshape(x, [-1, n_in, n_time])
# 中間層(3層構造)
with tf.name_scope("layer_lstm"):
# 中間層として1ユニットあたり128個のLSTMセルを配置し、これを3個生成
stacked_cells = [tf.nn.rnn_cell.LSTMCell(num_units=n_unit) for _ in range(3)]
# 3個の中間層をラッピングして3層構造にする
cell = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_cells)
# 出力:[バッチサイズ, 時間長(n_time), 出力長(n_unit)]
outputs, _ = tf.nn.dynamic_rnn(cell=cell, # LSTMのセルを指定
inputs=input, # 入力を指定
dtype=tf.float32 # 出力データの型を指定
)
# 出力層(ニューロン数=10)
with tf.name_scope("layer_output"):
# [データ数, 時間長(n_time), 出力長(n_unit)]から
# 最後の時系列情報を取得
last_output = outputs[:,-1,:]
out = tf.layers.dense(inputs=last_output,units=10,activation=tf.nn.softmax)
# 誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
# 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
t = tf.placeholder(tf.float32, [None, 10])
loss = tf.reduce_mean(
-tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
)
# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
train_step = tf.train.AdamOptimizer().minimize(loss)
# 評価
with tf.name_scope("accuracy"):
# (out=t)の最大値を比較
correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
# True(正解=1)の平均を取る
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# 変数を初期化するノード
init = tf.global_variables_initializer()
# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
sess.run(init) # initノードを実行して変数を初期化
# 訓練(学習)の実行
for epoch in range(training_epochs):
# ミニバッチを抽出
train_images, train_labels = mnist.train.next_batch(batch_size)
sess.run(
train_step, # 訓練を実行
feed_dict={x:train_images, # プレースホルダーxには訓練データのミニバッチをセット
t:train_labels} # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
)
# 100エポックごとにテストデータを使用して精度を出力
epoch += 1
if epoch % 100 == 0:
acc_val = sess.run(
accuracy, # 評価を実行
feed_dict={x:test_images, # プレースホルダーxにはテストテータをセット
t:test_labels} # プレースホルダーtにはテストデータの正解ラベルをセット
)
print('(%d): accuracy = %.2f' % (epoch, acc_val))
tf.layersをあまり使える箇所がないため、ほぼ変わらないです。
TensorFlow(tf.keras)
import tensorflow as tf
from tensorflow.keras.backend import set_session
# 1.データセットの準備部分
mnist = tf.keras.datasets.mnist
(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images = train_images.reshape(train_images.shape[0], 28, 28)
test_images = test_images.reshape(test_images.shape[0], 28, 28)
train_images, test_images = train_images / 255.0, test_images / 255.0
train_labels = tf.keras.utils.to_categorical(train_labels, 10)
test_labels = tf.keras.utils.to_categorical(test_labels, 10)
# 2.モデルの作成部分
learning_rate = 0.01 # 学習率
training_epochs = 5 # エポック数
batch_size = 50 # ミニバッチのサイズ
n_in = 28 # 1時刻当たりのデータ数
n_time = 28 # 時系列の数
n_unit = 128 # 中間層のセルの数
# GPUを指定する
config = tf.ConfigProto(
gpu_options=tf.GPUOptions(
visible_device_list="0", # GPU番号を指定
allow_growth=True
)
)
set_session(tf.Session(config=config))
# モデル作成(クラスを使う場合)
class RNNModel(tf.keras.Model):
def __init__(self):
super(RNNModel, self).__init__()
stacked_cells = [tf.keras.layers.LSTMCell(n_unit) for _ in range(3)]
self.lstm1 = tf.keras.layers.RNN(stacked_cells,return_sequences=True)
self.dense1 = tf.keras.layers.Dense(10, activation=tf.nn.softmax)
def call(self, inputs):
h = self.lstm1(inputs)
last_output = h[:,-1,:]
h = self.dense1(last_output)
return h
model = RNNModel()
model.compile(optimizer=tf.train.RMSPropOptimizer(learning_rate),
loss='categorical_crossentropy',
metrics=['accuracy'])
model.fit(train_images, train_labels, batch_size=batch_size, epochs=training_epochs)
model.evaluate(test_images, test_labels)
無理やり書き方を合わせたので、精度はそんなにです。
TensorFlow公式のサンプルを見ればわかりますが、tf.keras.layers.LSTMCellで書いたのは少し冗長で、tf.keras.layers.LSTMを使う方が普通です。
まとめ
自分のライブラリの勉強を兼ねて、長々とN番煎じの記事を執筆してしまいましたが、今回機械系 Advent Calendar 2018を勢いのままに作って、参加できたのはとても楽しかったです。
今回はそこまで参加者は多くありませんでしたが、やりたい人がやれるように場所を作ったのは良い機会だったと思います。
興味ある方は来年もぜひ参加してみてください。それでは。
参考
本記事は以下の書籍とサイトを参考にさせていただきました。
Chainerで作る コンテンツ自動生成AIプログラミング入門
TensorFlow&Kerasプログラミング実装ハンドブック
Chainer公式
PyTorch公式
TensorFlow公式