2
1

More than 1 year has passed since last update.

【TensorFlow2.x系】Subclassing APIを用いて今更MNIST

Last updated at Posted at 2021-11-12

はじめに

TensorFlowは機械学習用のライブラリであり、近年発表された論文のモデル等も実装してくれていたりします。タイトルにあるように、現在は「TensorFlow2.x系」の開発が進められています。過去には「TensorFlow1.x系」なるライブラリが提供されていましたが(今でも提供されているが)、慣れないと扱いが難しい記述方式でした。
TensorFlow2.x系になり、実装方法が「Sequential API」「Functional API」「Subclassing API」の三種類になりました。所感ですが、簡単に実装したい場合は「Sequential API」、1.x系に慣れている人は「Functional API」、色々と拡張したい人は「Subclassing API」を用いて実装するのかな〜という感じです。研究には「Subclassing API」を用いるため、この記事ではSubclassing APIを用いてMNISTに挑戦したいと思います。

実装

GitHub→TensorFlow2_Subclassing_MNIST

インポートからパラメータ設定

利用するライブラリをあらかじめインポートしておきます。

必要なライブラリをインポート
import numpy as np
import os
import tensorflow as tf
import matplotlib.pyplot as plt
import pdb

TensorFlowの学習にGPUを利用する際に記述しておきたいコードを書いておきましょう!

GPU設定
# tensorflow2.xでのGPUの設定
physical_devices = tf.config.list_physical_devices('GPU')
if len(physical_devices) > 0:
    #
    for k in range(len(physical_devices)):
        tf.config.set_visible_devices(physical_devices[k], 'GPU')
        tf.config.experimental.set_memory_growth(physical_devices[k], True)
        print('memory growth:', tf.config.experimental.get_memory_growth(physical_devices[k]))
    os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
else:
    print("Not enough GPU hardware devices available")

各種パスやパラメータを設定しておきます。

パラメータ設定
# parameter setting
visual_path = 'visualization'
checkpoint_path = 'checkpoint'
checkpoint_file = 'weights_nn.ckpt'

BATCH_SIZE = 128
Epochs = 10

isVisualize = True
isLoadModel = False
isTraining = True

# ディレクトリが存在しない場合は作成
if not os.path.exists(visual_path):
    os.makedirs(visual_path)
if not os.path.exists(checkpoint_path):
    os.makedirs(checkpoint_path)

データ読み込み

今回は王道のMNISTを用います。MNISTとは、手書き数字の画像データです。詳しくはコチラ
TensorFlowでは、MNISTのロードが非常に簡単です!

MNISTをロード
# MNISTデータの読み込み
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

これだけでMNISTデータを扱えるのは、非常に便利ですよね〜
ロードした画像データは0から255までの値を持つモノクロ画像ですので、これを0から1の範囲に正規化しておきます。

正規化
# 画像の正規化
x_train = x_train / 255.
x_test = x_test / 255.

読み込んだ画像のshapeは(データ数,28,28)となっているので、チャネル軸を増やしておきます。

チャネル軸を追加
# チャネル軸を追加
x_train = np.expand_dims(x_train, axis=-1)
x_test = np.expand_dims(x_test, axis=-1)

このように、最後にチャネル軸を配置(データ数、高さ、幅、チャネル)する方法をChannels Lastと言い、TensroFlow(Keras)ではこれを用います。PyTorch等ではChannels First (データ数、チャネル、高さ、幅)を用います。TensorFlowでも、オプションを設定することにより、Channels Firstを利用することができます。

データ供給ライブラリを利用

TensorFlowのモデルにNumpy形式のままデータを渡すことも可能ですが、効率よくデータを供給する方法があります。それがtf.dataであり、詳しくはコチラの記事を参照して下さい。(このライブラリのおかげで、今まではGPUに乗り切らなかったデータを用いた実験を行えるようになりました。)

tf.data
# データ供給ライブラリを利用
TRAIN_SIZE = int(0.8 * len(x_train)) # 訓練データ8割の数を取得
TRAIN_DATA = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(x_train.shape[0]) # 訓練データをシャッフル
train_data = TRAIN_DATA.take(TRAIN_SIZE).batch(BATCH_SIZE) # 訓練データの8割を学習用に用いて、バッチを生成
val_data = TRAIN_DATA.skip(TRAIN_SIZE).batch(BATCH_SIZE) # 訓練データの2割を検証用に用いて、バッチ生成
test_data = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(x_test.shape[0]).batch(BATCH_SIZE) # 評価用データをシャッフルしてバッチ生成

モデル構築

まずは簡単に、単純なニューラルネットワークを用いて学習してみたいと思います(後ろの方でCNNも実装します)。
処理としては、フラット化(28*28=784次元)した後に、100次元→50次元として、最後に10次元(0〜9という10個の出力に対応するため)として出力します。

モデル
class myModel(tf.keras.Model):
    def __init__(self):
        super(myModel, self).__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(100, activation='sigmoid')
        self.dense2 = tf.keras.layers.Dense(50, activation='sigmoid')
        self.dense3 = tf.keras.layers.Dense(10, activation='softmax')

    def call(self, x):
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def train_step(self, data):
        x, y_true = data
        with tf.GradientTape() as tape:
            # 予測
            y_pred = self(x, training=True)
            # train using gradients 
            trainable_vars = self.trainable_variables
            # loss
            loss = self.compiled_loss(y_true, y_pred, regularization_losses=self.losses)
        # 勾配を用いた学習
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients((grad, var) for (grad, var) in zip(gradients, trainable_vars) if grad is not None)
        # update metrics
        self.compiled_metrics.update_state(y_true, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        x, y_true = data
        # 予測
        y_pred = self(x, training=False)
        # loss
        self.compiled_loss(y_true, y_pred, regularization_losses=self.losses)
        # update metrics
        self.compiled_metrics.update_state(y_true, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def predict_step(self, x):
        # 予測
        y_pred = self(x, training=False)
        return y_pred

最初のinit関数内で用いる関数を用意し、call関数内でどのように入力画像データをフローさせるかを記述します。train_step関数では、入力された画像およびラベルデータを分離し、call関数に従って出力された結果と真値を比較して誤差を取得しています。tf.GradientTapeは勾配を計算するために、出力と各変数がどのように作用しているかを記録しており、誤差から各パラメータを更新しています。test_stepとpredict_stepの説明は割愛します。

モデルの設定および訓練

前項で構築したモデルを利用して学習を行います。まず、モデルを設定します。

モデルの設定
# モデルの設定
model = myModel()

# 学習方法の設定
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'], run_eagerly=True)

model.compileでは、最適化アルゴリズムや誤差関数などを指定しています。最後にモデルを学習させます。

モデルの学習
# 学習
history = model.fit(train_data, validation_data=val_data, epochs=Epochs)

検証・予測

モデルの検証や予測は学習と同じように、model.◯◯を実行するだけです。

検証
# 検証
result = model.evaluate(test_data)
print(dict(zip(model.metrics_names, result)))
予測
# 予測(今回はx_testを入力していますが、コンペ等では提出用データを予測します)
pred_data = tf.data.Dataset.from_tensor_slices(x_test).batch(BATCH_SIZE)
pred = model.predict(pred_data).argmax(axis=1)

大体の流れはこんな感じでしょうか。これだけで、正答率が97%を超えるのは楽ですね〜

全文

最終的なコードは以下のようになります。可視化やモデルロード等も追加されていますが、特に解説はしません。

mnist_nn.py
import numpy as np
import os
import tensorflow as tf
import matplotlib.pyplot as plt
import pdb

#============================================================
# tensorflow2.xでのGPUの設定
physical_devices = tf.config.list_physical_devices('GPU')
if len(physical_devices) > 0:
    #
    for k in range(len(physical_devices)):
        tf.config.set_visible_devices(physical_devices[k], 'GPU')
        tf.config.experimental.set_memory_growth(physical_devices[k], True)
        print('memory growth:', tf.config.experimental.get_memory_growth(physical_devices[k]))
    os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
else:
    print("Not enough GPU hardware devices available")
#============================================================

#----------------------------
# parameter setting
visual_path = 'visualization'
checkpoint_path = 'checkpoint'
checkpoint_file = 'weights_nn.ckpt'

BATCH_SIZE = 128
Epochs = 10

isVisualize = True
isLoadModel = False
isTraining = True

# ディレクトリが存在しない場合は作成
if not os.path.exists(visual_path):
    os.makedirs(visual_path)
if not os.path.exists(checkpoint_path):
    os.makedirs(checkpoint_path)
#----------------------------

#----------------------------
# データの作成

# MNISTデータの読み込み
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# 画像の正規化
x_train = x_train / 255.
x_test = x_test / 255.

# チャネル軸を追加
x_train = np.expand_dims(x_train, axis=-1)
x_test = np.expand_dims(x_test, axis=-1)

# データ供給ライブラリを利用
TRAIN_SIZE = int(0.8 * len(x_train))
TRAIN_DATA = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(x_train.shape[0])
train_data = TRAIN_DATA.take(TRAIN_SIZE).batch(BATCH_SIZE)
val_data = TRAIN_DATA.skip(TRAIN_SIZE).batch(BATCH_SIZE)
test_data = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(x_test.shape[0]).batch(BATCH_SIZE)

if isVisualize:
    # 画像を可視化
    fig = plt.figure()
    for i in range(20):
        fig.add_subplot(4,5,i+1)
        plt.imshow(x_train[i,:,:,0], vmin=0, vmax=1, cmap='gray')
        plt.title(f'number:{y_train[i]}')
        plt.axis('off')
    plt.tight_layout()
    plt.savefig(f'{visual_path}/trainImgs.pdf')

    fig = plt.figure()
    for i in range(20):
        fig.add_subplot(4,5,i+1)
        plt.imshow(x_test[i,:,:,0], vmin=0, vmax=1, cmap='gray')
        plt.title(f'number:{y_test[i]}')
        plt.axis('off')
    plt.tight_layout()
    plt.savefig(f'{visual_path}/testImgs.pdf')
#----------------------------

#----------------------------
# Subclassingを用いたネットワークの定義

class myModel(tf.keras.Model):
    def __init__(self):
        super(myModel, self).__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(100, activation='sigmoid')
        self.dense2 = tf.keras.layers.Dense(50, activation='sigmoid')
        self.dense3 = tf.keras.layers.Dense(10, activation='softmax')

    def call(self, x):
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def train_step(self, data):
        x, y_true = data
        with tf.GradientTape() as tape:
            # 予測
            y_pred = self(x, training=True)
            # train using gradients 
            trainable_vars = self.trainable_variables
            # loss
            loss = self.compiled_loss(y_true, y_pred, regularization_losses=self.losses)
        # 勾配を用いた学習
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients((grad, var) for (grad, var) in zip(gradients, trainable_vars) if grad is not None)
        # update metrics
        self.compiled_metrics.update_state(y_true, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        x, y_true = data
        # 予測
        y_pred = self(x, training=False)
        # loss
        self.compiled_loss(y_true, y_pred, regularization_losses=self.losses)
        # update metrics
        self.compiled_metrics.update_state(y_true, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def predict_step(self, x):
        # 予測
        y_pred = self(x, training=False)
        return y_pred
#----------------------------

#----------------------------
# モデルの設定
model = myModel()

# 学習方法の設定
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'], run_eagerly=True)

if isLoadModel:
    try:
        # load trained parameters
        model.load_weights(f'{checkpoint_path}/{checkpoint_file}')
    except tf.errors.NotFoundError:
        print('Could not load weights!')
    else:
        print('load weights')

if isTraining:
    # make checkpoint callback to save trained parameters
    callback = tf.keras.callbacks.ModelCheckpoint(f'{checkpoint_path}/{checkpoint_file}', monitor='val_loss', save_weights_only=True, save_best_only=True, verbose=1)
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, mode='auto', min_delta=0, verbose=1)
    # 学習
    history = model.fit(train_data, validation_data=val_data, epochs=Epochs, callbacks=[callback, early_stopping])

if isVisualize and isTraining:
    # 学習曲線をプロット
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    plt.figure()
    plt.plot(np.arange(Epochs), loss, 'bo-', label='training loss')
    plt.plot(np.arange(Epochs), val_loss, 'b', label='validation loss')
    plt.title('Training and Validation loss')
    plt.legend()
    plt.savefig(f'{visual_path}/loss.pdf')

    # 正解率をプロット
    accuracy = history.history['acc']
    val_accuracy = history.history['val_acc']

    plt.figure()
    plt.plot(np.arange(Epochs), accuracy, 'bo-', label='training accuracy')
    plt.plot(np.arange(Epochs), val_accuracy, 'b', label='validation accuracy')
    plt.title('Training and Validation accuracy')
    plt.legend()
    plt.savefig(f'{visual_path}/accuracy.pdf')

# 検証
result = model.evaluate(test_data)
print(dict(zip(model.metrics_names, result)))

# 予測(今回はx_testを入力していますが、コンペ等では提出用データを予測します)
pred_data = tf.data.Dataset.from_tensor_slices(x_test).batch(BATCH_SIZE)
pred = model.predict(pred_data).argmax(axis=1)

CNNをSubclassing APIで実装

モデル構築

先程のように、myModelクラスの中に全て記述することも良いですが、特殊なレイヤーを自作したいという思いに駆られる日が来るかもしれません。そんな時には以下のように、tf.keras.layers.Layerクラスを継承して自作レイヤークラスを作成しましょう!

モデル
# Layerクラスを継承して独自のconvolution用のレイヤークラスを作成
class myConv(tf.keras.layers.Layer):
    def __init__(self,chn=32, conv_kernel=(3,3), pool_kernel=(2,2), isPool=True):
        super(myConv, self).__init__()
        self.isPool = isPool

        self.conv = tf.keras.layers.Conv2D(chn, conv_kernel)
        self.batchnorm = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.layers.ReLU()
        self.pool = tf.keras.layers.MaxPool2D(pool_kernel)        

    def call(self, x):
        x = self.conv(x)
        x = self.batchnorm(x)
        x = self.relu(x)

        if self.isPool:
            x = self.pool(x)
        return x

# Layerクラスを継承して独自のFC用のレイヤークラスを作成
class myFC(tf.keras.layers.Layer):
    def __init__(self, hidden_chn=64, out_chn=10):
        super(myFC, self).__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(hidden_chn, activation='relu')
        self.fc2 = tf.keras.layers.Dense(out_chn, activation='softmax')

    def call(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.fc2(x)
        return x

# Modelクラスを継承し,独自のlayerクラス(myConvとmyFC)を用いてネットワークを定義する
# 独自のモデルクラスを作成
class myModel(tf.keras.Model):
    def __init__(self):
        super(myModel, self).__init__()
        self.conv1 = myConv(chn=32, conv_kernel=(3,3), pool_kernel=(2,2))
        self.conv2 = myConv(chn=64, conv_kernel=(3,3), pool_kernel=(2,2))
        self.conv3 = myConv(chn=64, conv_kernel=(3,3), isPool=False)
        self.fc = myFC(hidden_chn=64, out_chn=10)

    def call(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        return self.fc(x)

    def train_step(self, data):
        x, y_true = data
        with tf.GradientTape() as tape:
            # 予測
            y_pred = self(x, training=True)
            # train using gradients 
            trainable_vars = self.trainable_variables
            # loss
            loss = self.compiled_loss(y_true, y_pred, regularization_losses=self.losses)
        # 勾配を用いた学習
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients((grad, var) for (grad, var) in zip(gradients, trainable_vars) if grad is not None)
        # update metrics
        self.compiled_metrics.update_state(y_true, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        x, y_true = data
        # 予測
        y_pred = self(x, training=False)
        # loss
        self.compiled_loss(y_true, y_pred, regularization_losses=self.losses)
        # update metrics
        self.compiled_metrics.update_state(y_true, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def predict_step(self, x):
        # 予測
        y_pred = self(x, training=False)
        return y_pred

上記を見るだけでは、この記述方式の必要性を感じることはできないかもしれませんが、研究等で新規の関数を作成する際には利用するかもしれません。

課題

MNISTの分類を参考に、皆さんが探してきた課題にモデルを適用させてみましょう!前処理やモデル構築のあたりを重点的に頑張って、その点について発表してもらいたいと考えています。(発表7分+質疑3分)
注意点として、次のデータは用いないようにして下さい。
MNISTが名前に含まれているデータ(Fashion MNIST等)、CIFAR-10/100、ワインのデータセット、iris
なるべく面白そうなデータを選択してくれると嬉しく思います!

データセットの提供サイト

Kaggle Datasets:https://www.kaggle.com/datasets
UCI Machine Learning Repository:https://archive.ics.uci.edu/ml/index.php

参考記事

tensorflow2を用いたCNNの実装方法
tf.dataの使い方メモ

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1