LoginSignup
7
10

More than 5 years have passed since last update.

Movidius™ Neural Compute Stickを使って、Raspberry Piで顔識別を試みる

Last updated at Posted at 2018-05-11

はじめに

Movidius™ Neural Compute Stickで自作CNNを使った識別を行う手順を整理してみます。
ここでは、以前作成した記事にある顔識別のデータを流用して速度と精度を確認します。

複数のRaspberry PiとMacを使って入室監視システムを作る

また、この記事は以下の記事をかなり参考にさせて頂きました。

Movidius NCS + Raspberrypi で自作DNNモデルを動かす

準備

項目 備考
学習用PC MacBook Air
識別用Raspi Raspberry Pi 3
外部計算機器 Movidius™ Neural Compute Stick
プログラミング言語 Python 3
深層学習用ライブラリ Tensorflow

必要なソフトウェアやMovidius™ Neural Compute Stick SDKのインストール手順については参考サイトをご参照下さい。

顔画像の取得

以下の記事にあるスクリプトを使って顔画像を収集します。

複数のRaspberry PiとMacを使って入室監視システムを作る

学習用ライブラリ作成 on 学習用PC and 識別用Raspi

事前に以下のスクリプトを作成し、以降の手順(学習・識別の両方)で流用できるようにしておきます。

load_face_image.py
import os
import random
import cv2
import numpy as np

def make_labels():
    """Assign an integer label to every sub-directory of ``./faces/train``.

    The mapping is also written to ``label.csv`` in the current working
    directory, one ``name,index`` row per directory.

    Directory names are sorted before numbering: ``os.listdir`` returns
    entries in arbitrary order, so without sorting the same person could get
    a different index on another run or another machine.

    Returns:
        dict: directory name -> 0-based integer label.
    """
    path = "./faces/train"
    dirs = sorted(
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )

    label_dict = {dirname: index for index, dirname in enumerate(dirs)}

    with open("label.csv", "w") as f:
        for dirname, index in label_dict.items():
            f.write("{},{}\n".format(dirname, index))

    return label_dict

def load_labels():
    """Read ``label.csv`` from the current directory into a dict.

    Each non-blank row has the form ``name,index``.

    Returns:
        dict: name -> integer label.

    Raises:
        ValueError: if a non-blank row has no comma or a non-integer index.
    """
    label_dict = {}

    with open("label.csv", "r") as f:
        for row in f:
            row = row.strip()
            if not row:
                # Tolerate blank lines, e.g. one appended by an editor.
                continue
            # Split on the LAST comma so a name that itself contains commas
            # is kept intact; the index is always the final field.
            key, val = row.rsplit(",", 1)
            label_dict[key] = int(val)

    return label_dict

def load_data(data_type):
    """Load the images of one data split together with their one-hot labels.

    Walks the ``faces`` directory, keeps the files found in leaf directories
    (directories without sub-directories) that have ``data_type`` as one of
    their path components, shuffles them, and reads each one with OpenCV.

    Args:
        data_type: Name of the split directory, e.g. ``'train'`` or ``'test'``.

    Returns:
        tuple: ``(images, labels)`` as NumPy arrays. Each image is resized to
        32x32 and scaled to float32 in [0, 1]. Each label is a vector of
        ``len(load_labels())`` entries with a 1 at the index of every label
        name that appears as a directory component of the file's path (all
        zeros if none matches).
    """
    def components(path):
        # Whole path components, so that "test" does not match "contest"
        # and the label "ann" does not match a directory called "anna".
        return os.path.normpath(path).split(os.sep)

    filenames = []
    for root, dirs, files in os.walk('faces'):
        if dirs or data_type not in components(root):
            continue
        filenames += [os.path.join(root, name) for name in files
                      if not name.startswith('.')]

    # Shuffle files
    random.shuffle(filenames)

    label_dict = load_labels()

    images, labels = [], []
    for filename in filenames:
        img = cv2.imread(filename)
        if img is None:
            # cv2.imread signals an unreadable/non-image file by returning
            # None; skip it here so images and labels stay aligned.
            print("skip unreadable image: {}".format(filename))
            continue
        img = cv2.resize(img, (32, 32))
        images.append(img.astype(np.float32) / 255.0)

        label = np.zeros(len(label_dict))
        dir_parts = components(os.path.dirname(filename))
        for name, index in label_dict.items():
            if name in dir_parts:
                label[index] = 1.
        labels.append(label)

    return np.asarray(images), np.asarray(labels)

if __name__ == "__main__":
    # Smoke test: rebuild label.csv, echo it back, then report the size of
    # each split.
    make_labels()
    print(load_labels())

    for split in ("train", "test"):
        imgs, labels = load_data(split)
        print(imgs.shape, len(labels))

学習用スクリプト実行 on 学習用PC

以下のスクリプトを学習用PCで実行。

train.py
from load_face_image import *

import tensorflow as tf
import numpy as np

def weight_variable(shape):
    """Return a ``tf.Variable`` of ``shape`` drawn from a truncated normal (stddev 0.1)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.1))


def bias_variable(shape):
    """Return a ``tf.Variable`` of ``shape`` with every element set to 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape))


def conv2d(x, W):
    """Convolve ``x`` with filter ``W`` using stride 1 and 'SAME' padding."""
    unit_strides = [1, 1, 1, 1]
    convolved = tf.nn.conv2d(x, W, strides=unit_strides, padding='SAME')
    return convolved


def max_pool_2x2(x):
    """Max-pool ``x`` with a 2x2 window, stride 2 and 'SAME' padding."""
    window = [1, 2, 2, 1]
    pooled = tf.nn.max_pool(x, ksize=window, strides=[1, 2, 2, 1], padding='SAME')
    return pooled


def max_pool_3x3(x):
    """Max-pool ``x`` with a 3x3 window, stride 3 and 'SAME' padding."""
    window = [1, 3, 3, 1]
    pooled = tf.nn.max_pool(x, ksize=window, strides=[1, 3, 3, 1], padding='SAME')
    return pooled


def batch_norm_wrapper(inputs, phase_train=None, decay=0.99):
    """Batch-normalize ``inputs`` using moments over every axis but the last.

    Args:
        inputs: Tensor to normalize. Its last dimension sizes the scale,
            offset and population-statistic variables created here.
        phase_train: ``None`` to normalize with the stored population
            statistics only. Otherwise it is used as the ``tf.cond``
            predicate: true selects the ``update`` branch, false selects the
            ``average`` branch.
        decay: Decay passed to ``tf.train.ExponentialMovingAverage``.

    Returns:
        The result of ``tf.nn.batch_normalization`` (or of the ``tf.cond``
        that wraps it).

    NOTE(review): train() in this script does not call this helper.
    """
    epsilon = 1e-5
    out_dim = inputs.get_shape()[-1]
    scale = tf.Variable(tf.ones([out_dim]))
    beta = tf.Variable(tf.zeros([out_dim]))
    pop_mean = tf.Variable(tf.zeros([out_dim]), trainable=False)
    pop_var = tf.Variable(tf.ones([out_dim]), trainable=False)
    # Identity test (PEP 8): ``== None`` would dispatch to the argument's own
    # ``__eq__`` when a tensor is passed in.
    if phase_train is None:
        return tf.nn.batch_normalization(inputs, pop_mean, pop_var, beta, scale, epsilon)
    rank = len(inputs.get_shape())
    axes = list(range(rank - 1))
    batch_mean, batch_var = tf.nn.moments(inputs, axes)
    ema = tf.train.ExponentialMovingAverage(decay=decay)

    def update():
        # Normalize with the current batch moments, after the moving
        # averages have been updated with them.
        ema_apply_op = ema.apply([batch_mean, batch_var])
        with tf.control_dependencies([ema_apply_op]):
            return tf.nn.batch_normalization(inputs, tf.identity(batch_mean), tf.identity(batch_var), beta, scale,
                                             epsilon)

    def average():
        # Copy the moving averages into the population variables and
        # normalize with those.
        train_mean = pop_mean.assign(ema.average(batch_mean))
        train_var = pop_var.assign(ema.average(batch_var))
        with tf.control_dependencies([train_mean, train_var]):
            return tf.nn.batch_normalization(inputs, train_mean, train_var, beta, scale, epsilon)

    return tf.cond(phase_train, update, average)

def train(fine_tune, all_x, all_y):
    """Train the two-conv-layer CNN and save a checkpoint under ``./model``.

    The data is split at random into 90 % training and 10 % hold-out
    examples. Training runs for 100 epochs with randomly drawn batches of
    32; after each epoch the hold-out accuracy is printed. The final
    checkpoint is written as ``./model/cnn-model-<last epoch>``.

    Args:
        fine_tune: If true, restore ``./model/cnn-model-99`` before training.
        all_x: NumPy array of images, fed to a ``[None, 32, 32, 3]``
            placeholder.
        all_y: NumPy array of one-hot label rows; the row length defines the
            number of output classes.
    """
    # Local import: only needed to create the checkpoint directory.
    import os

    # Take the class count from the labels themselves instead of a module
    # global, so the function also works when imported from another script.
    num_classes = len(all_y[0])

    with tf.Graph().as_default():

        phase_train = tf.placeholder(tf.bool)

        x = tf.placeholder('float', [None, 32, 32, 3], name="input")
        y = tf.placeholder('float', [None, num_classes])

        keep_prob = tf.placeholder(tf.float32)

        # NOTE: variables must be created in this order; the prediction
        # script rebuilds the same layers and restores them from the
        # checkpoint saved below.
        x_image = tf.reshape(x, [-1, 32, 32, 3])
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

        # Pooling layer
        h_pool1 = max_pool_2x2(h_conv1)

        # Convolution layer
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

        # Pooling layer
        h_pool2 = max_pool_2x2(h_conv2)

        # Full connected layer
        W_fc1 = weight_variable([8 * 8 * 64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

        # Dropout
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

        # Full connected layer
        W_fc2 = weight_variable([1024, num_classes])
        b_fc2 = bias_variable([num_classes])

        y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")

        # Clip before the log: a softmax output of exactly 0 would otherwise
        # give log(0) and turn the loss into NaN.
        cross_entropy = -tf.reduce_sum(y * tf.log(tf.clip_by_value(y_conv, 1e-10, 1.0)))
        train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
        correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        with tf.Session() as sess:

            sess.run(tf.global_variables_initializer())
            saver = tf.train.Saver()

            if fine_tune:
                last_model = "./model/cnn-model-99"
                print("load " + last_model)
                saver.restore(sess, last_model)

            all_imgs = np.shape(all_x)[0]

            train_indices = np.random.choice(all_imgs, round(all_imgs * 0.9), replace=False)
            # dtype=int keeps the array usable as an index even when the
            # hold-out set is empty (an empty list would become float64).
            test_indices = np.array(list(set(range(all_imgs)) - set(train_indices)), dtype=int)

            x_train = all_x[train_indices]
            x_test = all_x[test_indices]
            y_train = all_y[train_indices]
            y_test = all_y[test_indices]

            num_epoch = 100
            num_data = x_train.shape[0]
            print(num_data, x_train.shape)

            batch_size = 32
            for epoch in range(num_epoch):
                for idx in range(0, num_data, batch_size):
                    # Each step draws its batch at random (with replacement);
                    # idx only counts the steps of the epoch.
                    rand_index = np.random.choice(num_data, size=batch_size)
                    batch_x = x_train[rand_index]
                    batch_y = y_train[rand_index]
                    _, loss = sess.run(
                        [train_step, cross_entropy],
                        feed_dict={phase_train: True, x: batch_x, y: batch_y, keep_prob: 0.5})
                    print("epoch %d, images %d, loss %g" % (epoch, idx, loss / batch_size))
                if len(x_test):
                    # Skip evaluation on an empty hold-out set; the mean over
                    # zero predictions would only be NaN.
                    test_accuracy = sess.run(
                        accuracy,
                        feed_dict={phase_train: False, x: x_test, y: y_test, keep_prob: 1.0})
                    print("epoch %d, test accuracy %g" % (epoch, test_accuracy))

            # Make sure the checkpoint directory exists before saving into it.
            os.makedirs("./model", exist_ok=True)
            saver.save(sess, "./model/" + 'cnn-model', global_step=epoch)

if __name__ == "__main__":
    # Regenerate label.csv first; load_data() reads it to build the one-hot
    # labels.
    label_dict = make_labels()

    train_images, train_labels = load_data('train')
    print("train_images", len(train_images))
    print(train_images.shape, train_labels.shape)

    # Train from scratch (no fine-tuning) on a half-precision copy of the
    # images.
    images_fp16 = train_images.astype(np.float16)
    train(False, images_fp16, train_labels)

学習データの変換 on 学習用PC

以下のスクリプトにより、学習データをMovidiusライブラリで使用できるように変換。

create_pred_model.py
import tensorflow as tf
from train import weight_variable, bias_variable
from train import conv2d, max_pool_2x2, max_pool_3x3
from train import batch_norm_wrapper

def predict(input):
    """Build the inference graph: the same layers as train.py, without dropout.

    Variables are created in the same order as in the training graph so that
    the training checkpoint can be restored into them.

    Args:
        input: Image tensor; ``run`` passes a ``[1, 32, 32, 3]`` placeholder
            named ``"input"``.

    Returns:
        The softmax tensor, named ``"output"``.
    """
    # This module imports only the layer helpers from train, so the label
    # helper has to be imported explicitly. load_labels() just reads
    # label.csv, leaving the file written at training time untouched.
    from load_face_image import load_labels

    x_image = input
    label_dict = load_labels()

    W_conv1 = weight_variable([5, 5, 3, 32])
    b_conv1 = bias_variable([32])
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    # Pooling layer
    h_pool1 = max_pool_2x2(h_conv1)

    # Convolution layer
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    # Pooling layer
    h_pool2 = max_pool_2x2(h_conv2)

    # Full connected layer
    W_fc1 = weight_variable([8 * 8 * 64, 1024])
    b_fc1 = bias_variable([1024])
    h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

    # Dropout (keep everything at inference time)
    keep_prob = 1.0
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # Full connected layer
    W_fc2 = weight_variable([1024, len(label_dict)])
    b_fc2 = bias_variable([len(label_dict)])

    output = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")

    return output

def run(name, last_model="./model/cnn-model-99"):
    """Re-save a training checkpoint as an inference-only model.

    Builds the prediction graph, restores its variables from ``last_model``
    and saves the result under ``name``.

    Args:
        name: Path prefix of the checkpoint to write.
        last_model: Path prefix of the training checkpoint to restore.
            Defaults to the checkpoint written after the 100th epoch.
    """
    with tf.Graph().as_default():
        # Batch size is fixed to 1 for the prediction model.
        input_tensor = tf.placeholder('float', [1, 32, 32, 3], name="input")
        predict(input_tensor)

        with tf.Session() as sess:
            saver = tf.train.Saver(tf.global_variables())
            saver.restore(sess, last_model)
            saver.save(sess, name)

if __name__ == "__main__":
    # Guarded so that importing this module does not trigger the conversion.
    run('./model/pred_model')

作成したら、以下のコマンドで実行。

python create_pred_model.py

モデルのコンパイル on 識別用Raspi

学習結果の保存されているmodelフォルダをRaspberry Piにコピーする。

コピーした後、Raspberry Piで以下のコマンドを実行します。コンパイルに成功すると、modelフォルダ内にgraphというファイルが生成されます(後述のprediction.pyが./model/graphとして読み込みます)。

cd model
mvNCCompile -s 12 pred_model.meta -in=input -on=output

識別テスト on 識別用Raspi

収集した顔画像のうちtestフォルダに入っているものをRaspberry Piのfacesフォルダにコピー。
また、事前に作成した「load_face_image.py」というスクリプトと、学習時に生成された「label.csv」も、prediction.pyと同じフォルダにコピーしておきます(load_dataが内部でlabel.csvを読み込むためです)。

コピーしたら以下のスクリプトを作成。

prediction.py
from load_face_image import *

import mvnc.mvncapi as mvnc
import numpy as np

def predict(input, labels):
    """Run every image through the compiled graph on the first NCS device.

    For each image the predicted class index and the expected label are
    printed side by side.

    Args:
        input: Sequence of image tensors, loaded onto the device one by one.
        labels: Expected class index for each image, in the same order.

    Returns:
        The raw output of the last image, or ``None`` if ``input`` is empty.

    Raises:
        RuntimeError: if no Neural Compute Stick is found.
    """
    devices = mvnc.EnumerateDevices()
    if not devices:
        raise RuntimeError("No Movidius Neural Compute Stick found")

    device = mvnc.Device(devices[0])
    device.OpenDevice()
    try:
        with open('./model/graph', 'rb') as f:
            blob = f.read()
        graph = device.AllocateGraph(blob)
        try:
            output = None
            for i, tensor in enumerate(input):
                graph.LoadTensor(tensor, 'user object')
                output, _ = graph.GetResult()
                print(np.argmax(output), labels[i])
        finally:
            # Free the graph even if inference fails part-way through.
            graph.DeallocateGraph()
    finally:
        # Always release the stick so the next run can open it.
        device.CloseDevice()

    return output

if __name__ == "__main__":
    test_images, test_labels = load_data('test')
    print("test_images", len(test_images))

    # Collapse each one-hot row to its class index for the side-by-side
    # printout.
    labels = [np.argmax(one_hot) for one_hot in test_labels]

    predict(test_images.astype(np.float16), labels)

以下のコマンドをRaspberry Piで実行。

python3 prediction.py

できた!

7
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
10