今回は、CIFAR-10 を使った畳み込みニューラルネットワークによるカラー画像のカテゴリー分類を行います。
(追記)2017.7.7
ソースコード input_data.py 内にてデータ形式の変換処理が抜けておりました。
お詫びして訂正いたします。
CIFAR-10
CIFAR-10 は、10カテゴリーのカラー画像のデータセットになります。
カテゴリーは [飛行機・自動車・鳥・猫・鹿・犬・カエル・馬・船・トラック] の10種類になります。
このデータセットには、画像サイズ 32x32 の データが、トレーニングデータとして 50,000件、
テストデータとして 10,000件 含まれています。
又、正解ラベルのデータも同数含まれています。
- サンプル
このデータセット使い、対象の画像が何であるかを求めます。
実装コード全体
実装内容は、前回の MINST エントリーのコードをベースに修正・追加を行ったものになります。
実装コードの全体は、下記となります。
deep_cifar10_softmax.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import sys
import numpy as np
from input_data_cifar import get_input_data, create_dataset, create_datasets
from tensorflow.contrib.learn.python.learn.datasets import base
import tensorflow as tf
# 入力データパス
DATASET_DIR = "/path/to/dir/"
# バリデーションサイズ
VALIDATION_SIZE = 7000
# 重み変数
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
# バイアス変数
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
# 畳み込み
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
# プーリング
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
strides=[1, 2, 2, 1], padding='SAME')
#
# 入力データ
#
def _input_data() -> base.Datasets:
images1, labels1 = get_input_data(DATASET_DIR, "data_batch_1", 32, 32, 3, 10)
images2, labels2 = get_input_data(DATASET_DIR, "data_batch_2", 32, 32, 3, 10)
images3, labels3 = get_input_data(DATASET_DIR, "data_batch_3", 32, 32, 3, 10)
images4, labels4 = get_input_data(DATASET_DIR, "data_batch_4", 32, 32, 3, 10)
images5, labels5 = get_input_data(DATASET_DIR, "data_batch_5", 32, 32, 3, 10)
test_images, test_labels = get_input_data(DATASET_DIR, "test_batch", 32, 32, 3, 10)
# 連結
images = np.concatenate((images1, images2, images3, images4, images5), axis=0)
labels = np.concatenate((labels1, labels2, labels3, labels4, labels5), axis=0)
# トレーニングデータを分割
validation_images = images[:VALIDATION_SIZE]
validation_labels = labels[:VALIDATION_SIZE]
train_images = images[VALIDATION_SIZE:]
train_labels = labels[VALIDATION_SIZE:]
# データセット生成
train_dataset = create_dataset(train_images, train_labels)
validation_dataset = create_dataset(validation_images, validation_labels)
test_dataset = create_dataset(test_images, test_labels)
inputdatas = create_datasets(train_dataset, validation_dataset, test_dataset)
return inputdatas
def main(_):
# データ取得
input_data = _input_data()
# プレースホルダ作成
# 実行時にデータを入力する必要があるプレースホルダを生成する
x = tf.placeholder(tf.float32, [None, 32 * 32 * 3])
y_ = tf.placeholder(tf.float32, [None, 10])
# 畳み込み1層目
W_conv1 = weight_variable([5, 5, 3, 32])
b_conv1 = bias_variable([32])
x_image = tf.reshape(x, [-1,32,32,3])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
# 畳み込み2層目
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)
# 全結合層
W_fc1 = weight_variable([8 * 8 * 64, 1024])
b_fc1 = bias_variable([1024])
h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
# ドロップアウト層
keep_prob = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
# 出力層
W_fc2 = weight_variable([1024, 10])
b_fc2 = bias_variable([10])
y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
# 損失関数(交差エントロピー誤差)
cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(y_conv, y_))
# 勾配
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
# 精度
correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
# セッション
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
# トレーニング
for i in range(20000):
# トレーニングデータから50件を読み込む
# データの最後まで読み込んだ時点でトレーニングデータのシャッフルを行い、再び先頭からデータを読み込む
batch_x, batch_y = input_data.train.next_batch(50, array_2d=True)
if i % 500 == 0:
# 途中経過(500件ごと)
train_accuracy = accuracy.eval(feed_dict={x: batch_x, y_: batch_y, keep_prob: 1.0})
print("step %d, training accuracy %f" % (i, train_accuracy))
# トレーニング実行
train_step.run(feed_dict={x: batch_x, y_: batch_y, keep_prob: 0.5})
# 評価
test_x, test_y = input_data.test.next_batch(1000, array_2d=True)
print("test accuracy %f" % accuracy.eval(feed_dict={x: test_x, y_: test_y, keep_prob: 1.0}))
if __name__ == '__main__':
tf.app.run(main=main)
input_data.py(データ取得等)
# -*- coding: UTF-8 -*-
import pickle
from tensorflow.contrib.learn.python.learn.datasets import base
from tensorflow.python.framework import dtypes
import numpy as np
import numpy
#
# データセット
#
class DataSet(object):
def __init__(self, images, labels, dtype=dtypes.float32):
dtype = dtypes.as_dtype(dtype).base_dtype
if dtype not in (dtypes.uint8, dtypes.float32):
raise TypeError('Invalid image dtype %r, expected uint8 or float32' % dtype)
assert images.shape[0] == labels.shape[0], (
'images.shape: %s labels.shape: %s' % (images.shape, labels.shape))
# データ件数セット
self._num_examples = images.shape[0]
if dtype == dtypes.float32:
# Convert from [0, 255] -> [0.0, 1.0].
images = images.astype(numpy.float32)
images = numpy.multiply(images, 1.0 / 255.0)
self._images = images
self._labels = labels
self._epochs_completed = 0
self._index_in_epoch = 0
@property
def images(self):
return self._images
@property
def images_2d_array(self):
images = np.reshape(self._images, [-1, self._images.shape[1] * self._images.shape[2] * self._images.shape[3]])
return images
@property
def labels(self):
return self._labels
@property
def num_examples(self):
return self._num_examples
@property
def epochs_completed(self):
return self._epochs_completed
#
# batch_size のデータを取得
# 最後まで到達した場合、データをシャップルして最初からデータを取り出す
#
def next_batch(self, batch_size, array_2d=False):
start = self._index_in_epoch
self._index_in_epoch += batch_size
if self._index_in_epoch > self._num_examples:
# Finished epoch
self._epochs_completed += 1
# Shuffle the data
perm = numpy.arange(self._num_examples)
numpy.random.shuffle(perm)
self._images = self._images[perm]
self._labels = self._labels[perm]
# Start next epoch
start = 0
self._index_in_epoch = batch_size
assert batch_size <= self._num_examples
end = self._index_in_epoch
images = self._images[start:end]
# データを 2D へ整形
if array_2d:
images = np.reshape(images,
[-1, self.images.shape[1] * self.images.shape[2] * self.images.shape[3]])
return images, self._labels[start:end]
#
# One-hot 変換
#
def dense_to_one_hot(labels_dense: numpy.ndarray, num_classes: int) -> numpy.ndarray:
"""
Scalars から One-hot へコンバートする
:param labels_dense: ラベル配列
:param num_classes: 分類クラス数
:return: ラベル配列(One-hot)
"""
num_labels = labels_dense.shape[0]
index_offset = numpy.arange(num_labels) * num_classes
labels_one_hot = numpy.zeros((num_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
#
# 入力データ取得
#
def get_input_data(dir: str, filename: str, rows: int, cols: int, channel: int, class_numm: int) \
-> (numpy.ndarray, numpy.ndarray):
# データ読み込み(CIFAR-10)
filepath = dir + filename
dict = unpickle(filepath)
# データ取得
data = dict[b'data']
labels = np.array(dict[b'labels'])
# サイズ取得
if labels.shape[0] != data.shape[0]:
raise Exception('Error: Different length')
num_images = labels.shape[0]
# 入力データ整形(4次元配列)
data = data.reshape(num_images, channel, rows, cols) # 2017.7.7 修正
# [n, 3, 32, 32] -> [n, 32, 32, 3] へ変換
data = data.transpose([0, 2, 3, 1]) # 2017.7.7 追加
# ラベルデータ整形(One-Hot)
labels = dense_to_one_hot(labels, class_numm)
return data, labels
#
# Pickle の アンシリアライズ
#
def unpickle(file: str) -> dict:
with open(file, 'rb') as f:
dict = pickle.load(f, encoding="bytes")
return dict
#
# データセット生成
#
def create_dataset(images: numpy.ndarray, labels: numpy.ndarray) -> DataSet:
dataset = DataSet(images, labels, dtype=dtypes.float32)
return dataset
#
# データセット生成
#
def create_datasets(train: DataSet, validation: DataSet, test: DataSet) -> base.Datasets:
return base.Datasets(train=train, validation=validation, test=test)
(追記)2017.7.7
CIFAR-10 のデータ形式は、n x 3 x 32 x 32 のデータ構造となっているため、
畳み込み関数 tf.nn.conv2d の引数 input で指定されている形式である
[batch, in_height, in_width, in_channels] の形に変換する必要があります。
そのため、データ読み込み時に transpose 関数を使用して n x 32 x 32 x 3 の形へ変換します。
ニューラルネットワーク
上記コードの処理の流れとニューラルネットワークの形状は以下のようになります。
※前回と同様です。
処理の流れ
形状
実装コード詳細
実装コードの詳細を以下に記載します。
- 入力データ用プレースホルダ作成
# プレースホルダ作成
# 実行時にデータを入力する必要があるプレースホルダを生成する
x = tf.placeholder(tf.float32, [None, 32 * 32 * 3])
y_ = tf.placeholder(tf.float32, [None, 10])
入力が3チャンネルとなっているため、
プレースホルダも [None, 32 * 32 * 3]
のように指定します。
- 畳み込み1層目
# 畳み込み1層目
W_conv1 = weight_variable([5, 5, 3, 32])
b_conv1 = bias_variable([32])
x_image = tf.reshape(x, [-1,32,32,3])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
こちらも入力チャンネル数に対応して、W_conv1 = weight_variable([5, 5, 3, 32])
や、
x_image = tf.reshape(x, [-1,32,32,3])
のように指定します。
- トレーニング
# トレーニング
for i in range(20000):
# トレーニングデータから50件を読み込む
# データの最後まで読み込んだ時点でトレーニングデータのシャッフルを行い、再び先頭からデータを読み込む
batch_x, batch_y = input_data.train.next_batch(50, array_2d=True)
if i % 500 == 0:
# 途中経過(500件ごと)
train_accuracy = accuracy.eval(feed_dict={x: batch_x, y_: batch_y, keep_prob: 1.0})
print("step %d, training accuracy %f" % (i, train_accuracy))
# トレーニング実行
train_step.run(feed_dict={x: batch_x, y_: batch_y, keep_prob: 0.5})
トレーニング回数は、20000回を設定
一度のトレーニングで、トレーニングデータを 50件ずつ読み込み train_step
を実行します。
また、途中経過として、500回毎に精度を print出力しています。
(この途中経過の精度出力は、算出データとしてトレーニングデータをそのまま使っており、
かつ、データ件数も50件と少ないため信頼度は低くなっています)
- 評価
# 評価
test_x, test_y = input_data.test.next_batch(1000, array_2d=True)
print("test accuracy %f" % accuracy.eval(feed_dict={x: test_x, y_: test_y, keep_prob: 1.0}))
ここでは、テストデータ 1,000件 を使って精度を算出します。
keep_prob
は 1.0 を指定しています。
実行
コードを実行します。
python deep_cifar10_softmax.py
※以前のエントリーでの作成した環境で実行する場合は、仮想環境を起動後に実行します。
結果
実行結果は以下のようになります。
精度は 70.6% でした。
モデルの変更や試行回数を増やすことでもう少し精度が上がるかもしれません。
以上、今回は CIFAR-10 を使った畳み込みニューラルネットワークによるカラー画像のカテゴリー分類を行いました。