chainerによるディープラーニングでAV女優の類似画像検索サービスをつくったノウハウを公開する

  • 248
    いいね
  • 0
    コメント

環境

PC: MacBook Air
CPU: 1.4 GHz Intel Core i5
メモリ: 4GB

ふつうのMacBook Airでやったのですが、学習遅いし、メモリ不足で落ちるしでやっぱり辛かったです。

おおまかな流れ

  1. 各女優の画像を収集する。
  2. dlibで顔画像を切り取って96×96の大きさにリサイズする。
  3. 1人につき1000枚の画像になるようデータ拡張する。
  4. データをnumpyファイルに変換する。
  5. chainerで顔画像を学習する。
  6. 学習したモデルから任意の画像の予測をする。

1. 画像を収集する

ここは詳しくは書けないので以下の記事を参考にしてください。

Python: BeautifulSoup4 を使って Web サイトをスクレイピングする
PythonとBeautiful Soupでスクレイピング

取得した画像は女優ごとにディレクトリを分けて保存します。

./folder
    |--- /actress1
    |        |--- image1.jpg
    |        |--- image2.jpg
    |        |--- image3.jpg
    |
    |--- /actress2
    |        .
    |        .
    |--- /actress3
    .
    .
    .

2. dlibで顔画像を切り抜く

画像認識といえばOpenCVが有名だと思いますがdlib vs OpenCV face detectionを見ると、顔抽出に関してはdlibというライブラリのほうが誤検出が少なくよさそうなので、dlibを使いました。

import os
import sys
import glob
import cv2
from PIL import Image
import dlib

"""
INPUT_DIRは(1.画像を収集する)で取得した画像のディレクトリ名
OUTPUT_DIRは出力ディレクトリ名(フォルダ構成はINPUT_DIRと同じになる)
"""

detector = dlib.get_frontal_face_detector()

# 各女優のディレクトリのリストを取得
dir_list = os.listdir(INPUT_DIR)

for i, dir_name in enumerate(dir_list):
    if not os.path.exists(os.path.join(OUTPUT_DIR, dir_name)):
        os.mkdir(os.path.join(OUTPUT_DIR, dir_name))
    image_files = glob.glob(os.path.join(INPUT_DIR, dir_name, "*.jpg"))

    for j, image_file in enumerate(image_files):
        img = cv2.imread(image_file)
        dets = detector(img, 1)
        open_img = Image.open(image_file)

        for k, d in enumerate(dets):
            # サイズが80以下の画像はスキップする
            if d.right()-d.left() < 80 or d.bottom()-d.top() < 80:
                continue

            image_file = image_file.replace(INPUT_DIR, OUTPUT_DIR)
            # 1つの画像に復数の顔があると出力のファイル名がかぶるので変更
            output_file = image_file.replace('.jpg', '_'+str(k)+'.jpg')

            cropped_img = open_img.crop((d.left(), d.top(), d.right(), d.bottom()))
            cropped_img.resize((96,96)).save(output_file, 'JPEG', quality=100, optimize=True)

dlibは顔抽出だけでなく、顔の目、鼻、輪郭などの器官を検出する機能もあります。

参考資料

dlib.net face_detect.py

3. データ拡張 (Data augmentation)

import os
import math
import random
import glob
import numpy as np
from scipy import misc
from PIL import Image
import cv2

# 左右反転
def flip_left_right(image):
    return image[:, -1::-1]

# 輝度の変更
def random_brightness(image, max_delta=63, seed=None):
    img = np.array(image)
    delta = np.random.uniform(-max_delta, max_delta)
    image = Image.fromarray(np.uint8(img + delta))
    return image

# コントラスト変更
def random_contrast(image, lower, upper, seed=None):
    factor = np.random.uniform(-lower, upper)
    mean = (image[0] + image[1] + image[2]).astype(np.float32) / 3
    img = np.zeros(image.shape, np.float32)
    for i in range(0, 3):
        img[i] = (img[i] - mean) * factor + mean
    return img

# 画像切り抜き
def crop(image, name, crop_size, padding_size):
    (width, height) = image.shape
    cropped_images = []
    for i in xrange(0, width, padding_size):
        for j in xrange(0, height, padding_size):
            box = (i, j, i+crop_size, j+crop_size) #left, upper, right, lower
            cropped_name = name + '_' + str(i) + '_' + str(j) + '.jpg'
            cropped_image = image[i:i+crop_size, j:j+crop_size]
            resized_image = cv2.resize(cropped_image, (IMAGE_SIZE, IMAGE_SIZE))
            cropped_images.append(resized_image)

    return cropped_images

# データ拡張
# data_numに指定した値になるまで「左右反転」「輝度の変更」「コントラストの変更」「切り抜き」する
def data_augmentation(image_files, data_num):
    image_list = []
    file_num = len(image_files)

    for image_file in image_files:
        image_list.append(misc.imread(image_file))

    if file_num >= data_num:
        return image_list

    # flip left right
    random.shuffle(image_list)
    for image in image_list:
        flipped_image = flip_left_right(image)
        image_list.append(flipped_image)
        if len(image_list) == data_num:
            return image_list

    # random brightness
    random.shuffle(image_list)
    for image in image_list:
        brightness_image = random_brightness(image)
        image_list.append(brightness_image)
        if len(image_list) == data_num:
            return image_list

    # random contrast
    random.shuffle(image_list)
    for image in image_list:
        contrast_image = random_contrast(image)
        image_list.append(contrast_image)
        if len(image_list) == data_num:
            return image_list

    # cropping
    random.shuffle(image_list)
    image_list.clear()
    cropped_size = int(IMAGE_SIZE * 0.75)
    padding_size = IMAGE_SIZE - cropped_size
    for image in image_list:
        cropped_image_list = crop(image, 'image', cropped_size, padding_size)
        for cropped_image in cropped_image_list:
            image_list.append(cropped_image)
            if len(image_list) == data_num:
                return image_list

    return image_list


dir_list = os.listdir(INPUT_DIR)

for dir in dir_list:
    image_files = glob.glob(os.path.join(input_dir, dir, "*.jpg"))
    if len(image_files) == 0:
        continue


    image_list = data_augmentation(image_files, 1000)

    for i, image in enumerate(image_list):
        image = whitening(image)
        misc.imsave(os.path.join(OUTPUT_DIR, dir, str(i) + '.jpg'), image)

参考資料

Chainerのimagenetサンプルで学習データの拡張/whitening

4. データをnumpy形式に変換する

import os
import sys
import glob
import random
import numpy as np
from scipy import misc

""" Get files from specified directory """
def load_data_from_dir(input_dir_name, input_dir_list, start_index, test_freq):
    train_list = []
    test_list = []

    for dir_index, dir_name in enumerate(input_dir_list):
        image_files = glob.glob(os.path.join(input_dir_name, dir_name, "*.jpg"))
        train_count = 0
        test_count = 0
        print('directory:{} index:{}'.format(dir_name, dir_index + start_index))

        for file_index, file_name in enumerate(image_files):
            image = misc.imread(file_name)
            label = np.int32(dir_index + start_index)
            if not file_index % test_freq == 0: # set train datq
                train_list.append((dir_name, image, label))
                train_count += 1
            else:
                test_list.append((dir_name, image, label))
                test_count += 1

        print("directory:{} total:{} train:{} test:{}".format(
               dir_name, train_count + test_count, train_count, test_count))

    return train_list, test_list

""" Save data in a numpy format """
def save_dataset_numpy(data_list, image_path, label_path):
    image_list = []
    label_list = []
    for _, image, label in data_list:
        image_list.append(image)
        label_list.append(label)

    image_data = np.array(image_list, dtype=np.float32)
    label_data = np.array(label_list, dtype=np.int32)

    np.save(image_path, image_data)
    np.save(label_path, label_data)

for i in xrange(0, len(DIR_LIST), 10):
    # 10クラスごとにファイルを作成する
    train_list, test_list = load_data_from_dir(INPUT_DIR, dir_list[i:i+args.interval], i, 10)

    train_data_path = os.path.join(OUTPUT_DIR, 'train', 'data-{}.npy'.format(i+args.interval))
    train_label_path = os.path.join(OUTPUT_DIR, 'train', 'label-{}.npy'.format(i+args.interval))
    test_data_path = os.path.join(OUTPUT_DIR, 'test', 'data-{}.npy'.format(i+args.interval))
    test_label_path = os.path.join(OUTPUT_DIR, 'test', 'label-{}.npy'.format(i+args.interval))

    save_dataset_numpy(train_list, train_data_path, train_label_path)
    save_dataset_numpy(test_list, test_data_path, test_label_path)

5. chainerで顔画像を学習する

最初はTensorflowで作ってみたのですが、個人的には余計な機能が多かったのでChainerに切り替えました。

学習は最初にCIFAR-10(一般物体の10クラス分類)でちゃんと学習するものをつくってから、実際のデータを学習するようにしました。

ネットワークはAlexNetにBatch Normalizationを入れたものを少し変えて使用しました。

失敗したこと

  • 最初からmultiprocessingを使って(こんなかんじ)で作成したのですが、デバッグが結構大変だったので、最初は本当にシンプルなプログラムを作ったほうがいいと思います。

  • 画像を一気に読み込むとインプット画像だけで1.7GB程度あるのでメモリが死にます。そのため、ソースは汚くなりますが、バッチ毎にデータをdeleteしてメモリを解放するようにBatchIteratorクラスを作成しました。

// 1枚当たりの大きさ  
96×96×3 = 27648(byte)

// 1クラスあたり  
27648×1000 = 27648000(byte) = 26.4(MB)

// 全体 (66クラス) ... 計算あってる? 
26.4×66 = 1742.4(MB) = 1.7(GB)

"""
Batch iterator class

Usage:
batch_iter = BatchIter(DATA_DIR, 100)

for batch_data, batch_label in batch_iter:
    batch_start_time = time.time()
    x = np.asarray(batch_data, dtype=np.float32).transpose((0, 3, 1, 2))
    t = np.asarray(train_batch_label, dtype=np.int32)
    x = Variable(xp.asarray(x))
    t = Variable(xp.asarray(t))

    optimizer.update(model, x, t)
"""
class BatchIter(object):
    def __init__(self, data_dir, batch_size):
        self.index = 0
        self.batch_size = batch_size
        self.data_files = glob.glob(os.path.join(data_dir, 'data-*.npy'))
        self.label_files = glob.glob(os.path.join(data_dir, 'label-*.npy'))
        data_size = 0
        for data in self.data_files:
            loaded_data = np.load(data)
            data_size += loaded_data.shape[0]
            del loaded_data
        self.data_size = data_size

        assert len(self.data_files) == len(self.label_files), "Invalid data size."

    def __iter__(self):
        return self

    def next(self):
        if self.index >= self.data_size:
            raise StopIteration()

        data = np.zeros((self.batch_size, IMAGE_SIZE, IMAGE_SIZE, 3))
        label = np.zeros((self.batch_size))
        incremental_value = int(self.batch_size / len(self.data_files))

        count = 0
        for i in range(len(self.data_files)):
            loaded_data = np.load(self.data_files[i])
            loaded_label = np.load(self.label_files[i])
            assert loaded_data.shape[0] == loaded_label.shape[0], "Loaded data size is invalid."

            perm = np.random.permutation(loaded_data.shape[0])
            if i + 1 == len(self.data_files): # last item
                incremental_value = self.batch_size - count
                idx = perm[0:incremental_value]
            else:
                idx = perm[0:incremental_value]

            data[count:count+incremental_value] = loaded_data[idx]
            label[count:count+incremental_value] = loaded_label[idx]

            count += incremental_value
            del loaded_data
            del loaded_label

        self.index += self.batch_size
        return data, label

参考資料

6. 学習したモデルから任意の画像の予測をする

def set_model(model_name, model_path):
    model_fn = os.path.basename('models/' + model_name + '.py')
    model = imp.load_source(model_fn.split('.')[0],
                            'models/' + model_name + '.py').model

    print('Load model from ', model_path)
    serializers.load_hdf5(model_path, model)

    return model

def set_optimizer(opt_name, opt_path, model):
    if opt_name == 'MomentumSGD':
        optimizer = optimizers.MomentumSGD(momentum=0.9)
    elif opt_name == 'Adam':
        optimizer = optimizers.Adam()
    elif opt_name == 'AdaGrad':
        optimizer = optimizers.AdaGrad()
    else:
        raise ValueError('Invalid architecture name')

    optimizer.setup(model)

    print('Load optimizer state from ', opt_path)
    serializers.load_hdf5(opt_path, optimizer)

    return optimizer

def detect_face(image_file):
    detector = dlib.get_frontal_face_detector()
    #img = cv2.imread(image_file)
    image = misc.imread(image_file)
    dets = detector(image, 1)
    d = dets[0]
    cropped_image = image[d.top():d.bottom(), d.left():d.right()]
    resized_image = misc.imresize(cropped_image, (96, 96))
    return resized_image


# モデル読み込み
model = set_model(model_name, model_path)
optimizer = set_optimizer(opt_name, opt_path, model)
detected_face_image = detect_face(input_image)

# 読み込んだモデルから予測する
x = np.asarray(detected_face_image, dtype=np.float32).transpose((0, 3, 1, 2))
x = Variable(np.asarray(x), volatile='on') 
pred = model.predict(x).data

# ラベル読み込み (ラベル用のファイルはnumpy形式のファイルを作成時につくる)
categories = np.loadtxt(label_path, str, delimiter="\n")

# スコアの高い順に並び替える
score = pred.reshape((pred.size,))
result = zip(score, categories)
result = sorted(result, reverse=True)

results = []
for i, (score, label) in enumerate(result[:10]):
    if i == 5: break
    print('num:{} score:{:.5f} label:{}'.format(i + 1, score * 100, label))
    results.append({
        'label': label,
        'score': str(round(score * 100, 2))
    })

運用サーバー

全然ディープラーニングと関係ないですが、Webサイト構築時に最初はHerokuとか試したんですが、最終的にConohaにしました。
dlibとかchainerを使うときはHerokuだとインストールで結構苦労しました。Conohaは少し前は稼働率に問題があったようですが、リニューアルしていいかんじのようです。
さくらVPSとも迷ったのですが、さくらVPSは初期費用がかかるのに対してConohaは初期費用無料なのが決め手でした。  

学習部分のコードが抽象化されたChainer Trainerを使ってみた記事も書いてみたのでよかったら参考にしてください。
Chainer Trainerを使って独自データセットの学習をしてみた

最後に宣伝になりますが、CNNを使ってAV女優の類似画像検索をしたサイトを作っているので、よかったら見てみてください。
Babelink - 類似AV女優検索サービス
※アダルトサイトのため、閲覧には十分注意をしてください。