TensorFlowを使ってみる 〜その4〜 畳み込みニューラルネットワーク(CIFAR-10)

今回は、CIFAR-10 を使った畳み込みニューラルネットワークによるカラー画像のカテゴリー分類を行います。

ソースコード input_data.py 内にてデータ形式の変換処理が抜けておりました。


CIFAR-10 は、10カテゴリーのカラー画像のデータセットになります。
カテゴリーは [飛行機・自動車・鳥・猫・鹿・犬・カエル・馬・船・トラック] の10種類になります。
このデータセットには、画像サイズ 32x32 の データが、トレーニングデータとして 50,000件、
テストデータとして 10,000件 含まれています。

  • サンプル




実装内容は、前回の MINST エントリーのコードをベースに修正・追加を行ったものになります。


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import sys
import numpy as np

from input_data_cifar import get_input_data, create_dataset, create_datasets
from tensorflow.contrib.learn.python.learn.datasets import base

import tensorflow as tf

# 入力データパス
DATASET_DIR = "/path/to/dir/"
# バリデーションサイズ

# 重み変数
def weight_variable(shape):
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial)

# バイアス変数
def bias_variable(shape):
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial)

# 畳み込み
def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

# プーリング
def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

# 入力データ
def _input_data() -> base.Datasets:
  images1, labels1 = get_input_data(DATASET_DIR, "data_batch_1", 32, 32, 3, 10)
  images2, labels2 = get_input_data(DATASET_DIR, "data_batch_2", 32, 32, 3, 10)
  images3, labels3 = get_input_data(DATASET_DIR, "data_batch_3", 32, 32, 3, 10)
  images4, labels4 = get_input_data(DATASET_DIR, "data_batch_4", 32, 32, 3, 10)
  images5, labels5 = get_input_data(DATASET_DIR, "data_batch_5", 32, 32, 3, 10)
  test_images, test_labels = get_input_data(DATASET_DIR, "test_batch", 32, 32, 3, 10)

  # 連結
  images = np.concatenate((images1, images2, images3, images4, images5), axis=0)
  labels = np.concatenate((labels1, labels2, labels3, labels4, labels5), axis=0)

  # トレーニングデータを分割
  validation_images = images[:VALIDATION_SIZE]
  validation_labels = labels[:VALIDATION_SIZE]
  train_images = images[VALIDATION_SIZE:]
  train_labels = labels[VALIDATION_SIZE:]

  # データセット生成
  train_dataset = create_dataset(train_images, train_labels)
  validation_dataset = create_dataset(validation_images, validation_labels)
  test_dataset = create_dataset(test_images, test_labels)

  inputdatas = create_datasets(train_dataset, validation_dataset, test_dataset)
  return inputdatas

def main(_):
  # データ取得
  input_data = _input_data()

  # プレースホルダ作成
  # 実行時にデータを入力する必要があるプレースホルダを生成する
  x = tf.placeholder(tf.float32, [None, 32 * 32 * 3])
  y_ = tf.placeholder(tf.float32, [None, 10])

  # 畳み込み1層目
  W_conv1 = weight_variable([5, 5, 3, 32])
  b_conv1 = bias_variable([32])
  x_image = tf.reshape(x, [-1,32,32,3])
  h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
  h_pool1 = max_pool_2x2(h_conv1)

  # 畳み込み2層目
  W_conv2 = weight_variable([5, 5, 32, 64])
  b_conv2 = bias_variable([64])
  h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
  h_pool2 = max_pool_2x2(h_conv2)

  # 全結合層
  W_fc1 = weight_variable([8 * 8 * 64, 1024])
  b_fc1 = bias_variable([1024])
  h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
  h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

  # ドロップアウト層
  keep_prob = tf.placeholder(tf.float32)
  h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

  # 出力層
  W_fc2 = weight_variable([1024, 10])
  b_fc2 = bias_variable([10])
  y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

  # 損失関数(交差エントロピー誤差)
  cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(y_conv, y_))

  # 勾配
  train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

  # 精度
  correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
  accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
  # セッション
  sess = tf.InteractiveSession()

  # トレーニング
  for i in range(20000):
    # トレーニングデータから50件を読み込む
    # データの最後まで読み込んだ時点でトレーニングデータのシャッフルを行い、再び先頭からデータを読み込む
    batch_x, batch_y = input_data.train.next_batch(50, array_2d=True)

    if i % 500 == 0:
      # 途中経過(500件ごと)
      train_accuracy = accuracy.eval(feed_dict={x: batch_x, y_: batch_y, keep_prob: 1.0})
      print("step %d, training accuracy %f" % (i, train_accuracy))

    # トレーニング実行
    train_step.run(feed_dict={x: batch_x, y_: batch_y, keep_prob: 0.5})

  # 評価
  test_x, test_y = input_data.test.next_batch(1000, array_2d=True)
  print("test accuracy %f" % accuracy.eval(feed_dict={x: test_x, y_: test_y, keep_prob: 1.0}))

if __name__ == '__main__':


# -*- coding: UTF-8 -*-

import pickle

from tensorflow.contrib.learn.python.learn.datasets import base
from tensorflow.python.framework import dtypes
import numpy as np
import numpy

# データセット
class DataSet(object):

    def __init__(self, images, labels, dtype=dtypes.float32):
        dtype = dtypes.as_dtype(dtype).base_dtype
        if dtype not in (dtypes.uint8, dtypes.float32):
            raise TypeError('Invalid image dtype %r, expected uint8 or float32' % dtype)

        assert images.shape[0] == labels.shape[0], (
            'images.shape: %s labels.shape: %s' % (images.shape, labels.shape))

        # データ件数セット
        self._num_examples = images.shape[0]

        if dtype == dtypes.float32:
            # Convert from [0, 255] -> [0.0, 1.0].
            images = images.astype(numpy.float32)
            images = numpy.multiply(images, 1.0 / 255.0)

        self._images = images
        self._labels = labels
        self._epochs_completed = 0
        self._index_in_epoch = 0

    def images(self):
        return self._images

    def images_2d_array(self):
        images = np.reshape(self._images, [-1, self._images.shape[1] * self._images.shape[2] * self._images.shape[3]])
        return images

    def labels(self):
        return self._labels

    def num_examples(self):
        return self._num_examples

    def epochs_completed(self):
        return self._epochs_completed

    # batch_size のデータを取得
    #   最後まで到達した場合、データをシャップルして最初からデータを取り出す
    def next_batch(self, batch_size, array_2d=False):
        start = self._index_in_epoch
        self._index_in_epoch += batch_size
        if self._index_in_epoch > self._num_examples:
            # Finished epoch
            self._epochs_completed += 1
            # Shuffle the data
            perm = numpy.arange(self._num_examples)
            self._images = self._images[perm]
            self._labels = self._labels[perm]
            # Start next epoch
            start = 0
            self._index_in_epoch = batch_size
            assert batch_size <= self._num_examples

        end = self._index_in_epoch
        images = self._images[start:end]

        # データを 2D へ整形
        if array_2d:
            images = np.reshape(images,
                [-1, self.images.shape[1] * self.images.shape[2] * self.images.shape[3]])

        return images, self._labels[start:end]

# One-hot 変換
def dense_to_one_hot(labels_dense: numpy.ndarray, num_classes: int) -> numpy.ndarray:
    Scalars から One-hot へコンバートする
    :param labels_dense: ラベル配列
    :param num_classes:  分類クラス数
    :return: ラベル配列(One-hot)
    num_labels = labels_dense.shape[0]
    index_offset = numpy.arange(num_labels) * num_classes
    labels_one_hot = numpy.zeros((num_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

# 入力データ取得
def get_input_data(dir: str, filename: str, rows: int, cols: int, channel: int, class_numm: int) \
        -> (numpy.ndarray, numpy.ndarray):

    # データ読み込み(CIFAR-10)
    filepath = dir + filename
    dict = unpickle(filepath)

    # データ取得
    data = dict[b'data']
    labels = np.array(dict[b'labels'])

    # サイズ取得
    if labels.shape[0] != data.shape[0]:
        raise Exception('Error: Different length')
    num_images = labels.shape[0]

    # 入力データ整形(4次元配列)
    data = data.reshape(num_images, channel, rows, cols) # 2017.7.7 修正
    # [n, 3, 32, 32] -> [n, 32, 32, 3] へ変換
    data = data.transpose([0, 2, 3, 1]) # 2017.7.7 追加

    # ラベルデータ整形(One-Hot)
    labels = dense_to_one_hot(labels, class_numm)

    return data, labels

# Pickle の アンシリアライズ
def unpickle(file: str) -> dict:
    with open(file, 'rb') as f:
        dict = pickle.load(f, encoding="bytes")
    return dict

# データセット生成
def create_dataset(images: numpy.ndarray, labels: numpy.ndarray) -> DataSet:
    dataset = DataSet(images, labels, dtype=dtypes.float32)
    return dataset

# データセット生成
def create_datasets(train: DataSet, validation: DataSet, test: DataSet) -> base.Datasets:
    return base.Datasets(train=train, validation=validation, test=test)

CIFAR-10 のデータ形式は、n x 3 x 32 x 32 のデータ構造となっているため、
畳み込み関数 tf.nn.conv2d の引数 input で指定されている形式である
[batch, in_height, in_width, in_channels] の形に変換する必要があります。
そのため、データ読み込み時に transpose 関数を使用して n x 32 x 32 x 3 の形へ変換します。









  • 入力データ用プレースホルダ作成
  # プレースホルダ作成
  # 実行時にデータを入力する必要があるプレースホルダを生成する
  x = tf.placeholder(tf.float32, [None, 32 * 32 * 3])
  y_ = tf.placeholder(tf.float32, [None, 10])

プレースホルダも [None, 32 * 32 * 3] のように指定します。

  • 畳み込み1層目
  # 畳み込み1層目
  W_conv1 = weight_variable([5, 5, 3, 32])
  b_conv1 = bias_variable([32])
  x_image = tf.reshape(x, [-1,32,32,3])
  h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
  h_pool1 = max_pool_2x2(h_conv1)  

こちらも入力チャンネル数に対応して、W_conv1 = weight_variable([5, 5, 3, 32]) や、
x_image = tf.reshape(x, [-1,32,32,3]) のように指定します。

  • トレーニング
# トレーニング
for i in range(20000):
  # トレーニングデータから50件を読み込む
  # データの最後まで読み込んだ時点でトレーニングデータのシャッフルを行い、再び先頭からデータを読み込む
  batch_x, batch_y = input_data.train.next_batch(50, array_2d=True)

  if i % 500 == 0:
    # 途中経過(500件ごと)
    train_accuracy = accuracy.eval(feed_dict={x: batch_x, y_: batch_y, keep_prob: 1.0})
    print("step %d, training accuracy %f" % (i, train_accuracy))

  # トレーニング実行
  train_step.run(feed_dict={x: batch_x, y_: batch_y, keep_prob: 0.5})

一度のトレーニングで、トレーニングデータを 50件ずつ読み込み train_step を実行します。
また、途中経過として、500回毎に精度を print出力しています。

  • 評価
  # 評価
  test_x, test_y = input_data.test.next_batch(1000, array_2d=True)
  print("test accuracy %f" % accuracy.eval(feed_dict={x: test_x, y_: test_y, keep_prob: 1.0}))

ここでは、テストデータ 1,000件 を使って精度を算出します。
keep_prob は 1.0 を指定しています。



python deep_cifar10_softmax.py



精度は 70.6% でした。

(修正 2017.7.7)

以上、今回は CIFAR-10 を使った畳み込みニューラルネットワークによるカラー画像のカテゴリー分類を行いました。


