Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

「ある美女がどの大学にいそうかをCNNで判別する」という記事のコードをPython3系に移植してみる

More than 3 years have passed since last update.

はじめに

TensorFlowについて学習するにあたり、何か良い題材がないかと探していたら面白そうなQiitaの記事を発見したのですが、記事ではPython2系を使われているようだったので、これをPython3系に移植してみることしました。
コード以外の詳細は以下のURLをご参照下さい。

http://qiita.com/pika_shi/items/3c8ab1a8ecc655b33851

画像収集

2系から3系に移植するにあたり、以下の点を修正。

  • インポートするライブラリの変更
#import urllib2
#from urllib import urlretrieve
import urllib.request
  • 記述の変更
urllib2.urlopen -> urllib.request.urlopen
os.makedirs(dirpath) -> os.makedirs(dirpath, exist_ok=True)
urlretrieve -> urllib.request.urlretrieve
  • 2012年以前への対応
y = int(url[-4:])

if y > 2012:
    # 2013年以降用
    html = urllib.request.urlopen('{}/photo'.format(url.strip()))
else:
    # 2012年以前用
    url2012 = "https://misscolle.com/" + url.strip().split('/')[-1].replace("miss","")
    html = urllib.request.urlopen('{}/photo'.format(url2012))

上記の変更を施した以下のコードを実行して画像を収集。

photo_collector.py
# -*- coding:utf-8 -*-

import os
import bs4
import time
import random
from itertools import chain

import urllib.request

base_url = 'http://misscolle.com'

def fetch_page_urls():
    html = urllib.request.urlopen('{}/versions'.format(base_url))
    soup = bs4.BeautifulSoup(html, 'html.parser')

    columns = soup.find_all('ul', class_='columns')
    atags = map(lambda column: column.find_all('a'), columns)

    with open('page_urls.txt', 'w') as f:
        for _ in chain.from_iterable(atags):
            path = _.get('href')
            if not path.startswith('http'):  # Relative path
                path = '{}{}'.format(base_url, path)
            if path[-1] == '/':  # Normalize
                path = path[:-1]
            f.write('{}\n'.format(path))

def fetch_photos():
    with open('page_urls.txt') as f:
        for url in f:
            # Make directories for saving images
            dirpath = 'photos/{}'.format(url.strip().split('/')[-1].replace("20","/20").replace("miss",""))
            os.makedirs(dirpath, exist_ok=True)

            y = int(url[-4:])

            if y > 2012:
                # 2013年以降用
                html = urllib.request.urlopen('{}/photo'.format(url.strip()))
            else:
                # 2012年以前用
                url2012 = "https://misscolle.com/" + url.strip().split('/')[-1].replace("miss","")
                html = urllib.request.urlopen('{}/photo'.format(url2012))

            soup = bs4.BeautifulSoup(html, 'html.parser')
            photos = soup.find_all('li', class_='photo')
            paths = map(lambda path: path.find('a').get('href'), photos)

            for path in paths:
                filename = '_'.join(path.split('?')[0].split('/')[-2:])
                filepath = '{}/{}'.format(dirpath, filename)
                print(filepath)
                # Download image file
                urllib.request.urlretrieve('{}{}'.format(base_url, path), filepath)
                # Add random waiting time (4 - 6 sec)
                time.sleep(4 + random.randint(0, 2))

if __name__ == '__main__':
    fetch_page_urls()
    fetch_photos()

顔領域のトリミング

OpenCVを利用するので、インストールされていなければインストールします。
Macであれば以下のリンク先にインストール手順が書いてあります。

http://qiita.com/mix_dvd/items/7aae3a50cdfb6baaf54d

OpenCVのインストール後、使用する評価器をコードと同じフォルダにコピーします。

$ cp /usr/local/share/OpenCV/haarcascades/haarcascade_frontalface_alt2.xml ./

トリミングを行う部分のソースコードはほとんどそのままですが、makedirsに「exist_ok=True」の設定を追加してあります。また、作業経過がわかるように作業中のファイル名を表示するようprint文を追加しました。

face_detecter.py
# -*- coding:utf-8 -*-

import os
import cv2

def main():
    for srcpath, _, files in os.walk('photos'):
        if len(_):
            continue
        dstpath = srcpath.replace('photos', 'faces')
        os.makedirs(dstpath, exist_ok=True)
        for filename in files:
            if filename.startswith('.'):  # Pass .DS_Store
                continue
            try:
                detect_faces(srcpath, dstpath, filename)
                print(srcpath, dstpath, filename)
            except:
                continue

def detect_faces(srcpath, dstpath, filename):

    cascade = cv2.CascadeClassifier('haarcascade_frontalface_alt2.xml')
    image = cv2.imread('{}/{}'.format(srcpath, filename))
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = cascade.detectMultiScale(gray_image)
    # Extract when just one face is detected
    if (len(faces) == 1):
        (x, y, w, h) = faces[0]
        image = image[y:y+h, x:x+w]
        image = cv2.resize(image, (100, 100))
        cv2.imwrite('{}/{}'.format(dstpath, filename), image)

if __name__ == '__main__':
    main()

画像の学習

2014年以前の画像を学習に使用し、2015年の画像を検証に使用するということで、記事に記載はありませんが、きっと「train」と「test」というフォルダを作成して、対象の写真をそのフォルダに入れて学習をされているような気がします。

その通りにしても良いのですが、もしかしたら「どの大学でもない」という選択肢も今後あるかもしれないと思い、全てのデータを学習できるよう修正してみました。

また、学習結果の保存先をカレントディレクトリではなく「model」というフォルダに保存するよう変更してあります。

ちなみに、Python3系ではmapオブジェクトやfilterオブジェクトの扱いが2系と異なるため、listに変換して使用しています。

ついでに、TensorFlowのバージョンが古いとうまく動かない時があるようです。
もし、うまくいかない場合は以下のコマンドでバージョンアップさせて下さい。

$ pip install tensorflow --upgrade

では、以下のスクリプトを実行させます。

train.py
# -*- coding:utf-8 -*-

import os
import random
import numpy as np
import tensorflow as tf

label_dict = {
    'aoyama': 0, 'jissen': 1, 'keio': 2, 'phoenix': 3, 'rika': 4,
    'rikkyo': 5, 'seikei': 6, 'sophia': 7, 'todai': 8, 'tonjo': 9
}

def load_data(data_type):
    filenames, images, labels = [], [], []

    if data_type == "test":
        walk = filter(lambda _: not len(_[1]) and "2015" in _[0], os.walk('faces'))
    elif data_type == "train":
        walk = filter(lambda _: not len(_[1]) and not "2015" in _[0], os.walk('faces'))
    else:
        walk = filter(lambda _: not len(_[1]) and data_type in _[0], os.walk('faces'))

    for root, dirs, files in walk:
        filenames += ['{}/{}'.format(root, _) for _ in files if not _.startswith('.')]
    # Shuffle files
    random.shuffle(filenames)
    # Read, resize, and reshape images
    images = map(lambda _: tf.image.decode_jpeg(tf.read_file(_), channels=3), filenames)
    images = map(lambda _: tf.image.resize_images(_, [32, 32]), images)
    images = list(map(lambda _: tf.reshape(_, [-1]), images))

    for filename in filenames:
        label = np.zeros(10)
        for k, v in label_dict.items():
           if k in filename:
                label[v] = 1.
        labels.append(label)

    return images, labels

def get_batch_list(l, batch_size):
    # [1, 2, 3, 4, 5,...] -> [[1, 2, 3], [4, 5,..]]
    return [np.asarray(l[_:_+batch_size]) for _ in range(0, len(l), batch_size)]

def weight_variable(shape):
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial)

def bias_variable(shape):
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial)

def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

def inference(images_placeholder, keep_prob):
    # Convolution layer
    x_image = tf.reshape(images_placeholder, [-1, 32, 32, 3])
    W_conv1 = weight_variable([5, 5, 3, 32])
    b_conv1 = bias_variable([32])
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    # Pooling layer
    h_pool1 = max_pool_2x2(h_conv1)

    # Convolution layer
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    # Pooling layer
    h_pool2 = max_pool_2x2(h_conv2)

    # Full connected layer
    W_fc1 = weight_variable([8 * 8 * 64, 1024])
    b_fc1 = bias_variable([1024])
    h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

    # Dropout
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # Full connected layer
    W_fc2 = weight_variable([1024, 10])
    b_fc2 = bias_variable([10])

    return tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

def main():
    with tf.Graph().as_default():
        train_images, train_labels = load_data('train')
        test_images, test_labels = load_data('test')
        print("train_images", len(train_images))
        print("test_images", len(test_images))

        x = tf.placeholder('float', shape=[None, 32 * 32 * 3])  # 32 * 32, 3 channels
        y_ = tf.placeholder('float', shape=[None, 10])  # 10 classes
        keep_prob = tf.placeholder('float')

        y_conv = inference(x, keep_prob)
        # Loss function
        cross_entropy = -tf.reduce_sum(y_ * tf.log(y_conv))
        tf.summary.scalar('cross_entropy', cross_entropy)
        # Minimize cross entropy by using SGD
        train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
        # Accuracy
        correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
        tf.summary.scalar('accuracy', accuracy)

        saver = tf.train.Saver()
        sess = tf.InteractiveSession()
        sess.run(tf.global_variables_initializer())

        summary_op = tf.summary.merge_all()
        summary_writer = tf.summary.FileWriter('./logs', sess.graph)

        batched_train_images = get_batch_list(train_images, 25)
        batched_train_labels = get_batch_list(train_labels, 25)

        train_images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, np.asarray(train_images)))
        test_images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, np.asarray(test_images)))
        train_labels, test_labels = np.asarray(train_labels), np.asarray(test_labels)

        # Train
        for step, (images, labels) in enumerate(zip(batched_train_images, batched_train_labels)):
            images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, images))
            sess.run(train_step, feed_dict={ x: images, y_: labels, keep_prob: 0.5 })
            train_accuracy = accuracy.eval(feed_dict = {
                x: train_images, y_: train_labels, keep_prob: 1.0 })
            print('step {}, training accuracy {}'.format(step, train_accuracy))
            summary_str = sess.run(summary_op, feed_dict={
                x: train_images, y_: train_labels, keep_prob: 1.0 })
            summary_writer.add_summary(summary_str, step)
        # Test trained model
        test_accuracy = accuracy.eval(feed_dict = {
            x: test_images, y_: test_labels, keep_prob: 1.0 })
        print('test accuracy {}'.format(test_accuracy))
        # Save model
        save_path = saver.save(sess, "model/model.ckpt")

if __name__ == '__main__':
    main()

画像の判別

前述の「train.py」のmainだけを以下のとおり変更して「predict.py」と名称を変更して保存します。
そして、facesフォルダの中に「experiment」というフォルダを作成し、顔画像を保存してから実行。

ちなみに、学習結果のデータ読込元をmodelフォルダに変更してあります。

predict.py
# 略

def main():
    with tf.Graph().as_default():
        test_images, test_labels = load_data('experiment')
        x = tf.placeholder('float', shape=[None, 32 * 32 * 3])  # 32 * 32, 3 channels
        y_ = tf.placeholder('float', shape=[None, 10])  # 10 classes
        keep_prob = tf.placeholder('float')

        y_conv = inference(x, keep_prob)

        sess = tf.InteractiveSession()
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        saver.restore(sess, "model/model.ckpt")

        test_images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, np.asarray(test_images)))

        print(y_conv.eval(feed_dict={ x: [test_images[0]], keep_prob: 1.0 })[0])
        print(np.argmax(y_conv.eval(feed_dict={ x: [test_images[0]], keep_prob: 1.0 })[0]))

if __name__ == '__main__':
    main()

できた!

ちなみに、画像データの読み込みに1日、画像の学習に半日かかってしまい...高性能なマシンが欲しいと心から思わされる作業でした(-_-;)

mix_dvd
ExcelのマクロやWebアプリケーション、iOSアプリを作っています。 また、しまねソフト研究開発センター専門研究員の業務を受託しています。http://www.s-itoc.jp
http://blueomega.jp
s-itoc
しまねソフト研究開発センター(ITOC)はITを活用する企業の支援と研究開発の拠点です。
http://www.s-itoc.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away