Edited at

chainerによるディープラーニングでAV女優の類似画像検索サービスをつくったノウハウを公開する

More than 3 years have passed since last update.


環境

PC: MacBook Air

CPU: 1.4 GHz Intel Core i5

メモリ: 4GB

ふつうのMacBook Airでやったのですが、学習遅いし、メモリ不足で落ちるしでやっぱり辛かったです。


おおまかな流れ


  1. 各女優の画像を収集する。

  2. dlibで顔画像を切り取って96×96の大きさにリサイズする。

  3. 1人につき1000枚の画像になるようデータ拡張する。

  4. データをnumpyファイルに変換する。

  5. chainerで顔画像を学習する。

  6. 学習したモデルから任意の画像の予測をする。


1. 画像を収集する

ここは詳しくは書けないので以下の記事を参考にしてください。

Python: BeautifulSoup4 を使って Web サイトをスクレイピングする

PythonとBeautiful Soupでスクレイピング

取得した画像は女優ごとにディレクトリを分けて保存します。

./folder

|--- /actress1
| |--- image1.jpg
| |--- image2.jpg
| |--- image3.jpg
|
|--- /actress2
| .
| .
|--- /actress3
.
.
.


2. dlibで顔画像を切り抜く

画像認識といえばOpenCVが有名だと思いますがdlib vs OpenCV face detectionを見ると、顔抽出に関してはdlibというライブラリのほうが誤検出が少なくよさそうなので、dlibを使いました。

import os

import sys
import glob
import cv2
from PIL import Image
import dlib

"""
INPUT_DIRは(1.画像を収集する)で取得した画像のディレクトリ名
OUTPUT_DIRは出力ディレクトリ名(フォルダ構成はINPUT_DIRと同じになる)
"""

detector = dlib.get_frontal_face_detector()

# 各女優のディレクトリのリストを取得
dir_list = os.listdir(INPUT_DIR)

for i, dir_name in enumerate(dir_list):
if not os.path.exists(os.path.join(OUTPUT_DIR, dir_name)):
os.mkdir(os.path.join(OUTPUT_DIR, dir_name))
image_files = glob.glob(os.path.join(INPUT_DIR, dir_name, "*.jpg"))

for j, image_file in enumerate(image_files):
img = cv2.imread(image_file)
dets = detector(img, 1)
open_img = Image.open(image_file)

for k, d in enumerate(dets):
# サイズが80以下の画像はスキップする
if d.right()-d.left() < 80 or d.bottom()-d.top() < 80:
continue

image_file = image_file.replace(INPUT_DIR, OUTPUT_DIR)
# 1つの画像に復数の顔があると出力のファイル名がかぶるので変更
output_file = image_file.replace('.jpg', '_'+str(k)+'.jpg')

cropped_img = open_img.crop((d.left(), d.top(), d.right(), d.bottom()))
cropped_img.resize((96,96)).save(output_file, 'JPEG', quality=100, optimize=True)

dlibは顔抽出だけでなく、顔の目、鼻、輪郭などの器官を検出する機能もあります。


参考資料

dlib.net face_detect.py


3. データ拡張 (Data augmentation)

import os

import math
import random
import glob
import numpy as np
from scipy import misc
from PIL import Image
import cv2

# 左右反転
def flip_left_right(image):
return image[:, -1::-1]

# 輝度の変更
def random_brightness(image, max_delta=63, seed=None):
img = np.array(image)
delta = np.random.uniform(-max_delta, max_delta)
image = Image.fromarray(np.uint8(img + delta))
return image

# コントラスト変更
def random_contrast(image, lower, upper, seed=None):
factor = np.random.uniform(-lower, upper)
mean = (image[0] + image[1] + image[2]).astype(np.float32) / 3
img = np.zeros(image.shape, np.float32)
for i in range(0, 3):
img[i] = (img[i] - mean) * factor + mean
return img

# 画像切り抜き
def crop(image, name, crop_size, padding_size):
(width, height) = image.shape
cropped_images = []
for i in xrange(0, width, padding_size):
for j in xrange(0, height, padding_size):
box = (i, j, i+crop_size, j+crop_size) #left, upper, right, lower
cropped_name = name + '_' + str(i) + '_' + str(j) + '.jpg'
cropped_image = image[i:i+crop_size, j:j+crop_size]
resized_image = cv2.resize(cropped_image, (IMAGE_SIZE, IMAGE_SIZE))
cropped_images.append(resized_image)

return cropped_images

# データ拡張
# data_numに指定した値になるまで「左右反転」「輝度の変更」「コントラストの変更」「切り抜き」する
def data_augmentation(image_files, data_num):
image_list = []
file_num = len(image_files)

for image_file in image_files:
image_list.append(misc.imread(image_file))

if file_num >= data_num:
return image_list

# flip left right
random.shuffle(image_list)
for image in image_list:
flipped_image = flip_left_right(image)
image_list.append(flipped_image)
if len(image_list) == data_num:
return image_list

# random brightness
random.shuffle(image_list)
for image in image_list:
brightness_image = random_brightness(image)
image_list.append(brightness_image)
if len(image_list) == data_num:
return image_list

# random contrast
random.shuffle(image_list)
for image in image_list:
contrast_image = random_contrast(image)
image_list.append(contrast_image)
if len(image_list) == data_num:
return image_list

# cropping
random.shuffle(image_list)
image_list.clear()
cropped_size = int(IMAGE_SIZE * 0.75)
padding_size = IMAGE_SIZE - cropped_size
for image in image_list:
cropped_image_list = crop(image, 'image', cropped_size, padding_size)
for cropped_image in cropped_image_list:
image_list.append(cropped_image)
if len(image_list) == data_num:
return image_list

return image_list

dir_list = os.listdir(INPUT_DIR)

for dir in dir_list:
image_files = glob.glob(os.path.join(input_dir, dir, "*.jpg"))
if len(image_files) == 0:
continue

image_list = data_augmentation(image_files, 1000)

for i, image in enumerate(image_list):
image = whitening(image)
misc.imsave(os.path.join(OUTPUT_DIR, dir, str(i) + '.jpg'), image)


参考資料

Chainerのimagenetサンプルで学習データの拡張/whitening


4. データをnumpy形式に変換する

import os

import sys
import glob
import random
import numpy as np
from scipy import misc

""" Get files from specified directory """
def load_data_from_dir(input_dir_name, input_dir_list, start_index, test_freq):
train_list = []
test_list = []

for dir_index, dir_name in enumerate(input_dir_list):
image_files = glob.glob(os.path.join(input_dir_name, dir_name, "*.jpg"))
train_count = 0
test_count = 0
print('directory:{} index:{}'.format(dir_name, dir_index + start_index))

for file_index, file_name in enumerate(image_files):
image = misc.imread(file_name)
label = np.int32(dir_index + start_index)
if not file_index % test_freq == 0: # set train datq
train_list.append((dir_name, image, label))
train_count += 1
else:
test_list.append((dir_name, image, label))
test_count += 1

print("directory:{} total:{} train:{} test:{}".format(
dir_name, train_count + test_count, train_count, test_count))

return train_list, test_list

""" Save data in a numpy format """
def save_dataset_numpy(data_list, image_path, label_path):
image_list = []
label_list = []
for _, image, label in data_list:
image_list.append(image)
label_list.append(label)

image_data = np.array(image_list, dtype=np.float32)
label_data = np.array(label_list, dtype=np.int32)

np.save(image_path, image_data)
np.save(label_path, label_data)

for i in xrange(0, len(DIR_LIST), 10):
# 10クラスごとにファイルを作成する
train_list, test_list = load_data_from_dir(INPUT_DIR, dir_list[i:i+args.interval], i, 10)

train_data_path = os.path.join(OUTPUT_DIR, 'train', 'data-{}.npy'.format(i+args.interval))
train_label_path = os.path.join(OUTPUT_DIR, 'train', 'label-{}.npy'.format(i+args.interval))
test_data_path = os.path.join(OUTPUT_DIR, 'test', 'data-{}.npy'.format(i+args.interval))
test_label_path = os.path.join(OUTPUT_DIR, 'test', 'label-{}.npy'.format(i+args.interval))

save_dataset_numpy(train_list, train_data_path, train_label_path)
save_dataset_numpy(test_list, test_data_path, test_label_path)


5. chainerで顔画像を学習する

最初はTensorflowで作ってみたのですが、個人的には余計な機能が多かったのでChainerに切り替えました。

学習は最初にCIFAR-10(一般物体の10クラス分類)でちゃんと学習するものをつくってから、実際のデータを学習するようにしました。

ネットワークはAlexNetにBatch Normalizationを入れたものを少し変えて使用しました。


失敗したこと


  • 最初からmultiprocessingを使って(こんなかんじ)で作成したのですが、デバッグが結構大変だったので、最初は本当にシンプルなプログラムを作ったほうがいいと思います。


  • 画像を一気に読み込むとインプット画像だけで1.7GB程度あるのでメモリが死にます。そのため、ソースは汚くなりますが、バッチ毎にデータをdeleteしてメモリを解放するようにBatchIteratorクラスを作成しました。


// 1枚当たりの大きさ  

96×96×3 = 27648(byte)

// 1クラスあたり
27648×1000 = 27648000(byte) = 26.4(MB)

// 全体 (66クラス) ... 計算あってる?
26.4×66 = 1742.4(MB) = 1.7(GB)


"""
Batch iterator class

Usage:
batch_iter = BatchIter(DATA_DIR, 100)

for batch_data, batch_label in batch_iter:
batch_start_time = time.time()
x = np.asarray(batch_data, dtype=np.float32).transpose((0, 3, 1, 2))
t = np.asarray(train_batch_label, dtype=np.int32)
x = Variable(xp.asarray(x))
t = Variable(xp.asarray(t))

optimizer.update(model, x, t)
"""
class BatchIter(object):
def __init__(self, data_dir, batch_size):
self.index = 0
self.batch_size = batch_size
self.data_files = glob.glob(os.path.join(data_dir, 'data-*.npy'))
self.label_files = glob.glob(os.path.join(data_dir, 'label-*.npy'))
data_size = 0
for data in self.data_files:
loaded_data = np.load(data)
data_size += loaded_data.shape[0]
del loaded_data
self.data_size = data_size

assert len(self.data_files) == len(self.label_files), "Invalid data size."

def __iter__(self):
return self

def next(self):
if self.index >= self.data_size:
raise StopIteration()

data = np.zeros((self.batch_size, IMAGE_SIZE, IMAGE_SIZE, 3))
label = np.zeros((self.batch_size))
incremental_value = int(self.batch_size / len(self.data_files))

count = 0
for i in range(len(self.data_files)):
loaded_data = np.load(self.data_files[i])
loaded_label = np.load(self.label_files[i])
assert loaded_data.shape[0] == loaded_label.shape[0], "Loaded data size is invalid."

perm = np.random.permutation(loaded_data.shape[0])
if i + 1 == len(self.data_files): # last item
incremental_value = self.batch_size - count
idx = perm[0:incremental_value]
else:
idx = perm[0:incremental_value]

data[count:count+incremental_value] = loaded_data[idx]
label[count:count+incremental_value] = loaded_label[idx]

count += incremental_value
del loaded_data
del loaded_label

self.index += self.batch_size
return data, label


参考資料


6. 学習したモデルから任意の画像の予測をする

def set_model(model_name, model_path):

model_fn = os.path.basename('models/' + model_name + '.py')
model = imp.load_source(model_fn.split('.')[0],
'models/' + model_name + '.py').model

print('Load model from ', model_path)
serializers.load_hdf5(model_path, model)

return model

def set_optimizer(opt_name, opt_path, model):
if opt_name == 'MomentumSGD':
optimizer = optimizers.MomentumSGD(momentum=0.9)
elif opt_name == 'Adam':
optimizer = optimizers.Adam()
elif opt_name == 'AdaGrad':
optimizer = optimizers.AdaGrad()
else:
raise ValueError('Invalid architecture name')

optimizer.setup(model)

print('Load optimizer state from ', opt_path)
serializers.load_hdf5(opt_path, optimizer)

return optimizer

def detect_face(image_file):
detector = dlib.get_frontal_face_detector()
#img = cv2.imread(image_file)
image = misc.imread(image_file)
dets = detector(image, 1)
d = dets[0]
cropped_image = image[d.top():d.bottom(), d.left():d.right()]
resized_image = misc.imresize(cropped_image, (96, 96))
return resized_image

# モデル読み込み
model = set_model(model_name, model_path)
optimizer = set_optimizer(opt_name, opt_path, model)
detected_face_image = detect_face(input_image)

# 読み込んだモデルから予測する
x = np.asarray(detected_face_image, dtype=np.float32).transpose((0, 3, 1, 2))
x = Variable(np.asarray(x), volatile='on')
pred = model.predict(x).data

# ラベル読み込み (ラベル用のファイルはnumpy形式のファイルを作成時につくる)
categories = np.loadtxt(label_path, str, delimiter="\n")

# スコアの高い順に並び替える
score = pred.reshape((pred.size,))
result = zip(score, categories)
result = sorted(result, reverse=True)

results = []
for i, (score, label) in enumerate(result[:10]):
if i == 5: break
print('num:{} score:{:.5f} label:{}'.format(i + 1, score * 100, label))
results.append({
'label': label,
'score': str(round(score * 100, 2))
})


運用サーバー

全然ディープラーニングと関係ないですが、Webサイト構築時に最初はHerokuとか試したんですが、最終的にConohaにしました。

dlibとかchainerを使うときはHerokuだとインストールで結構苦労しました。Conohaは少し前は稼働率に問題があったようですが、リニューアルしていいかんじのようです。

さくらVPSとも迷ったのですが、さくらVPSは初期費用がかかるのに対してConohaは初期費用無料なのが決め手でした。  

学習部分のコードが抽象化されたChainer Trainerを使ってみた記事も書いてみたのでよかったら参考にしてください。

Chainer Trainerを使って独自データセットの学習をしてみた

最後に宣伝になりますが、CNNを使ってAV女優の類似画像検索をしたサイトを作っているので、よかったら見てみてください。

Babelink - 類似AV女優検索サービス

※アダルトサイトのため、閲覧には十分注意をしてください。