Python
機械学習
python3
CNN
TensorFlow
OriginalBitStarDay 22

TensorFlowとCNNで、自作データセットを画像分類する

初めまして!
BitStarでエンジニアのインターンをしている久根間です。

普段は、Railsで、社内システムや自社サービスの開発をしていますが、個人的に、機械学習に興味があるので、自作したデータセットを画像解析する方法をまとめてみようと思います。

はじめに

この記事では、TensorflowのDeep MNIST for Expertsチュートリアルのコードを少し変えて、自作したデータセットを学習させていく方法を書いていきたいと思います。なので、今回は、データの扱いを中心にいきたいと思います。チュートリアルやCNNについては、深く突っ込まないので、ご了承ください。

具体的に、何を分類するかですが、BitStarでは、YouTubeのチャンネルを、ジャンルごとに分類したりしています。そこで、『YouTubeのサムネイルから、動画の内容を分類する』をやっていきたいと思います。〜やってみたの動画、ゲーム実況の動画、スポーツの試合の動画などが、YouTubeには多いので、それらを分類器にかけていこうと思います。ジャンルは、〜やってみた系とスポーツ系とゲーム系になるので、3クラスの分類になります。

少しずつ解説しながらいくので、コードをまとめて見たい方は、一番下のコード全般を見て下さい。

今回の記事は、Jupyter NotebookでPython3を動かしていきます。

チュートリアルを理解する

どういったデータを用意すればいいのか、チュートリアルのコードから決めていきます。
そこで、ますは、データを用意する前に、チュートリアルを少し見ていきましょう。

学習部分
with tf.Session() as sess :
    sess.run(tf.global_variables_initializer())

    for i in range(20000) :
        # ここで、batchにMNISTのデータをわたしている。
        batch = mnist.train.next_batch(50)
        # ここが、学習部分
        train_step.run(feed_dict={x: batch[0], y_: batch[1], keep_prob: 0.5})

上記のコードは、実際にMNISTデータを入れて、学習させている部分を抜粋して省略したものです。変数batchに、バッチ数分のデータを入れています。batch[0]には、手書きで書かれた数字の画像データ、batch[1]には、その画像が、0~9のどの数字なのかを示したラベルデータが入っています。なので、次に、変数batchにどういったデータが入るか確認します。

MNISTの中身の確認
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# MNISTデータのダウンロード
# TensorFlowが入っていれば、簡単にダウンロードできます。
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

# バッチ数分だけ、mnistからランダムに選択し、データを返すメソッド
batch = mnist.train.next_batch(50)

print("画像データ", "オブジェクト: " + str(type(batch[0])),
      "データ型: " + str(batch[0].dtype), "N次元配列: " + str(batch[0].shape), sep='\n')
print("ラベルデータ", "オブジェクト: " + str(type(batch[1])),
      "データ型: " + str(batch[1].dtype), "N次元配列: " + str(batch[1].shape), sep='\n')

画像データ
オブジェクト: <class 'numpy.ndarray'>
データ型: float32
N次元配列: (50, 784)

ラベルデータ
オブジェクト: <class 'numpy.ndarray'>
データ型: float64
N次元配列: (50, 10)

mnistに入ってるデータは、画像、ラベルともに、numpyのN次元配列になっていることがわかります。また、どちらもバッチサイズの50個分の行が入っているので、一行に画像、ラベル、それぞれ一つずつのデータが入っていることがわかります。データの型は、float32と64になっています。

一つの画像が、784個の列で表されているのは、mnist画像が、28x28ピクセルの白黒データだからです。この画像データを、一行で表すには、28*28*1=784個の要素が必要になります。つまり、「ピクセル数xピクセル数xチャネル=要素数」となります。
チャネルとは、画像ファイルに格納されている、色情報のことです。MNISTは、白黒なので「1」になります。

また、ラベルデータが、10個の列なのは、分類するクラスが、0~9の10クラスになるからです。

ここで、試しに画像データを一つ出力してみます。

先頭の画像データ
batch[0][0]

array([ 0.,  0.,  0.,  0.,  0.,  ...,  0.09803922,  0.40784317,  0.85490203,  0.67058825,
          ...,  0.40784317,  0.85490203,  0.67058825,  ...,  0.,  0.,  0.])

画像データは、1ピクセルごとに、数値で表されています。また、どの要素も、0.0~1.0の範囲の数値で表されています。これは、その範囲に収まるように正規化されているからです。

次に、上の画像データに対応するラベルデータを出力してみます。

先頭のラベルデータ
batch[1][0]

array([ 0.,  0.,  0.,  0.,  0.,  0.,  1.,  0.,  0.,  0.])

「1.」の位置が、前から6番目にあるので、数字の6を示すラベルになります。つまり、上の画像データは、数字の6の画像ということになります。

ここまでのまとめ

ここまでのことをまとめて、どういったデータにすべきか、箇条書きします。

データセットは、numpyのN次元配列

画像データ
1. (バッチ数, ピスセル数xピクセル数xチャネル数)のN次元配列
2. 0〜1の値に正規化する
3. dtypeは、float32

ラベルデータ
1. (バッチ数, 分類するクラスの数)のN次元配列
2. 画像に対応する位置に「1.」を入れる
3. dtypeは、float64

これらを意識して、データセットを作っていきます。

データセットを自作する

ここから、実際にデータセットを作っていきます。まず、画像を読み込み、数値の配列にする方法ですが、PythonライブラリのOpenCVで簡単にできます。実際に、画像を取り出してみます。

OpenCVで画像を読み込む
import cv2

image = cv2.imread("image/youtuber/10000.jpg")
print(image)

array([[[103,  65,   8],
        [107,  79,  57],
        [169, 153,  97],
        ..., 
        [ 97,  66,  66],
        [ 95,  62,  63],
        [100,  54,  54]]], dtype=uint8)

こういった感じに、cv2.imread("パス")の引数に画像へのパスを渡すと、OpenCVが画像を読み込んでくれます。ここで、取り出した画像の型などをみてみます。

OpenCVで読み込まれた画像
print("ラベルデータ", "オブジェクト: " + str(type(image)),
      "データ型: " + str(image.dtype), "N次元配列: " + str(image.shape), sep='\n')

オブジェクト: <class 'numpy.ndarray'>
データ型: uint8
N次元配列: (110, 196, 3)

OpenCVで、読み込むとnumpyのN次元配列で返ってきてくれるのですが、dtypeは、unit8となっているので、後で変更します。また、ピスセル数は、28x28になるように、一つ一つの要素も0~1に収まるようにします。

画像へのパスとラベルの配列を作る

OpenCVで画像は、読み込めるのですが、ひとつずつやっていては、大変なので、画像へのパスと対応するラベルを記載したファイルをつくり、それを利用してアクセスすることにします。

その前に、〜やってみた系とゲーム系、スポーツ系で、3つのサムネイルにジャンル分けしたいので、imageディレクトリ以下に、youtuber、game、sportの3つのディレクトリを作成し、それぞれ画像データを保存していきます。それぞれ、1000個のサムネイルを用意しました。

次に、パスと対応するラベルを記載したファイルをつくります。

画像ファイルへのパスとラベル(教師データ)を記述する
with open("path_and_label.txt", "w") as file :
    for count in range(0, 1000) :
        file.write("image/youtuber/" + str(0) + str(count).zfill(4) + ".jpg" + " " + "0" + "\n")
        count += 1
    for count in range(0, 1000) :
        file.write("image/game/" + str(1) + str(count).zfill(4) + ".jpg" + " " + "1" + "\n")
        count += 1
    for count in range(0, 1000) :
        file.write("image/sport/" + str(2) + str(count).zfill(4) + ".jpg" + " " + "2" + "\n")
        count += 1

まず、パスを記載し、スペースをあけて、ラベルをつけるようなコードにしました。
実際のファイルは、以下のようになりなす。

path_and_label.txt
image/youtuber/00000.jpg 0
image/youtuber/00001.jpg 0
image/youtuber/00002.jpg 0
image/youtuber/00003.jpg 0
     :
     :
image/youtuber/00998.jpg 0
image/youtuber/00999.jpg 0
image/game/10000.jpg 1
image/game/10001.jpg 1
     :
     :
image/game/10998.jpg 0
image/game/10999.jpg 0
image/sport/20000.jpg 1
image/sport/20001.jpg 1
     :
     :
image/sport/20996.jpg 1
image/sport/20997.jpg 1
image/sport/20998.jpg 1
image/sport/20999.jpg 1

OpenCVで読み込み、データセットをつくる

先ほど箇条書きしたことを念頭において、データを整形していきます。ここでは、(110, 196, 3)の画像データを、(2352, )の一次元の形にし、データ型をfloat32にしたあと、0.0~1.0の範囲に正規化します。また、分類するクラス数の長さを持つ仮のN次元配列のラベルデータを作成、画像データとペアにして、DATA_SETリストにいれていきます。
(2352, ) = (28x28x3, )

データセットを作成する
import random
import numpy as np

PATH_AND_LABEL = []

# パスとラベルが記載されたファイルから、それらのリストを作成する
with open("パスとラベルが記載されたファイル", mode='r') as file :
      for line in file :
           # 改行を除く
         line = line.rstrip()
       # スペースで区切られたlineを、リストにする
       line_list = line.split()
       PATH_AND_LABEL.append(line_list)
              # 同じジャンルのサムネイルが、かたまらないように、シャッフルする
       random.shuffle(PATH_AND_LABEL)


DATA_SET = []

for path_label in PATH_AND_LABEL :

    tmp_list = []

    # 画像を読み込み、サイズを変更する
    img = cv2.imread(path_label[0])
    img = cv2.resize(img, (28, 28))
    # (28, 28, 3)のN次元配列を一次元に、dtypeをfloat32に、0〜1の値に正規化する
    img = img.flatten().astype(np.float32)/255.0

    tmp_list.append(img)

    # 分類するクラス数の長さを持つ仮のN次元配列を作成する
    classes_array = np.zeros(3, dtype = 'float64')
    # ラベルの数字によって、リストを更新する
    classes_array[int(path_label[1])] = 1

    tmp_list.append(classes_array)

    DATA_SET.append(tmp_list)

パスとラベルのリスト、PATH_AND_LABELを作成します。その後、PATH_AND_LABELリストをfor文で回し、画像データとラベルで構成されたデータセット(DATA_SET)を作成します。

最後に、訓練用のデータと、テスト用のデータに、データセットを分割します。訓練用に、全データの8割をあてました。

データの分割
TRAIN_DATA_SIZE = int(len(DATA_SET) * 0.8)
TRAIN_DATA_SET = DATA_SET[:TRAIN_DATA_SIZE]
TEST_DATA_SET = DATA_SET[TRAIN_DATA_SIZE:]

これで、データは完成です。

バッチする

mnistチュートリアルでは、データセットにnext_batch(50)と言う特別ばメソッドが用意されていますが、自作のデータセットには、それがないので、バッチを返す関数を定義します。

バッチ数だけデータを取り出す関数
def batch_data(data_set, batch_size) :

    data_set = random.sample(data_set, batch_size)

    return data_set

自作したデータセットは、訓練用とテスト用には、分かれていますが、画像データとラベルがセットで同じリストに入っています。そこで、これらを分割する関数を定義しました。また、最終的に、(バッチ数, 2352)の形で画像データが、(バッチ数, 3)の形でラベルデータが返ってきて欲しいので、その処理も入れます。

画像とラベルを分割する関数
def devide_data_set(data_set) :
    # ndarrayにすることで、以下のように配列にアクセスすることができる。
    data_set = np.array(data_set)
    image_data_set = data_set[:int(len(data_set)), :1].flatten()
    label_data_set = data_set[:int(len(data_set)), 1:].flatten()

    image_ndarray = np.empty((0, 2352))
    label_ndarray = np.empty((0, 3))

    for (img, label) in zip(image_data_set, label_data_set) :
        image_ndarray = np.append(image_ndarray, np.reshape(img, (1, 2352)), axis=0)
        label_ndarray = np.append(label_ndarray, np.reshape(label, (1, 3)), axis=0)

    return image_ndarray, label_ndarray

訓練を行う

訓練を行うコード
with tf.Session() as sess :
    sess.run(tf.global_variables_initializer())
    for epoch_step in range(MAX_EPOCH) :
        train_data_set = batch_data(TRAIN_DATA_SET, BATCH_SIZE)
        train_image, train_label = devide_data_set(train_data_set)

        if epoch_step % BATCH_SIZE == 0 :
            train_accuracy = accuracy.eval(feed_dict={x: train_image, y_: train_label, keep_prob: 1.0})
            print('epoch_step %d, training accuracy %g' % (epoch_step, train_accuracy))

        train_step.run(feed_dict={x: train_image, y_: train_label, keep_prob: 0.5})

    test_image, test_label = devide_data_set(TEST_DATA_SET)
    print('test accuracy %g' % accuracy.eval(feed_dict={
        x: test_image, y_: test_label, keep_prob: 1.0}))

それでは、実際に訓練を行っていきます。上のコードは、省略しているので、注意してください。全コードは、一番下にあります。

実行結果
MAX_EPOCH = 2000
BATCH_SIZE = 50

epoch_step 0, training accuracy 0.44
epoch_step 50, training accuracy 0.5
epoch_step 100, training accuracy 0.64
epoch_step 150, training accuracy 0.7
epoch_step 200, training accuracy 0.58
epoch_step 250, training accuracy 0.76
epoch_step 300, training accuracy 0.74
epoch_step 350, training accuracy 0.72
epoch_step 400, training accuracy 0.62
epoch_step 450, training accuracy 0.84
epoch_step 500, training accuracy 0.8
epoch_step 550, training accuracy 0.84
epoch_step 600, training accuracy 0.96
epoch_step 650, training accuracy 0.76
epoch_step 700, training accuracy 0.86
epoch_step 750, training accuracy 0.88
epoch_step 800, training accuracy 0.94
epoch_step 850, training accuracy 0.9
epoch_step 900, training accuracy 0.9
epoch_step 950, training accuracy 0.86

test accuracy 0.718333

テストデータに対する正解率は、約72%になりました。思いつきでサムネイルを分類しようとやってみて、なんらかの傾向を返してくれるので、CNNはすごいなと思います。

最後に

今回は、自作したデータセットで、CNNを利用した画像分類をする方法について、まとめていきました。TensorflowのDeep MNIST for Expertsチュートリアルについては、他の多くのところで、詳細に解説されています。しかし、自分で用意したデータで、分類を行う方法について書かれた記事は、少ないように感じました。なので、少しでも、参考になればと思います。

現在、BitStarの開発部署には、三名のインターン生が在籍しています。BitStarのシステムは、Ruby on Railsで開発しているので、普段は、記事のような機械学習に関わったりはしません。しかし、Railsでの開発は、社員の方と変わらないくらいで、任せてもらっています。なので、実務レベルでのRailsの開発はもちろん、エンジニアとしての業務フローなど、多くのことを学ばさせてもらっています。

BitStarでは、社員のエンジニアだけでなく、インターン生も、一緒に開発していただける方を探しています!
Railsでの開発に興味がある方は、応募お待ちしています。

コード全般

コード全般
import random
import numpy as np
import cv2
import tensorflow as tf


CHANNELS = 3
NUM_CLASSES = 3
IMAGE_SIZE = 28
IMAGE_MATRIX_SIZE = IMAGE_SIZE * IMAGE_SIZE * CHANNELS
PATH_LABEL_FILE = "path_and_label.txt"


PATH_AND_LABEL = []

with open(PATH_LABEL_FILE, mode='r') as file :

    for line in file :
        # 改行を除く
        line = line.rstrip()
        # スペースで区切られたlineを、配列にする
        line_list = line.split()
        PATH_AND_LABEL.append(line_list)
        random.shuffle(PATH_AND_LABEL)


DATA_SET = []

for path_label in PATH_AND_LABEL :

    tmp_list = []

    # 画像を読み込み、サイズを変更する
    img = cv2.imread(path_label[0])
    img = cv2.resize(img, (IMAGE_SIZE, IMAGE_SIZE))
    # 行列を一次元に、dtypeをfloat32に、0〜1の値に正規化する
    img = img.flatten().astype(np.float32)/255.0

    tmp_list.append(img)

    # 分類するクラス数の長さを持つ仮のリストを作成する
    classes_array = np.zeros(NUM_CLASSES, dtype = 'float64')
    # ラベルの数字によって、リストを更新する
    classes_array[int(path_label[1])] = 1

    tmp_list.append(classes_array)

    DATA_SET.append(tmp_list)


TRAIN_DATA_SIZE = int(len(DATA_SET) * 0.8)
TRAIN_DATA_SET = DATA_SET[:TRAIN_DATA_SIZE]
TEST_DATA_SET = DATA_SET[TRAIN_DATA_SIZE:]


def batch_data(data_set, batch_size) :

    data_set = random.sample(data_set, batch_size)

    return data_set


def devide_data_set(data_set) :
    data_set = np.array(data_set)
    image_data_set = data_set[:int(len(data_set)), :1].flatten()
    label_data_set = data_set[:int(len(data_set)), 1:].flatten()

    image_ndarray = np.empty((0, IMAGE_MATRIX_SIZE))
    label_ndarray = np.empty((0, NUM_CLASSES))

    for (img, label) in zip(image_data_set, label_data_set) :
        image_ndarray = np.append(image_ndarray, np.reshape(img, (1, IMAGE_MATRIX_SIZE)), axis=0)
        label_ndarray = np.append(label_ndarray, np.reshape(label, (1, NUM_CLASSES)), axis=0)

    return image_ndarray, label_ndarray


def conv2d(x, W) :
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x) :
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

def weight_variable(shape) :
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape) :
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)


def deepnn(x) :

    with tf.name_scope('reshape') :
        x_image = tf.reshape(x, [-1, IMAGE_SIZE, IMAGE_SIZE, CHANNELS])

    with tf.name_scope('conv1') :
        W_conv1 = weight_variable([5, 5, CHANNELS, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    with tf.name_scope('pool1') :
        h_pool1 = max_pool_2x2(h_conv1)

    with tf.name_scope('conv2') :
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    with tf.name_scope('pool2') :
        h_pool2 = max_pool_2x2(h_conv2)

    with tf.name_scope('fc1') :
        W_fc1 = weight_variable([7 * 7 * 64, 1024])
        b_fc1 = bias_variable([1024])

        h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

    with tf.name_scope('dropout') :
        keep_prob = tf.placeholder(tf.float32)
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    with tf.name_scope('fc2') :
        W_fc2 = weight_variable([1024, NUM_CLASSES])
        b_fc2 = bias_variable([NUM_CLASSES])

        y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

    return y_conv, keep_prob


def main(_):

    x = tf.placeholder(tf.float32, [None, IMAGE_MATRIX_SIZE])

    y_ = tf.placeholder(tf.float32, [None, NUM_CLASSES])

    y_conv, keep_prob = deepnn(x)

    with tf.name_scope('loss'):
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y_,logits=y_conv)
    cross_entropy = tf.reduce_mean(cross_entropy)

    with tf.name_scope('adam_optimizer'):
        train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

    with tf.name_scope('accuracy'):
        correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
        correct_prediction = tf.cast(correct_prediction, tf.float32)
    accuracy = tf.reduce_mean(correct_prediction)

    with tf.Session() as sess :
        sess.run(tf.global_variables_initializer())
        for epoch_step in range(MAX_EPOCH) :
            train_data_set = batch_data(TRAIN_DATA_SET, BATCH_SIZE)
            train_image, train_label = devide_data_set(train_data_set)

            if epoch_step % BATCH_SIZE == 0 :
                train_accuracy = accuracy.eval(feed_dict={x: train_image, y_: train_label, keep_prob: 1.0})
                print('epoch_step %d, training accuracy %g' % (epoch_step, train_accuracy))

            train_step.run(feed_dict={x: train_image, y_: train_label, keep_prob: 0.5})

        test_image, test_label = devide_data_set(TEST_DATA_SET)
        print('test accuracy %g' % accuracy.eval(feed_dict={
            x: test_image, y_: test_label, keep_prob: 1.0}))


MAX_EPOCH = 1000
BATCH_SIZE = 50


if __name__ == '__main__' :
    main(_)

epoch_step ~, training accuracy ~
test accuracy ~