はじめに
TensorFlowについて学習するにあたり、何か良い題材がないかと探していたら面白そうなQiitaの記事を発見したのですが、記事ではPython2系を使われているようだったので、これをPython3系に移植してみることしました。
コード以外の詳細は以下のURLをご参照下さい。
画像収集
2系から3系に移植するにあたり、以下の点を修正。
- インポートするライブラリの変更
#import urllib2
#from urllib import urlretrieve
import urllib.request
- 記述の変更
urllib2.urlopen -> urllib.request.urlopen
os.makedirs(dirpath) -> os.makedirs(dirpath, exist_ok=True)
urlretrieve -> urllib.request.urlretrieve
- 2012年以前への対応
y = int(url[-4:])
if y > 2012:
# 2013年以降用
html = urllib.request.urlopen('{}/photo'.format(url.strip()))
else:
# 2012年以前用
url2012 = "https://misscolle.com/" + url.strip().split('/')[-1].replace("miss","")
html = urllib.request.urlopen('{}/photo'.format(url2012))
上記の変更を施した以下のコードを実行して画像を収集。
# -*- coding:utf-8 -*-
import os
import bs4
import time
import random
from itertools import chain
import urllib.request
base_url = 'http://misscolle.com'
def fetch_page_urls():
html = urllib.request.urlopen('{}/versions'.format(base_url))
soup = bs4.BeautifulSoup(html, 'html.parser')
columns = soup.find_all('ul', class_='columns')
atags = map(lambda column: column.find_all('a'), columns)
with open('page_urls.txt', 'w') as f:
for _ in chain.from_iterable(atags):
path = _.get('href')
if not path.startswith('http'): # Relative path
path = '{}{}'.format(base_url, path)
if path[-1] == '/': # Normalize
path = path[:-1]
f.write('{}\n'.format(path))
def fetch_photos():
with open('page_urls.txt') as f:
for url in f:
# Make directories for saving images
dirpath = 'photos/{}'.format(url.strip().split('/')[-1].replace("20","/20").replace("miss",""))
os.makedirs(dirpath, exist_ok=True)
y = int(url[-4:])
if y > 2012:
# 2013年以降用
html = urllib.request.urlopen('{}/photo'.format(url.strip()))
else:
# 2012年以前用
url2012 = "https://misscolle.com/" + url.strip().split('/')[-1].replace("miss","")
html = urllib.request.urlopen('{}/photo'.format(url2012))
soup = bs4.BeautifulSoup(html, 'html.parser')
photos = soup.find_all('li', class_='photo')
paths = map(lambda path: path.find('a').get('href'), photos)
for path in paths:
filename = '_'.join(path.split('?')[0].split('/')[-2:])
filepath = '{}/{}'.format(dirpath, filename)
print(filepath)
# Download image file
urllib.request.urlretrieve('{}{}'.format(base_url, path), filepath)
# Add random waiting time (4 - 6 sec)
time.sleep(4 + random.randint(0, 2))
if __name__ == '__main__':
fetch_page_urls()
fetch_photos()
顔領域のトリミング
OpenCVを利用するので、インストールされていなければインストールします。
Macであれば以下のリンク先にインストール手順が書いてあります。
OpenCVのインストール後、使用する評価器をコードと同じフォルダにコピーします。
$ cp /usr/local/share/OpenCV/haarcascades/haarcascade_frontalface_alt2.xml ./
トリミングを行う部分のソースコードはほとんどそのままですが、makedirsに「exist_ok=True」の設定を追加してあります。また、作業経過がわかるように作業中のファイル名を表示するようprint文を追加しました。
# -*- coding:utf-8 -*-
import os
import cv2
def main():
for srcpath, _, files in os.walk('photos'):
if len(_):
continue
dstpath = srcpath.replace('photos', 'faces')
os.makedirs(dstpath, exist_ok=True)
for filename in files:
if filename.startswith('.'): # Pass .DS_Store
continue
try:
detect_faces(srcpath, dstpath, filename)
print(srcpath, dstpath, filename)
except:
continue
def detect_faces(srcpath, dstpath, filename):
cascade = cv2.CascadeClassifier('haarcascade_frontalface_alt2.xml')
image = cv2.imread('{}/{}'.format(srcpath, filename))
gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
faces = cascade.detectMultiScale(gray_image)
# Extract when just one face is detected
if (len(faces) == 1):
(x, y, w, h) = faces[0]
image = image[y:y+h, x:x+w]
image = cv2.resize(image, (100, 100))
cv2.imwrite('{}/{}'.format(dstpath, filename), image)
if __name__ == '__main__':
main()
画像の学習
2014年以前の画像を学習に使用し、2015年の画像を検証に使用するということで、記事に記載はありませんが、きっと「train」と「test」というフォルダを作成して、対象の写真をそのフォルダに入れて学習をされているような気がします。
その通りにしても良いのですが、もしかしたら「どの大学でもない」という選択肢も今後あるかもしれないと思い、全てのデータを学習できるよう修正してみました。
また、学習結果の保存先をカレントディレクトリではなく「model」というフォルダに保存するよう変更してあります。
ちなみに、Python3系ではmapオブジェクトやfilterオブジェクトの扱いが2系と異なるため、listに変換して使用しています。
ついでに、TensorFlowのバージョンが古いとうまく動かない時があるようです。
もし、うまくいかない場合は以下のコマンドでバージョンアップさせて下さい。
$ pip install tensorflow --upgrade
では、以下のスクリプトを実行させます。
# -*- coding:utf-8 -*-
import os
import random
import numpy as np
import tensorflow as tf
label_dict = {
'aoyama': 0, 'jissen': 1, 'keio': 2, 'phoenix': 3, 'rika': 4,
'rikkyo': 5, 'seikei': 6, 'sophia': 7, 'todai': 8, 'tonjo': 9
}
def load_data(data_type):
filenames, images, labels = [], [], []
if data_type == "test":
walk = filter(lambda _: not len(_[1]) and "2015" in _[0], os.walk('faces'))
elif data_type == "train":
walk = filter(lambda _: not len(_[1]) and not "2015" in _[0], os.walk('faces'))
else:
walk = filter(lambda _: not len(_[1]) and data_type in _[0], os.walk('faces'))
for root, dirs, files in walk:
filenames += ['{}/{}'.format(root, _) for _ in files if not _.startswith('.')]
# Shuffle files
random.shuffle(filenames)
# Read, resize, and reshape images
images = map(lambda _: tf.image.decode_jpeg(tf.read_file(_), channels=3), filenames)
images = map(lambda _: tf.image.resize_images(_, [32, 32]), images)
images = list(map(lambda _: tf.reshape(_, [-1]), images))
for filename in filenames:
label = np.zeros(10)
for k, v in label_dict.items():
if k in filename:
label[v] = 1.
labels.append(label)
return images, labels
def get_batch_list(l, batch_size):
# [1, 2, 3, 4, 5,...] -> [[1, 2, 3], [4, 5,..]]
return [np.asarray(l[_:_+batch_size]) for _ in range(0, len(l), batch_size)]
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
def inference(images_placeholder, keep_prob):
# Convolution layer
x_image = tf.reshape(images_placeholder, [-1, 32, 32, 3])
W_conv1 = weight_variable([5, 5, 3, 32])
b_conv1 = bias_variable([32])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
# Pooling layer
h_pool1 = max_pool_2x2(h_conv1)
# Convolution layer
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
# Pooling layer
h_pool2 = max_pool_2x2(h_conv2)
# Full connected layer
W_fc1 = weight_variable([8 * 8 * 64, 1024])
b_fc1 = bias_variable([1024])
h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
# Dropout
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
# Full connected layer
W_fc2 = weight_variable([1024, 10])
b_fc2 = bias_variable([10])
return tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)
def main():
with tf.Graph().as_default():
train_images, train_labels = load_data('train')
test_images, test_labels = load_data('test')
print("train_images", len(train_images))
print("test_images", len(test_images))
x = tf.placeholder('float', shape=[None, 32 * 32 * 3]) # 32 * 32, 3 channels
y_ = tf.placeholder('float', shape=[None, 10]) # 10 classes
keep_prob = tf.placeholder('float')
y_conv = inference(x, keep_prob)
# Loss function
cross_entropy = -tf.reduce_sum(y_ * tf.log(y_conv))
tf.summary.scalar('cross_entropy', cross_entropy)
# Minimize cross entropy by using SGD
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
# Accuracy
correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
tf.summary.scalar('accuracy', accuracy)
saver = tf.train.Saver()
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
summary_op = tf.summary.merge_all()
summary_writer = tf.summary.FileWriter('./logs', sess.graph)
batched_train_images = get_batch_list(train_images, 25)
batched_train_labels = get_batch_list(train_labels, 25)
train_images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, np.asarray(train_images)))
test_images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, np.asarray(test_images)))
train_labels, test_labels = np.asarray(train_labels), np.asarray(test_labels)
# Train
for step, (images, labels) in enumerate(zip(batched_train_images, batched_train_labels)):
images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, images))
sess.run(train_step, feed_dict={ x: images, y_: labels, keep_prob: 0.5 })
train_accuracy = accuracy.eval(feed_dict = {
x: train_images, y_: train_labels, keep_prob: 1.0 })
print('step {}, training accuracy {}'.format(step, train_accuracy))
summary_str = sess.run(summary_op, feed_dict={
x: train_images, y_: train_labels, keep_prob: 1.0 })
summary_writer.add_summary(summary_str, step)
# Test trained model
test_accuracy = accuracy.eval(feed_dict = {
x: test_images, y_: test_labels, keep_prob: 1.0 })
print('test accuracy {}'.format(test_accuracy))
# Save model
save_path = saver.save(sess, "model/model.ckpt")
if __name__ == '__main__':
main()
画像の判別
前述の「train.py」のmainだけを以下のとおり変更して「predict.py」と名称を変更して保存します。
そして、facesフォルダの中に「experiment」というフォルダを作成し、顔画像を保存してから実行。
ちなみに、学習結果のデータ読込元をmodelフォルダに変更してあります。
# 略
def main():
with tf.Graph().as_default():
test_images, test_labels = load_data('experiment')
x = tf.placeholder('float', shape=[None, 32 * 32 * 3]) # 32 * 32, 3 channels
y_ = tf.placeholder('float', shape=[None, 10]) # 10 classes
keep_prob = tf.placeholder('float')
y_conv = inference(x, keep_prob)
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, "model/model.ckpt")
test_images = list(map(lambda _: sess.run(_).astype(np.float32) / 255.0, np.asarray(test_images)))
print(y_conv.eval(feed_dict={ x: [test_images[0]], keep_prob: 1.0 })[0])
print(np.argmax(y_conv.eval(feed_dict={ x: [test_images[0]], keep_prob: 1.0 })[0]))
if __name__ == '__main__':
main()
できた!
ちなみに、画像データの読み込みに1日、画像の学習に半日かかってしまい...高性能なマシンが欲しいと心から思わされる作業でした(-_-;)