LoginSignup
4
12

More than 1 year has passed since last update.

型で覚えるDeep Learningのライブラリ【2021年度版】

Posted at

はじめに

※この記事は以前アドベントカレンダーで書いた記事の更新版です。TensorFlow v2での書き方やモデルの保存、自前のデータを利用するためのカスタムデータセットの書き方を追記しました。(それ以外の部分は変更していません。)また、Chainerはメンテナンスフェーズに移行しましたが一応載せたままにします。

Deep Learningに関することをやろうとする人にとって悩ましいのが、どのライブラリ使えばええねん問題だと思います。
日本発だしChainer?FacebookのPyTorch?GoogleのTensorFlow?

個人的にはChainerやPyTorchの書き方がスッキリしていて好きなのですが、世界的にはKerasやGoogle Colaboratoryの登場、特許関係によってTensorFlowの勢いが増しているように感じます。(というか実際そうですね)

ここでは、N番煎じではありますが、それぞれの書き方を並べて紹介することで、そのおおまかな型のようなものをつかんでみようと思います。
まあ、執筆者の勉強のついでなので、たいしたものではないですが、ご了承ください。

注意

比較のしやすさのために冗長な書き方になっている箇所があります。
また、Deep Learning自体については説明なしです。コードだけです。
あと、これは時間をかけたくないという怠惰な理由からなのですが、全然層数がDeepじゃないです。Deepにしても使用できますので、ご了承ください。

MLP編

まずは基本的なMLPを見ていきましょう。有名なMNISTの分類を行います。

サンプルコード

Chainer

mlp1.py
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training, datasets, iterators, optimizers
from chainer.training import extensions

# 1.データセットを用意する
train, test = chainer.datasets.get_mnist(ndim=3)# ndim=width,height,color

batch_size = 10                # バッチサイズ
uses_device = 0                # GPU#0を使用(CPU=-1)

# 2.モデルの作成
class MLP(chainer.Chain):

    def __init__(self):
        super(MLP, self).__init__()
        with self.init_scope():
            self.l1 = L.Linear(None, 64)
            self.l2 = L.Linear(None, 10)

    def __call__(self, x, t=None, train=True):
        h = F.relu(self.l1(x))
        h = self.l2(h)
        # 訓練時には損失を、テスト時には結果を返す
        return F.softmax_cross_entropy(h, t) if train else F.softmax(h)

# 3.Updater(パラメータ更新手法)の作成
# ここではStandardUpdaterを使用するため、必要ない

# 4.実行する
# モデルを作成
model = MLP()

# GPUに対応させる
if uses_device >= 0:
    chainer.cuda.get_device_from_id(uses_device).use()
    chainer.cuda.check_cuda_available()
    # GPU用データ形式に変換
    model.to_gpu()

# iterator(繰り返し条件)を作成する
train_iter = iterators.SerialIterator(train, batch_size, shuffle=True)
test_iter = iterators.SerialIterator(test, batch_size, repeat=False, shuffle=False)

# 誤差逆伝播法アルゴリズムを選択する
optimizer = optimizers.Adam()
optimizer.setup(model)

# デバイスを選択してTrainer(訓練)を作成する
updater = training.StandardUpdater(train_iter, optimizer, device=uses_device)
trainer = training.Trainer(updater, (100, 'epoch'), out="result")
# Trainerにはextensionという便利なオプションがつけられる
trainer.extend(extensions.Evaluator(test_iter, model, device=uses_device)) # テストをTrainerに設定する
trainer.extend(extensions.ProgressBar()) # 学習の進展を表示するようにする

# 機械学習を実行する
trainer.run()

Chainerは大きく以下の4つの部分に分けられます。
1. データセットを用意する部分
2. モデルを作成する部分
3. Updater(パラメータ更新手法)を作成する部分
4. 実行する部分

何と言っても特徴的なのが、Updaterです。学習時のパラメータの更新の方法を記述することができるのですが、今回は特にいじらずStandardUpdaterをお手軽に使用します。これとTrainerというものを利用することで、何エポックも訓練させる部分をスッキリ書くことができます。普通であれば、次に紹介するPyTorchのようにfor文を使用しますが、この部分をうまく隠蔽しているのです。また、Trainerにはextensionという便利なオプションをつけることが可能です。

PyTorch

mlp2.py
import torch
import torchvision
import torchvision.transforms as transforms
from torch import nn, optim
import torch.nn.functional as F 
from torch.utils.data import Dataset, DataLoader, TensorDataset

# 1.データセットを用意する
training_epochs = 5            # エポック数
batch_size = 10                # バッチサイズ

trainset = torchvision.datasets.MNIST(root='./data', 
                                        train=True,
                                        download=True,
                                        transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset,
                                            batch_size=batch_size,
                                            shuffle=True)

testset = torchvision.datasets.MNIST(root='./data', 
                                        train=False, 
                                        download=True, 
                                        transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset, 
                                            batch_size=batch_size,
                                            shuffle=False)

# 2.モデルの作成
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.l1 = nn.Linear(784,64)
        self.l2 = nn.Linear(64, 10)

    def forward(self, x):
        h = x.view(-1, 28 * 28) # 28*28の画像から1次元ベクトルにリサイズする
        h = F.relu(self.l1(h))
        return self.l2(h)

# 3.実行する
# GPUに対応させる
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = MLP().to(device)

# 誤差逆伝播法アルゴリズムを選択する
criterion = nn.CrossEntropyLoss() # 損失関数を選択
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 訓練する
for epoch in range(training_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad() # 計算前に勾配をゼロにする
        outputs = model(inputs) # 順伝播の計算をする
        loss = criterion(outputs, labels) # 損失を計算する
        loss.backward() # 誤差逆伝播の計算をする
        optimizer.step() # 更新する

        # 損失を定期的に出力する
        running_loss += loss.item()
        if i % 100 == 99:
            print('[{:d}, {:5d}] loss: {:3f}'
                    .format(epoch + 1, i + 1, running_loss / 100))
            running_loss = 0.0

print('Finished Training')

# テストする
correct = 0
total = 0

with torch.no_grad():
    for (images, labels) in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('Accuracy: {:.2f} %%'.format(100 * float(correct/total)))

PyTorchは大きく以下の3つの部分に分けられます。
1. データセットを用意する部分
2. モデルを作成する部分
3. 実行する部分

Chainerと書き方はほぼ同じなのですが、Updaterはなく、訓練時にはfor文を使用して書きます。ただ、for文の中に記述されている処理もCNN編で紹介するChainerのUpdaterの中の処理とほぼ書き方が同じですので、ご安心ください。(Trainerもあるようですが、あまり使われているのを見たことがありません。)ChainerとPyTorchは対応関係がわかりやすく、片方を知っていればもう片方の学習コストはかなり低くなると思います。

TensorFlow

mlp3.py
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く

# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images   # テスト用の画像
test_labels = mnist.test.labels   # テスト用の正解ラベル

# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.5    # 学習率
training_epochs = 1500 # エポック数
batch_size = 50        # ミニバッチのサイズ

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)

# 入力層
x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
# 画像データの部分を28×28の行列に変換
img = tf.reshape(x,[-1,28,28,1])
# 画像をログとして出力
tf.summary.image("input_data", img, 20) 

# 隠れ層
with tf.name_scope("hidden"):
    # 重み(変数)
    w1 = tf.Variable(tf.truncated_normal([784, 64], stddev=0.1), name="w1")
    # バイアス(変数)
    b1 = tf.Variable(tf.zeros([64]), name="b1")
    # 活性化関数
    h1 = tf.nn.relu(tf.matmul(x, w1) + b1)

# 出力層
with tf.name_scope("output"):
    # 重み(変数)
    w2 = tf.Variable(tf.truncated_normal([64, 10], stddev=0.1), name="w2")
    # バイアス(変数)
    b2 = tf.Variable(tf.zeros([10]), name="b2")
    # 活性化関数
    out = tf.nn.softmax(tf.matmul(h1, w2) + b2)

# 誤差関数
with tf.name_scope("loss"):
    # 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
    t = tf.placeholder(tf.float32, [None, 10])
    loss = tf.reduce_mean(tf.square(t - out))
    # 誤差をログとして出力
    tf.summary.scalar("loss", loss)

# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
    train_step = tf.train.GradientDescentOptimizer(
        learning_rate   # 学習率
    ).minimize(loss)    # 最小化問題を解く

# 評価
with tf.name_scope("accuracy"):
    # (out=t)の最大値を比較
    correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
    # True(正解=1)の平均を取る
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    # 精度をログとして出力
    tf.summary.scalar("accuracy", accuracy)

# 変数を初期化するノード
init =tf.global_variables_initializer()
# ログをマージするノード
summary_op = tf.summary.merge_all()

# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
    # FileWriterオブジェクトを生成する
    summary_writer = tf.summary.FileWriter(
        "logs",    # イベントファイルの保存先
        sess.graph # データフローグラフを視覚化する
        )
    sess.run(init)   # initノードを実行して変数を初期化
    for epoch in range(training_epochs):
        # ミニバッチを抽出
        train_images, train_labels = mnist.train.next_batch(batch_size)
        sess.run(
            train_step,                    # 訓練を実行
            feed_dict={x:train_images,     # プレースホルダーxには訓練データのミニバッチをセット
                       t:train_labels}     # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
            )

        # 50回ごとにテストデータを使用して精度を出力
        epoch += 1
        if epoch % 50 == 0:
            acc_val = sess.run(
                accuracy,                  # 評価を実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
            print('(%d) accuracy = %.2f' % (epoch, acc_val))
            # イベントログをsummary_strに代入
            summary_str = sess.run(
                summary_op,                # ログをマージするノードを実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
            # イベントファイルにログを追加
            summary_writer.add_summary(summary_str, epoch)

## $tensorboard --logdir="イベントファイルの入ったフォルダーのパス" でイベントログが確認出来るURLが与えられる
## 例:$tensorboard --logdir=./logs
## コマンド実行中にそのURL(例えばlocalhost:6006)にアクセスすれば良い
## Ctrl+Cで止められる

TensorFlowは大きく以下の3つの部分に分けられます。
1. データセットを用意する部分
2. データフローグラフ(設計図)の作成部分
3. セッションで実行する部分

おおまかにはPyTorchと同じ流れに感じますが、耳慣れない単語が出てきましたね。
実はこれまで紹介したものとは少し考え方が違っているのです。
ニューラルネットワークの計算を表現したものを計算グラフというのですが、先ほどまで紹介してきたChainerなどでは、計算の実行時にこの計算グラフが定義される「Define-by-Run」という方式が取られています。それに対してTensorFlowでは、計算グラフを学習の前にあらかじめ構築する「Define-and-Run」という方式が取られています。(ただし、Eager Execution for TensorFlowなど一部の機能では「Define-by-Run」方式が取られている)
そこで、TensorFlowではデータフローグラフ(設計図だと思えば良いです)を作ってから、セッションで実行して学習する、という手順になっているのです。
その他の特徴としては、見ての通りChainerやPyTorchほど隠蔽されていないので、書くのが大変なかわりに細かい設定ができたりします。
また、tensorboardという便利な可視化ツールを利用することができます。

TensorFlow(tf.layers)

mlp4.py
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く

# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images   # テスト用の画像
test_labels = mnist.test.labels   # テスト用の正解ラベル

# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.5    # 学習率
training_epochs = 1500 # エポック数
batch_size = 50        # ミニバッチのサイズ

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)

# 入力層
x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
# 画像データの部分を28×28の行列に変換
img = tf.reshape(x,[-1,28,28,1])
# 画像をログとして出力
tf.summary.image("input_data", img, 20) 

# 隠れ層
with tf.name_scope("hidden"):
    h1 = tf.layers.dense(inputs=x,units=64,activation=tf.nn.relu)

# 出力層
with tf.name_scope("output"):
    out = tf.layers.dense(inputs=h1,units=10,activation=tf.nn.softmax)

# 誤差関数
with tf.name_scope("loss"):
    # 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
    t = tf.placeholder(tf.float32, [None, 10])
    loss = tf.reduce_mean(tf.square(t - out))
    # 誤差をログとして出力
    tf.summary.scalar("loss", loss)

# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
    train_step = tf.train.GradientDescentOptimizer(
        learning_rate   # 学習率
    ).minimize(loss)    # 最小化問題を解く

# 評価
with tf.name_scope("accuracy"):
    # (out=t)の最大値を比較
    correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
    # True(正解=1)の平均を取る
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    # 精度をログとして出力
    tf.summary.scalar("accuracy", accuracy)

# 変数を初期化するノード
init =tf.global_variables_initializer()
# ログをマージするノード
summary_op = tf.summary.merge_all()

# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
    # FileWriterオブジェクトを生成する
    summary_writer = tf.summary.FileWriter(
        "logs",    # イベントファイルの保存先
        sess.graph # データフローグラフを視覚化する
        )
    sess.run(init)   # initノードを実行して変数を初期化
    for epoch in range(training_epochs):
        # ミニバッチを抽出
        train_images, train_labels = mnist.train.next_batch(batch_size)
        sess.run(
            train_step,                    # 訓練を実行
            feed_dict={x:train_images,     # プレースホルダーxには訓練データのミニバッチをセット
                       t:train_labels}     # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
            )

        # 50回ごとにテストデータを使用して精度を出力
        epoch += 1
        if epoch % 50 == 0:
            acc_val = sess.run(
                accuracy,                  # 評価を実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
            print('(%d) accuracy = %.2f' % (epoch, acc_val))
            # イベントログをsummary_strに代入
            summary_str = sess.run(
                summary_op,                # ログをマージするノードを実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels}) # プレースホルダーtにはテストデータの正解ラベルをセット
            # イベントファイルにログを追加
            summary_writer.add_summary(summary_str, epoch)

先ほどTensorFlowの説明の中で、かる〜く「書くのが大変ですが」などと流しましたが、実際にやってみると本当に大変です。TensorFlowに苦手意識を持つ原因として、機能や高レベルAPIが多すぎて混乱するいろいろな書き方ができてしまい人のコードが読みづらい、ということがあると僕は感じています。
そんな中、モデルの作成部分だけでもChainerやPyTorchのように書けるようになるのが、このtf.layersです。層をいくつも重ねなければならない時には助かりますね。モデルの作成部分以外は先ほどのTensorFlowのコードと同じです。

TensorFlow(tf.keras)

mlp5.py
import tensorflow as tf
from tensorflow.keras.backend import set_session

# 1.データセットの準備部分
mnist = tf.keras.datasets.mnist

(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images, test_images = train_images / 255.0, test_images / 255.0

train_labels = tf.keras.utils.to_categorical(train_labels, 10)
test_labels = tf.keras.utils.to_categorical(test_labels, 10)

# 2.モデルの作成部分
learning_rate = 0.01    # 学習率
training_epochs = 5     # エポック数
batch_size = 50         # ミニバッチのサイズ

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)
set_session(tf.Session(config=config))

# モデル作成(クラスを使わない場合)
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(64, activation=tf.nn.relu),
  tf.keras.layers.Dense(10, activation=tf.nn.softmax)
])

model.compile(optimizer=tf.train.RMSPropOptimizer(learning_rate),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.fit(train_images, train_labels, batch_size=batch_size, epochs=training_epochs)
model.evaluate(test_images, test_labels)

さて、まだTensorFlowに苦手意識のある方に朗報です。このtf.kerasを使ってみてください。この短さ、もはや別物ですね笑。model.fitというところで訓練しているのですが、元々のTensorFlowに比べてかなり隠蔽されていると思います。細かい設定がしにくくなるかもしれませんが、初学者はここから入るのが良いかもしれません。ネット上にも実装例が多く転がっている印象です。

TensorFlow v2

mlp6.py
# https://www.tensorflow.org/tutorials/quickstart/advanced?hl=ja
import tensorflow as tf

# 1.データセットの準備部分
batch_size = 50         # ミニバッチのサイズ
mnist = tf.keras.datasets.mnist

(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images, test_images = train_images / 255.0, test_images / 255.0

train_ds = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels)).shuffle(10000).batch(batch_size)
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

# 2.モデルの作成部分
learning_rate = 0.001   # 学習率
training_epochs = 5     # エポック数

class MLPModel(tf.keras.Model):

    def __init__(self):
        super(MLPModel, self).__init__()
        self.flat1 = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(64, activation='relu')
        self.dense2 = tf.keras.layers.Dense(10, activation='softmax')

    def call(self, inputs):
        h = self.flat1(inputs)
        h = self.dense1(h)
        return self.dense2(h)

model = MLPModel()

# 3.実行する部分
loss_object = tf.keras.losses.SparseCategoricalCrossentropy() # CategoricalCrossentropyと違い、ラベルのOne-hot化不要
optimizer = tf.keras.optimizers.Adam(learning_rate)

train_loss = tf.keras.metrics.Mean(name='train_loss') # 損失の記録用
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') # 精度の記録用
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

# 訓練用
@tf.function
def train_step(images, labels):
  with tf.GradientTape() as tape:
    predictions = model(images)
    loss = loss_object(labels, predictions)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_loss(loss)
  train_accuracy(labels, predictions)

# テスト用
@tf.function
def test_step(images, labels):
  predictions = model(images)
  t_loss = loss_object(labels, predictions)

  test_loss(t_loss)
  test_accuracy(labels, predictions)

# 学習開始
for epoch in range(training_epochs):
  for images, labels in train_ds:
    train_step(images, labels)

  for test_images, test_labels in test_ds:
    test_step(test_images, test_labels)

  template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
  print (template.format(epoch+1,
                         train_loss.result(),
                         train_accuracy.result()*100,
                         test_loss.result(),
                         test_accuracy.result()*100))

  # 次のエポック用にメトリクスをリセット
  train_loss.reset_states()
  train_accuracy.reset_states()
  test_loss.reset_states()
  test_accuracy.reset_states()

TensorFlowの新バージョンでは、「Define-by-Run」方式のライブラリに近い書き方で、高レベルAPIもtf.kerasに統一されています。tf.dataによるデータセット準備部分やtf.functionデコレータを使った学習の実行部分が特徴的ですが、全体的にシンプルにまとまっています。

CNN編

次はCNNです。畳み込みを用いてMNISTの分類を行います。
画像を扱う場合、必ずと言っていいほど使われる手法です。

サンプルコード

Chainer

cnn1.py
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training, datasets, iterators, optimizers
from chainer.training import extensions

# 1.データセットを用意する
train, test = chainer.datasets.get_mnist(ndim=3)# ndim=width,height,color

batch_size = 10                # バッチサイズ
uses_device = 0                # GPU#0を使用(CPU=-1)

# 2.モデルの作成
class CNN(chainer.Chain):

    def __init__(self):
        super(CNN, self).__init__()
        with self.init_scope():
            self.c1 = L.Convolution2D(1, 8, ksize=3) # フィルタサイズ=3で出力数8
            self.l1 = L.Linear(None, 10)

    def __call__(self, x, t=None, train=True):
        h = F.relu(self.c1(x))
        h = F.max_pooling_2d(h, 2)
        h = self.l1(h)
        # 損失か結果を返す
        return F.softmax_cross_entropy(h, t) if train else F.softmax(h)

# 3.Updater(パラメータ更新手法)の作成
class MNISTUpdater(training.StandardUpdater):

    def __init__(self, train_iter, optimizer, device):
        super(MNISTUpdater, self).__init__(
            train_iter,
            optimizer,
            device=device
        )

    def update_core(self):
        # データを1バッチ分取得
        batch = self.get_iterator('main').next()
        x, t = chainer.dataset.convert.concat_examples(batch, self.device) # ここではx, t = self.converter(batch, self.device)でも良い
        # Optimizerを取得
        optimizer = self.get_optimizer('main')
        # modelを取得
        net = optimizer.target

        # パラメータを更新(カッコの中には損失関数が入る、ここでは関数netにxとtを入力して計算された損失)
        optimizer.update(net, x, t)

        # 損失関数が複雑でUpdaterクラスのメンバーとして定義する場合
        # optimizer.target.cleargrads() で計算前に勾配をクリアにする
        # loss = self.loss_1(x,t) + self.loss_2(x,t) のように損失を計算する(例)
        # loss.backward() で誤差逆伝播の計算をする
        # optimizer.update() で更新する
        # という書き方もできる

# 4.実行する
# モデルを作成
model = CNN()

# GPUに対応させる
if uses_device >= 0:
    chainer.cuda.get_device_from_id(uses_device).use()
    chainer.cuda.check_cuda_available()
    # GPU用データ形式に変換
    model.to_gpu()

# iterator(繰り返し条件)を作成する
train_iter = iterators.SerialIterator(train, batch_size, shuffle=True)

# 誤差逆伝播法アルゴリズムを選択する
optimizer = optimizers.Adam()
optimizer.setup(model)

# デバイスを選択してTrainer(訓練)を作成する
updater = MNISTUpdater(train_iter, optimizer, device=uses_device)
trainer = training.Trainer(updater, (100, 'epoch'), out="result")
# Trainerにはextensionという便利なオプションがつけられる
trainer.extend(extensions.ProgressBar()) # 学習の進展を表示するようにする

# 機械学習を実行する
trainer.run()

今回はUpdaterを自作してみました。中身がPyTorchとほぼ同じ書き方であることがわかると思います。

PyTorch

cnn2.py
import torch
import torchvision
import torchvision.transforms as transforms
from torch import nn, optim
import torch.nn.functional as F 
from torch.utils.data import Dataset, DataLoader, TensorDataset

# 1.データセットを用意する
training_epochs = 5            # エポック数
batch_size = 10                # バッチサイズ

trainset = torchvision.datasets.MNIST(root='./data', 
                                        train=True,
                                        download=True,
                                        transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset,
                                            batch_size=batch_size,
                                            shuffle=True)

testset = torchvision.datasets.MNIST(root='./data', 
                                        train=False, 
                                        download=True, 
                                        transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset, 
                                            batch_size=batch_size,
                                            shuffle=False)

# 2.モデルの作成
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()        
        self.c1 =  nn.Conv2d(1, 8, 3) # フィルタサイズ=3で出力数8
        self.l1 = nn.Linear(8*13*13, 10)

    def forward(self, x):
        h = F.relu(self.c1(x))
        h = F.max_pool2d(h, (2, 2))
        h = h.view(-1, 8*13*13)
        h = self.l1(h)
        return h

# 3.実行する
# GPUに対応させる
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)

# 誤差逆伝播法アルゴリズムを選択する
criterion = nn.CrossEntropyLoss() # 損失関数を選択
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 訓練する
for epoch in range(training_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad() # 計算前に勾配をゼロにする
        outputs = model(inputs) # 順伝播の計算をする
        loss = criterion(outputs, labels) # 損失を計算する
        loss.backward() # 誤差逆伝播の計算をする
        optimizer.step() # 更新する

        # 損失を定期的に出力する
        running_loss += loss.item()
        if i % 100 == 99:
            print('[{:d}, {:5d}] loss: {:3f}'
                    .format(epoch + 1, i + 1, running_loss / 100))
            running_loss = 0.0

print('Finished Training')

# テストする
correct = 0
total = 0

with torch.no_grad():
    for (images, labels) in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('Accuracy: {:.2f} %%'.format(100 * float(correct/total)))

畳み込みに変えただけで、MLPとほぼ同じですね。

TensorFlow

cnn3.py
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く

# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images   # テスト用の画像
test_labels = mnist.test.labels   # テスト用の正解ラベル

# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.01   # 学習率
training_epochs = 1500 # エポック数
batch_size = 100       # ミニバッチのサイズ
dropout = 0.5          # 非ドロップアウト率

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)

# 入力層
with tf.name_scope("input"):
    x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
    # 画像データの部分を28×28の行列に変換
    out_img = tf.reshape(x,[-1,28,28,1])

# 第1層:畳み込み層(ニューロン数=32)
with tf.name_scope("layer1_conv"):
    # 2次元フィルター(3×3を32枚)
    f1 = tf.Variable(
        tf.truncated_normal([3,         # フィルターの縦サイズ
                             3,         # フィルターの横サイズ
                             1,         # 画像のチャンネル数
                             32],       # フィルターの枚数
                            stddev=0.1))
    # 畳み込みを行うノード
    conv1 = tf.nn.conv2d(out_img,       # 28×28の画像
                         f1,            # 2次元フィルター
                         strides=[1,    # 入力データを1ずつ順送り
                                  1,    # 縦方向のストライドは1
                                  1,    # 横方向のストライドは1
                                  1],   # チャンネルのストライドは1
                         padding='SAME' # パディングを行う
                        )
    # フィルターのバイアス
    b1 = tf.Variable(
        tf.constant(0.1,                # 0.1で初期化
                    shape=[32]          # フィルターの数だけ用意
                    ))
    # 活性化関数
    out_conv1 = tf.nn.relu(conv1 + b1)

# 第2層:プーリング層(ニューロン数=32)
with tf.name_scope("layer2_pooling"):
    # 最大プーリング
    out_pool1 = tf.nn.max_pool(
        out_conv1,
        ksize=[1,      # データに対するウィンドウサイズ
               2,      # タテ方向のウィンドウサイズ
               2,      # ヨコ方向のウィンドウサイズ
               1       # チャンネル方向のウィンドウサイズ
              ],
        strides=[1,    # データのストライド量
                 2,    # タテ方向のストライド量
                 2,    # ヨコ方向のストライド量
                 1     # チャンネル方向のストライド量
                ],
        padding='SAME' # フィルターに対するパディングを行う
    )

# 第3層:畳み込み層(ニューロン数=64)
with tf.name_scope("layer3_conv"):
    # 2次元フィルター(3×3を32枚)
    f2 = tf.Variable(
        tf.truncated_normal([3,          # フィルターの縦サイズ
                             3,          # フィルターの横サイズ
                             32,         # 画像のチャンネル数
                             64],        # フィルターの枚数
                            stddev=0.1))
    # 畳み込みを行うノード
    conv2 = tf.nn.conv2d(out_pool1,      # 14×14の画像
                         f2,             # 2次元フィルター
                         strides=[1,     # 入力データを1ずつストライド
                                  1,     # 縦方向のストライドは1
                                  1,     # 横方向のストライドは1
                                  1],    # チャンネルのストライドは1
                         padding='SAME')
    # フィルターのバイアス
    b2 = tf.Variable(
        tf.constant(0.1,                 # 0.1で初期化
                    shape=[64]           # フィルターの数だけ用意
                   ))
    # 活性化関数
    out_conv2 = tf.nn.relu(conv2 + b2)

# 第4層:プーリング層(ニューロン数=64)
with tf.name_scope("layer4_pooling"):
    # 最大プーリング
    out_pool2 = tf.nn.max_pool(
        out_conv2, ksize=[1,      # データに対するウィンドウサイズ
                          2,      # タテ方向のウィンドウサイズ
                          2,      # ヨコ方向のウィンドウサイズ
                          1],     # チャンネル方向のウィンドウサイズ
        strides=[1,    # データのストライド量
                 2,    # タテ方向のストライド量
                 2,    # ヨコ方向のストライド量
                 1],   # チャンネル方向のストライド量
        padding='SAME')

# ドロップアウト
with tf.name_scope("dropout"):
    # 非ドロップアウト率を保持するプレースホルダー
    keep_prob = tf.placeholder(tf.float32)
    # ドロップアウトを行うノード
    out_drop = tf.nn.dropout(out_pool2, keep_prob)

# Flatten(ニューロン数=7×7×64)
# (画像の枚数, 7(タテ),7(ヨコ),64(チャンネル))を
# (画像の枚数, 7×7×64)の2次元に変換
with tf.name_scope("flatten"):
    out_flat = tf.reshape(out_drop,
                              [-1, 7*7*64] # (画像の枚数, 画像データ)
                             )

# 第5層(全結合層):ニューロン数=1024
with tf.name_scope("layer5_binding"):
    # 重み
    w_comb1 = tf.Variable(tf.truncated_normal([7*7*64, 1024], stddev=0.1))
    # バイアス
    b_comb1 = tf.Variable(tf.constant(0.1, shape=[1024]))
    # 活性化関数
    out_comb1 = tf.nn.relu(tf.matmul(out_flat, w_comb1) + b_comb1)

# 第6層(出力層):ニューロン数=10
with tf.name_scope("layer7_output"):
    # 重み
    w_comb2 = tf.Variable(tf.truncated_normal([1024, 10], stddev=0.1))
    # バイアス(
    b_comb2 = tf.Variable(tf.constant(0.1, shape=[10]))
    # 活性化関数
    out = tf.nn.softmax(tf.matmul(out_comb1, w_comb2) + b_comb2)

#誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
    # 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
    t = tf.placeholder(tf.float32, [None, 10])
    loss = tf.reduce_mean(
        -tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
        )

# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
    train_step = tf.train.GradientDescentOptimizer(
        learning_rate
    ).minimize(loss)

# 評価
with tf.name_scope("accuracy"):
    # (out=t)の最大値を比較
    correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
    # True(正解=1)の平均を取る
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

# 変数を初期化するノード
init =tf.global_variables_initializer()

# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
    sess.run(init)   # initノードを実行して変数を初期化

    for step in range(training_epochs):
        # ミニバッチを抽出
        train_images, train_labels = mnist.train.next_batch(batch_size)
        sess.run(
            train_step,                    # 訓練を実行
            feed_dict={x:train_images,     # プレースホルダーxには訓練データのミニバッチをセット
                       t:train_labels,     # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
                       keep_prob: dropout} # プレースホルダーkeep_probにはドロップアウトをセット
            )

        # 100回ごとにテストデータを使用して精度を出力
        step += 1
        if step % 100 == 0:
            acc_val = sess.run(
                accuracy,                  # 評価を実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels,  # プレースホルダーtにはテストデータの正解ラベルをセット
                           keep_prob: 1.0} # 非ドロップアウト率を1.0にする
            )
            print('Step %d: accuracy = %.2f' % (step, acc_val))

畳み込みやプーリングの記述が長いですが、全体の流れは先ほどと同じです。

TensorFlow(tf.layers)

cnn4.py
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く

# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images   # テスト用の画像
test_labels = mnist.test.labels   # テスト用の正解ラベル

# 2.データフローグラフ(設計図)の作成部分
learning_rate = 0.01   # 学習率
training_epochs = 1500 # エポック数
batch_size = 100       # ミニバッチのサイズ
dropout = 0.5          # 非ドロップアウト率

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)

# 入力層
with tf.name_scope("input"):
    x = tf.placeholder(tf.float32, [None, 784]) # 入力データなど、実行時に確定する値はプレースホルダーで扱う
    # 画像データの部分を28×28の行列に変換
    out_img = tf.reshape(x,[-1,28,28,1])

# 第1層:畳み込み層(ニューロン数=32)
with tf.name_scope("layer1_conv"):
    out_conv1 = tf.layers.conv2d(
          inputs=out_img,
          filters=32,
          kernel_size=[3, 3],
          padding="same",
          activation=tf.nn.relu)

# 第2層:プーリング層(ニューロン数=32)
with tf.name_scope("layer2_pooling"):
    out_pool1 = tf.layers.max_pooling2d(inputs=out_conv1, pool_size=[2, 2], strides=2, padding='same')

# 第3層:畳み込み層(ニューロン数=64)
with tf.name_scope("layer3_conv"):
    out_conv2 = tf.layers.conv2d(
          inputs=out_pool1,
          filters=64,
          kernel_size=[3, 3],
          padding="same",
          activation=tf.nn.relu)

# 第4層:プーリング層(ニューロン数=64)
with tf.name_scope("layer4_pooling"):
    out_pool2 = tf.layers.max_pooling2d(inputs=out_conv2, pool_size=[2, 2], strides=2, padding='same')

# ドロップアウト
with tf.name_scope("dropout"):
    # 非ドロップアウト率を保持するプレースホルダー
    keep_prob = tf.placeholder(tf.float32)
    # ドロップアウトを行うノード
    out_drop = tf.layers.dropout(out_pool2, rate=keep_prob)

# Flatten(ニューロン数=7×7×64)
# (画像の枚数, 7(タテ),7(ヨコ),64(チャンネル))を
# (画像の枚数, 7×7×64)の2次元に変換
with tf.name_scope("flatten"):
    out_flat = tf.layers.flatten(out_drop)

# 第5層(全結合層):ニューロン数=1024
with tf.name_scope("layer5_binding"):
    out_comb1 = tf.layers.dense(inputs=out_flat, units=1024, activation=tf.nn.relu)

# 第6層(出力層):ニューロン数=10
with tf.name_scope("layer7_output"):
    out = tf.layers.dense(inputs=out_comb1,units=10,activation=tf.nn.softmax)

#誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
    # 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
    t = tf.placeholder(tf.float32, [None, 10])
    loss = tf.reduce_mean(
        -tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
        )

# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
    train_step = tf.train.GradientDescentOptimizer(
        learning_rate
    ).minimize(loss)

# 評価
with tf.name_scope("accuracy"):
    # (out=t)の最大値を比較
    correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
    # True(正解=1)の平均を取る
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

# 変数を初期化するノード
init =tf.global_variables_initializer()

# 3.セッションで実行する部分
with tf.Session(config=config) as sess:
    sess.run(init)   # initノードを実行して変数を初期化

    for step in range(training_epochs):
        # ミニバッチを抽出
        train_images, train_labels = mnist.train.next_batch(batch_size)
        sess.run(
            train_step,                    # 訓練を実行
            feed_dict={x:train_images,     # プレースホルダーxには訓練データのミニバッチをセット
                       t:train_labels,     # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
                       keep_prob: dropout} # プレースホルダーkeep_probにはドロップアウトをセット
            )

        # 100回ごとにテストデータを使用して精度を出力
        step += 1
        if step % 100 == 0:
            acc_val = sess.run(
                accuracy,                  # 評価を実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels,  # プレースホルダーtにはテストデータの正解ラベルをセット
                           keep_prob: 1.0} # 非ドロップアウト率を1.0にする
            )
            print('Step %d: accuracy = %.2f' % (step, acc_val))

畳み込みとプーリングが多少スッキリ書けましたね。

TensorFlow(tf.keras)

cnn5.py
import tensorflow as tf
from tensorflow.keras.backend import set_session

# 1.データセットの準備部分
mnist = tf.keras.datasets.mnist

(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)
train_images, test_images = train_images / 255.0, test_images / 255.0

train_labels = tf.keras.utils.to_categorical(train_labels, 10)
test_labels = tf.keras.utils.to_categorical(test_labels, 10)

# 2.モデルの作成部分
learning_rate = 0.01    # 学習率
training_epochs = 5     # エポック数
batch_size = 50         # ミニバッチのサイズ
dropout = 0.5           # 非ドロップアウト率

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)
set_session(tf.Session(config=config))

# モデル作成(クラスを使う場合)
class CNNModel(tf.keras.Model):

    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(filters=32, kernel_size=3, padding='same', activation='relu', input_shape=(28,28,1))
        self.pool1 = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='same')
        self.conv2 = tf.keras.layers.Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')
        self.pool2 = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='same')
        self.drop1 = tf.keras.layers.Dropout(dropout)
        self.flat1 = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(10, activation=tf.nn.softmax)

    def call(self, inputs):
        h = self.conv1(inputs)
        h = self.pool1(h)
        h = self.conv2(h)
        h = self.pool2(h)
        h = self.drop1(h)
        h = self.flat1(h)
        h = self.dense1(h)
        return self.dense2(h)

model = CNNModel()

model.compile(optimizer=tf.train.RMSPropOptimizer(learning_rate),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.fit(train_images, train_labels, batch_size=batch_size, epochs=training_epochs)
model.evaluate(test_images, test_labels)

今回はモデルをクラスで書いてみました。こうするとChainerっぽく感じますね。

TensorFlow v2

cnn6.py
import tensorflow as tf

# 1.データセットの準備部分
batch_size = 50         # ミニバッチのサイズ
mnist = tf.keras.datasets.mnist

(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images, test_images = train_images / 255.0, test_images / 255.0
train_images = train_images[..., tf.newaxis] # NHWC (Number, height, width, channel)の形にするため次元追加
test_images = test_images[..., tf.newaxis]   # .reshapeを使っても良い

train_ds = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels)).shuffle(10000).batch(batch_size)
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

# データ拡張を加える場合は以下のように書くこともできる
# datagen = tf.keras.preprocessing.image.ImageDataGenerator(rotation_range=90)
# train_ds = datagen.flow(train_images, train_labels, batch_size, shuffle=True)
# test_ds = datagen.flow(test_images, test_labels, batch_size, shuffle=False)

# 2.モデルの作成部分
learning_rate = 0.001   # 学習率
training_epochs = 5     # エポック数
dropout = 0.5           # 非ドロップアウト率

class CNNModel(tf.keras.Model):

    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(filters=32, kernel_size=3, padding='same', activation='relu', input_shape=(28,28,1)) # input_shapeは無くても良い
        self.pool1 = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='same') # tf.keras.layers.MaxPool2Dとも書ける
        self.conv2 = tf.keras.layers.Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')
        self.pool2 = tf.keras.layers.MaxPooling2D(pool_size=2, strides=2, padding='same')
        self.drop1 = tf.keras.layers.Dropout(dropout)
        self.flat1 = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(1024, activation='relu')
        self.dense2 = tf.keras.layers.Dense(10, activation='softmax')

    def call(self, inputs):
        h = self.conv1(inputs)
        h = self.pool1(h)
        h = self.conv2(h)
        h = self.pool2(h)
        h = self.drop1(h)
        h = self.flat1(h)
        h = self.dense1(h)
        return self.dense2(h)

model = CNNModel()

# 3.実行する部分
loss_object = tf.keras.losses.SparseCategoricalCrossentropy() # CategoricalCrossentropyと違い、ラベルのOne-hot化不要
optimizer = tf.keras.optimizers.Adam(learning_rate)

train_loss = tf.keras.metrics.Mean(name='train_loss') # 損失の記録用
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') # 精度の記録用
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

# 訓練用
@tf.function
def train_step(images, labels):
  with tf.GradientTape() as tape:
    predictions = model(images)
    loss = loss_object(labels, predictions)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_loss(loss)
  train_accuracy(labels, predictions)

# テスト用
@tf.function
def test_step(images, labels):
  predictions = model(images)
  t_loss = loss_object(labels, predictions)

  test_loss(t_loss)
  test_accuracy(labels, predictions)

# 学習開始
for epoch in range(training_epochs):
  for images, labels in train_ds:
    train_step(images, labels)

  for test_images, test_labels in test_ds:
    test_step(test_images, test_labels)

  template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
  print (template.format(epoch+1,
                         train_loss.result(),
                         train_accuracy.result()*100,
                         test_loss.result(),
                         test_accuracy.result()*100))

  # 次のエポック用にメトリクスをリセット
  train_loss.reset_states()
  train_accuracy.reset_states()
  test_loss.reset_states()
  test_accuracy.reset_states()

注意が必要なのは入力データの形だけです。

RNN編

次はRNNです。やはりMNISTの分類を行います。
28x28の画像を、1行28画素が縦に28時刻分並んだ時系列データとみなして無理やりRNNに入れています。
本来であれば、テキストの分類や機械翻訳、文章生成に使われる手法です。

サンプルコード

Chainer

rnn1.py
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training, datasets, iterators, optimizers
from chainer.training import extensions

# 1.データセットを用意する
train, test = chainer.datasets.get_mnist(ndim=3)# ndim=width,height,color

batch_size = 10                # バッチサイズ
uses_device = 0                # GPU#0を使用(CPU=-1)

n_in = 28    # 1時刻当たりのデータ数
n_time = 28  # 時系列の数
n_unit = 128 # 中間層のセルの数

# 2.モデルの作成
class RNN(chainer.Chain):

    def __init__(self):
        super(RNN, self).__init__()
        with self.init_scope():
            self.lstm1 = L.NStepLSTM(3, n_in,n_unit, dropout=0.) #3層構造にする
            self.l1 = L.Linear(None, 10)

    def __call__(self, x, t=None, train=True):
        h = F.reshape(x, (-1,28,28))
        h = [i for i in h]
        _, _, h = self.lstm1(None, None, h)
        h = F.concat([F.expand_dims(i, axis=0) for i in h], axis=0)
        last_output = h[:,-1,:] # 最後の時系列情報を取得
        h = self.l1(last_output)
        # 損失か結果を返す
        return F.softmax_cross_entropy(h, t) if train else F.softmax(h)

# 3.Updater(パラメータ更新手法)の作成
class MNISTUpdater(training.StandardUpdater):

    def __init__(self, train_iter, optimizer, device):
        super(MNISTUpdater, self).__init__(
            train_iter,
            optimizer,
            device=device
        )

    def update_core(self):
        # データを1バッチ分取得
        batch = self.get_iterator('main').next()
        x, t = chainer.dataset.convert.concat_examples(batch, self.device) # ここではx, t = self.converter(batch, self.device)でも良い
        # Optimizerを取得
        optimizer = self.get_optimizer('main')
        # modelを取得
        net = optimizer.target

        # パラメータを更新(カッコの中には損失関数が入る、ここでは関数netにxとtを入力して計算された損失)
        optimizer.update(net, x, t)

        # 損失関数が複雑でUpdaterクラスのメンバーとして定義する場合
        # optimizer.target.cleargrads() で計算前に勾配をクリアにする
        # loss = self.loss_1(x,t) + self.loss_2(x,t) のように損失を計算する(例)
        # loss.backward() で誤差逆伝播の計算をする
        # optimizer.update() で更新する
        # という書き方もできる

# 4.実行する
# モデルを作成
model = RNN()

# GPUに対応させる
if uses_device >= 0:
    chainer.cuda.get_device_from_id(uses_device).use()
    chainer.cuda.check_cuda_available()
    # GPU用データ形式に変換
    model.to_gpu()

# iterator(繰り返し条件)を作成する
train_iter = iterators.SerialIterator(train, batch_size, shuffle=True)

# 誤差逆伝播法アルゴリズムを選択する
optimizer = optimizers.Adam()
optimizer.setup(model)

# デバイスを選択してTrainer(訓練)を作成する
updater = MNISTUpdater(train_iter, optimizer, device=uses_device)
trainer = training.Trainer(updater, (100, 'epoch'), out="result")
# Trainerにはextensionという便利なオプションがつけられる
trainer.extend(extensions.ProgressBar()) # 学習の進展を表示するようにする

# 機械学習を実行する
trainer.run()

無理やり書き方を合わせたので、精度はそんなにです。

PyTorch

rnn2.py
import torch
import torchvision
import torchvision.transforms as transforms
from torch import nn, optim
import torch.nn.functional as F 
from torch.utils.data import Dataset, DataLoader, TensorDataset

# 1.データセットを用意する
training_epochs = 1000         # エポック数
batch_size = 10                # バッチサイズ

n_in = 28    # 1時刻当たりのデータ数
n_time = 28  # 時系列の数
n_unit = 128 # 中間層のセルの数

trainset = torchvision.datasets.MNIST(root='./data', 
                                        train=True,
                                        download=True,
                                        transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset,
                                            batch_size=batch_size,
                                            shuffle=True)

testset = torchvision.datasets.MNIST(root='./data', 
                                        train=False, 
                                        download=True, 
                                        transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset, 
                                            batch_size=batch_size,
                                            shuffle=False)

# 2.モデルの作成
class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()   
        self.lstm = nn.LSTM(n_in, n_unit, 3) # 3層構造にする
        self.l1 = nn.Linear(128, 10)

    def forward(self, x):
        h = x.view(-1, 28, 28)
        h, _ = self.lstm(h, None)
        last_output = h[:,-1,:] # 最後の時系列情報を取得
        h = self.l1(last_output)
        return h

# 3.実行する
# GPUに対応させる
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = RNN().to(device)

# 誤差逆伝播法アルゴリズムを選択する
criterion = nn.CrossEntropyLoss() # 損失関数を選択
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 訓練する
for epoch in range(training_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad() # 計算前に勾配をゼロにする
        outputs = model(inputs) # 順伝播の計算をする
        loss = criterion(outputs, labels) # 損失を計算する
        loss.backward() # 誤差逆伝播の計算をする
        optimizer.step() # 更新する

        # 損失を定期的に出力する
        running_loss += loss.item()
        if i % 100 == 99:
            print('[{:d}, {:5d}] loss: {:3f}'
                    .format(epoch + 1, i + 1, running_loss / 100))
            running_loss = 0.0

print('Finished Training')

# テストする
correct = 0
total = 0

with torch.no_grad():
    for (images, labels) in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('Accuracy: {:.2f} %%'.format(100 * float(correct/total)))

同じく無理やり書き方を合わせたので、精度はそんなにです。

TensorFlow

rnn3.py
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く

# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images   # テスト用の画像
test_labels = mnist.test.labels   # テスト用の正解ラベル

# 2.データフローグラフ(設計図)の作成部分
training_epochs = 1200 # エポック数
batch_size = 100       # ミニバッチのサイズ

n_in = 28    # 1時刻当たりのデータ数
n_time = 28  # 時系列の数
n_unit = 128 # 中間層のセルの数

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)

# 入力層(ニューロン数=28)
with tf.name_scope("input"):
    # 入力データなど、実行時に確定する値はプレースホルダーで扱う
    x = tf.placeholder(tf.float32, [None, 784])
    # [データ数, 28時刻, 28画素]の3次元配列に変換
    input = tf.reshape(x, [-1, n_in, n_time])

# 中間層(3層構造)
with tf.name_scope("layer_lstm"):
    # 中間層として1ユニットあたり128個のLSTMセルを配置し、これを3個生成
    stacked_cells = [tf.nn.rnn_cell.LSTMCell(num_units=n_unit) for _ in range(3)]
    # 3個の中間層をラッピングして3層構造にする
    cell = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_cells)
    # 出力:[バッチサイズ, 時間長(n_time), 出力長(n_unit)]
    outputs, _ = tf.nn.dynamic_rnn(cell=cell,            # LSTMのセルを指定
                                        inputs=input,    # 入力を指定
                                        dtype=tf.float32 # 出力データの型を指定
                                       )

# 出力層(ニューロン数=10)
with tf.name_scope("layer_output"):
    # [データ数, 時間長(n_time), 出力長(n_unit)]から
    # 最後の時系列情報を取得
    last_output = outputs[:,-1,:]
    # 重み
    w = tf.Variable(tf.truncated_normal([n_unit,10], stddev=0.1))
    # バイアス
    b = tf.Variable(tf.zeros([10]))
    # 活性化関数
    out = tf.nn.softmax(tf.matmul(last_output, w ) + b)

# 誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
    # 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
    t = tf.placeholder(tf.float32, [None, 10])
    loss = tf.reduce_mean(
        -tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
        )

# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
    train_step = tf.train.AdamOptimizer().minimize(loss)

# 評価
with tf.name_scope("accuracy"):
    # (out=t)の最大値を比較
    correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
    # True(正解=1)の平均を取る
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

# 変数を初期化するノード
init = tf.global_variables_initializer()

# 3.セッションで実行する部分            
with tf.Session(config=config) as sess:
    sess.run(init) # initノードを実行して変数を初期化
    # 訓練(学習)の実行
    for epoch in range(training_epochs):
        # ミニバッチを抽出
        train_images, train_labels = mnist.train.next_batch(batch_size)
        sess.run(
            train_step,                    # 訓練を実行
            feed_dict={x:train_images,     # プレースホルダーxには訓練データのミニバッチをセット
                       t:train_labels}     # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
            )

        # 100エポックごとにテストデータを使用して精度を出力
        epoch += 1
        if epoch % 100 == 0:
            acc_val = sess.run(
                accuracy,                  # 評価を実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels}  # プレースホルダーtにはテストデータの正解ラベルをセット
            )
            print('(%d): accuracy = %.2f' % (epoch, acc_val))

精度はそこそこ良いです。

TensorFlow(tf.layers)

rnn4.py
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.ERROR) # MNIST読み込み時の警告を出力させないために書く

# 1.データセットの準備部分
mnist = input_data.read_data_sets("data/", one_hot=True)
train_images = mnist.train.images # 訓練用の画像
train_labels = mnist.train.labels # 訓練用の正解ラベル
test_images = mnist.test.images   # テスト用の画像
test_labels = mnist.test.labels   # テスト用の正解ラベル

# 2.データフローグラフ(設計図)の作成部分
training_epochs = 1200 # エポック数
batch_size = 100       # ミニバッチのサイズ

n_in = 28    # 1時刻当たりのデータ数
n_time = 28  # 時系列の数
n_unit = 128 # 中間層のセルの数

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)

# 入力層(ニューロン数=28)
with tf.name_scope("input"):
    # 入力データなど、実行時に確定する値はプレースホルダーで扱う
    x = tf.placeholder(tf.float32, [None, 784])
    # [データ数, 28時刻, 28画素]の3次元配列に変換
    input = tf.reshape(x, [-1, n_in, n_time])

# 中間層(3層構造)
with tf.name_scope("layer_lstm"):
    # 中間層として1ユニットあたり128個のLSTMセルを配置し、これを3個生成
    stacked_cells = [tf.nn.rnn_cell.LSTMCell(num_units=n_unit) for _ in range(3)]
    # 3個の中間層をラッピングして3層構造にする
    cell = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_cells)
    # 出力:[バッチサイズ, 時間長(n_time), 出力長(n_unit)]
    outputs, _ = tf.nn.dynamic_rnn(cell=cell,            # LSTMのセルを指定
                                        inputs=input,    # 入力を指定
                                        dtype=tf.float32 # 出力データの型を指定
                                       )

# 出力層(ニューロン数=10)
with tf.name_scope("layer_output"):
    # [データ数, 時間長(n_time), 出力長(n_unit)]から
    # 最後の時系列情報を取得
    last_output = outputs[:,-1,:]
    out = tf.layers.dense(inputs=last_output,units=10,activation=tf.nn.softmax)

# 誤差関数(クロスエントロピー)
with tf.name_scope("loss"):
    # 正解ラベルも実行時に確定する値なのでプレースホルダーで扱う
    t = tf.placeholder(tf.float32, [None, 10])
    loss = tf.reduce_mean(
        -tf.reduce_sum(t*tf.log(out + 1e-5), axis=[1])
        )

# 訓練(誤差逆伝播法アルゴリズムを選択)
with tf.name_scope("train"):
    train_step = tf.train.AdamOptimizer().minimize(loss)

# 評価
with tf.name_scope("accuracy"):
    # (out=t)の最大値を比較
    correct = tf.equal(tf.argmax(out,1), tf.argmax(t,1))
    # True(正解=1)の平均を取る
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

# 変数を初期化するノード
init = tf.global_variables_initializer()

# 3.セッションで実行する部分            
with tf.Session(config=config) as sess:
    sess.run(init) # initノードを実行して変数を初期化
    # 訓練(学習)の実行
    for epoch in range(training_epochs):
        # ミニバッチを抽出
        train_images, train_labels = mnist.train.next_batch(batch_size)
        sess.run(
            train_step,                    # 訓練を実行
            feed_dict={x:train_images,     # プレースホルダーxには訓練データのミニバッチをセット
                       t:train_labels}     # プレースホルダーtには訓練データの正解ラベルのミニバッチをセット
            )

        # 100エポックごとにテストデータを使用して精度を出力
        epoch += 1
        if epoch % 100 == 0:
            acc_val = sess.run(
                accuracy,                  # 評価を実行
                feed_dict={x:test_images,  # プレースホルダーxにはテストテータをセット
                           t:test_labels}  # プレースホルダーtにはテストデータの正解ラベルをセット
            )
            print('(%d): accuracy = %.2f' % (epoch, acc_val))

tf.layersをあまり使える箇所がないため、ほぼ変わらないです。

TensorFlow(tf.keras)

rnn5.py
import tensorflow as tf
from tensorflow.keras.backend import set_session

# 1.データセットの準備部分
mnist = tf.keras.datasets.mnist

(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images = train_images.reshape(train_images.shape[0], 28, 28)
test_images = test_images.reshape(test_images.shape[0], 28, 28)
train_images, test_images = train_images / 255.0, test_images / 255.0

train_labels = tf.keras.utils.to_categorical(train_labels, 10)
test_labels = tf.keras.utils.to_categorical(test_labels, 10)

# 2.モデルの作成部分
learning_rate = 0.01    # 学習率
training_epochs = 5     # エポック数
batch_size = 50         # ミニバッチのサイズ

n_in = 28    # 1時刻当たりのデータ数
n_time = 28  # 時系列の数
n_unit = 128 # 中間層のセルの数

# GPUを指定する
config = tf.ConfigProto(
    gpu_options=tf.GPUOptions(
        visible_device_list="0", # GPU番号を指定
        allow_growth=True
    )
)
set_session(tf.Session(config=config))

# モデル作成(クラスを使う場合)
class RNNModel(tf.keras.Model):

    def __init__(self):
        super(RNNModel, self).__init__()
        stacked_cells = [tf.keras.layers.LSTMCell(n_unit) for _ in range(3)]
        self.lstm1 = tf.keras.layers.RNN(stacked_cells,return_sequences=True)
        self.dense1 = tf.keras.layers.Dense(10, activation=tf.nn.softmax)

    def call(self, inputs):
        h = self.lstm1(inputs)
        last_output = h[:,-1,:]
        h = self.dense1(last_output)
        return h

model = RNNModel()

model.compile(optimizer=tf.train.RMSPropOptimizer(learning_rate),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.fit(train_images, train_labels, batch_size=batch_size, epochs=training_epochs)
model.evaluate(test_images, test_labels)

無理やり書き方を合わせたので、精度はそんなにです。
TensorFlow公式のサンプルを見ればわかりますが、tf.keras.layers.LSTMCellで書いたのは少し冗長で、tf.keras.layers.LSTMを使う方が普通です。

TensorFlow v2

rnn6.py
import tensorflow as tf

# 1.データセットの準備部分
batch_size = 50         # ミニバッチのサイズ
mnist = tf.keras.datasets.mnist

(train_images, train_labels),(test_images, test_labels) = mnist.load_data()
train_images, test_images = train_images / 255.0, test_images / 255.0

train_ds = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels)).shuffle(10000).batch(batch_size)
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

# 2.モデルの作成部分
learning_rate = 0.001   # 学習率
training_epochs = 5     # エポック数

n_in = 28    # 1時刻当たりのデータ数
n_time = 28  # 時系列の数
n_unit = 128 # 中間層のセルの数

class RNNModel(tf.keras.Model):

    def __init__(self):
        super(RNNModel, self).__init__()
        # 入力の形を明示するためにtf.keras.layers.InputLayer(input_shape=(28,28))を最初に入れても良い
        stacked_cells = [tf.keras.layers.LSTMCell(n_unit) for _ in range(3)]
        self.lstm1 = tf.keras.layers.RNN(stacked_cells, return_sequences=True)
        self.dense1 = tf.keras.layers.Dense(10, activation='softmax')

    def call(self, inputs):
        h = self.lstm1(inputs)
        last_output = h[:,-1,:]
        h = self.dense1(last_output)
        return h

# 以下のようにも書ける
#class RNNModel(tf.keras.Model):
#
#    def __init__(self):
#        super(RNNModel, self).__init__()
#        self.lstm1 = tf.keras.layers.LSTM(n_unit, return_sequences=True)
#        self.lstm2 = tf.keras.layers.LSTM(n_unit, return_sequences=True)
#        self.lstm3 = tf.keras.layers.LSTM(n_unit, return_sequences=False)
#        self.dense1 = tf.keras.layers.Dense(10, activation='softmax')
#
#    def call(self, inputs):
#        h = self.lstm1(inputs)
#        h = self.lstm2(h)
#        h = self.lstm3(h)
#        h = self.dense1(h)
#        return h

model = RNNModel()

# 3.実行する部分
loss_object = tf.keras.losses.SparseCategoricalCrossentropy() # CategoricalCrossentropyと違い、ラベルのOne-hot化不要
optimizer = tf.keras.optimizers.Adam(learning_rate)

train_loss = tf.keras.metrics.Mean(name='train_loss') # 損失の記録用
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') # 精度の記録用
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

# 訓練用
@tf.function
def train_step(images, labels):
  with tf.GradientTape() as tape:
    predictions = model(images)
    loss = loss_object(labels, predictions)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_loss(loss)
  train_accuracy(labels, predictions)

# テスト用
@tf.function
def test_step(images, labels):
  predictions = model(images)
  t_loss = loss_object(labels, predictions)

  test_loss(t_loss)
  test_accuracy(labels, predictions)

# 学習開始
for epoch in range(training_epochs):
  for images, labels in train_ds:
    train_step(images, labels)

  for test_images, test_labels in test_ds:
    test_step(test_images, test_labels)

  template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
  print (template.format(epoch+1,
                         train_loss.result(),
                         train_accuracy.result()*100,
                         test_loss.result(),
                         test_accuracy.result()*100))

  # 次のエポック用にメトリクスをリセット
  train_loss.reset_states()
  train_accuracy.reset_states()
  test_loss.reset_states()
  test_accuracy.reset_states()

精度はそこそこ出ます。普通はtf.keras.layers.LSTMの方で書きます。

モデルの保存と読み込み編

ここでは学習したモデルの保存と読み込みの方法を紹介します。

サンプルコード

Chainer

model_path = 'model.hdf5' # .npzでも保存できる
chainer.serializers.save_hdf5(model_path, model)
chainer.serializers.load_hdf5(model_path, model)

PyTorch

model_path = 'model.pth'
torch.save(model.state_dict(), model_path) # for gpu
torch.save(model.to('cpu').state_dict(), model_path) # for cpu
model.load_state_dict(torch.load(model_path))

TensorFlow

model_path = 'model'
with tf.Session() as sess:
    saver = tf.train.Saver()
    saver.save(sess, model_path)
    saver.restore(sess, model_path)

TensorFlow v2

model_path = 'model'
tf.saved_model.save(model, model_path)
model = tf.saved_model.load(model_path)

# この他にもtf.keras.models.save_modelやmodel.save_weights()やmodel.to_json()を使った方法がある

カスタムデータセット編

ここでは自前のデータを利用して学習するための書き方を紹介します。画像の入ったディレクトリやラベルの書かれたテキストファイルへのパスは自分で設定してください。

サンプルコード

Chainer

class CustomDataset(chainer.dataset.DatasetMixin):
    def __init__(self, images_path, image_size, labels_path, dtype=np.float32):
        self._images_path = images_path
        self._image_size = image_size
        self._labels_path = labels_path
        self._dtype = dtype

        self._images = []
        self._labels = []
        images = os.listdir(self._images_path)
        labels = [0] * len(images)
        # 以下のようにファイルから読み込むパターンもある
        #for line in open(self._labels_path):
        #  self._labels.append(int(line.strip()))

        for image, label in zip(images, labels):
            self._images.append(image)
            self._labels.append(label)

    def __len__(self):
        # データセットの数を返す
        return len(self._images_path)

    def get_example(self, index):
        # データセットのインデックスを受け取ってデータを返す
        image = self._images[index]
        label = self._labels[index]
        with open(image, 'rb') as f:
            image = Image.open(f)
            image = image.convert('RGB')
        image = image.resize(self._image_size)
        image = np.asarray(image, dtype=self._dtype)
        image = image.transpose(2, 0, 1) # PILのImageは(height, width, channel)なのでChainerの形式に変換
        return image, np.array(label)

custom_dataset = CustomDataset(images_path, (64, 64), labels_path)
train_iter = iterators.SerialIterator(dataset=custom_dataset, batch_size=64, shuffle=True)

PyTorch

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, transform=None):
        self.transform = transform
        self.images = []
        self.labels = []
        images_path = "images_path"
        images = os.listdir(images_path)
        labels = [0] * len(images)
        # 以下のようにファイルから読み込むパターンもある
        #for line in open(os.path.join(root_path, name + '.txt')):
        #  self.labels.append(int(line.strip()))

        for image, label in zip(images, labels):
            self.images.append(image)
            self.labels.append(label)

    def __getitem__(self, index):
        # データセットのインデックスを受け取ってデータを返す
        image = self.images[index]
        label = self.labels[index]
        with open(image, 'rb') as f:
            image = Image.open(f)
            image = image.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, np.array(label)

    def daya(self):
        # データセットの数を返す
        return len(self.images)

custom_dataset = CustomDataset(transform=torchvision.transforms.ToTensor()))
train_loader = torch.utils.data.DataLoader(dataset=custom_dataset, batch_size=64, shuffle=True)

TensorFlow

# 基本的にデータセットはNHWCの4階テンソルNumpy配列で与えれば良い(tf.Tensorにしても良い)
class CustomDataset:
    def __init__(self, images_path, image_size, labels_path, dtype=np.float32):
        self._images_path = images_path
        self._image_size = image_size
        self._labels_path = labels_path
        self._dtype = dtype
        self._index_in_epoch = 0
        self._epochs_completed = 0

        self._images = []
        self._labels = []
        images = os.listdir(self._images_path)
        labels = [0] * len(images)
        # 以下のようにファイルから読み込むパターンもある
        #for line in open(self._labels_path):
        #  self._labels.append(int(line.strip()))

        for image, label in zip(images, labels):
            self._images.append(image)
            self._labels.append(label)

    def next_batch(self, batch_size, shuffle=False):
        # バッチサイズの分だけデータを返す
        start = self._index_in_epoch
        self._index_in_epoch += batch_size
        if self._index_in_epoch > self.datanum():
            # 1 epoch終わった後の処理
            self._epochs_completed += 1
            perm = np.arange(self.datanum())
            if shuffle:
                np.random.shuffle(perm)
            self._images = self._images[perm]
            self._labels = self._labels[perm]
            start = 0
            self._index_in_epoch = batch_size
        end = self._index_in_epoch

        image = self._images[start:end]
        label = self._labels[start:end]
        image_arr = []
        for image_ele in image:
            with open(image_ele, 'rb') as f:
                image_ele = Image.open(f)
                image_ele = image_ele.convert('RGB')
            image_ele = image_ele.resize(self._image_size)
            image_arr.append(image_ele)
        image = np.asarray(image_arr, dtype=self._dtype)
        return image, np.array(label)

    def datanum(self):
        # データセットの数を返す
        return len(self._images_path)

custom_dataset = CustomDataset(images_path, (64, 64), labels_path)
with tf.Session() as sess:
    # ・・・
    train_images, train_labels = custom_dataset.next_batch(batch_size=64, shuffle=True)
    # ・・・

TensorFlow v2

def load_and_preprocess_image(path):
  # pathの画像を読み込み正規化する関数
  image = tf.io.read_file(path)
  image = tf.image.decode_jpeg(image, channels=3)
  image = tf.image.resize(image, [64, 64])
  image /= 255.0
  return image

all_images_path = os.listdir(images_path)
all_labels = [0] * len(all_images_path)
path_ds = tf.data.Dataset.from_tensor_slices(all_images_path)
image_ds = path_ds.map(load_and_preprocess_image) # 前処理を適用
label_ds = tf.data.Dataset.from_tensor_slices(all_label)
custom_dataset = tf.data.Dataset.zip((image_ds, label_ds))
# 前処理が無ければ最初から画像とラベルを以下のように与えても良い
# custom_dataset = tf.data.Dataset.from_tensor_slices((all_images, all_labels)).batch(batch_size=64)
custom_dataset = custom_dataset.apply(tf.data.experimental.shuffle_and_repeat(buffer_size=len(all_images_path)))
custom_dataset = custom_dataset.batch(64).prefetch(tf.data.experimental.AUTOTUNE)
train_images, train_labels = next(iter(custom_dataset))

# datasetをTFRecordに書きこんで読み込みを早くすることも可能
# https://www.tensorflow.org/tutorials/load_data/images?hl=ja

まとめ

自分のライブラリの勉強を兼ねて、2021年度版に更新しました。誰かの参考になれば幸いです。それでは。

参考

本記事は以下の書籍とサイトを参考にさせていただきました。
Chainerで作る コンテンツ自動生成AIプログラミング入門
TensorFlow&Kerasプログラミング実装ハンドブック
TensorFlow2プログラミング実装ハンドブック
Chainer公式
PyTorch公式
TensorFlow公式

4
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
12