Movidius™ Neural Compute Stickを使って、Raspberry Piで顔識別を試みる

Last updated at Posted at 2018-05-11


Movidius™ Neural Compute Stickで自作CNNを使った識別を行う手順を整理してみます。

複数のRaspberry PiとMacを使って入室監視システムを作る


Movidius NCS + Raspberrypi で自作DNNモデルを動かす


項目 備考
学習用PC MacBook Air
識別用Raspi Raspberry Pi 3
外部計算機器 Movidius™ Neural Compute Stick
プログラミング言語 Python 3
深層学習用ライブラリ Tensorflow

必要なソフトウェアやMovidius™ Neural Compute Stick SDKのインストール手順については参考サイトをご参照下さい。



複数のRaspberry PiとMacを使って入室監視システムを作る

学習用ライブラリ作成 on 学習用PC and 識別用Raspi


import os
import random
import cv2
import numpy as np

def make_labels():

    path = "./faces/train"
    dirs = os.listdir(path)
    dirs = [f for f in dirs if os.path.isdir(os.path.join(path, f))]

    label_dict = {}
    i = 0

    for dirname in dirs:
        label_dict[dirname] = i
        i += 1

    with open("label.csv","w") as f:
        for row in label_dict:
            f.write("{},{}\n".format(row, label_dict[row]))

    return label_dict

def load_labels():

    label_dict = {}

    with open("label.csv","r") as f:
        for row in f:
            key, val = row.strip().split(",")
            label_dict[key] = int(val)

    return label_dict

def load_data(data_type):

    filenames, images, labels = [], [], []

    walk = filter(lambda _: not len(_[1]) and data_type in _[0], os.walk('faces'))

    for root, dirs, files in walk:
        filenames += ['{}/{}'.format(root, _) for _ in files if not _.startswith('.')]

    # Shuffle files

    # Read, resize, and reshape images
    images = []
    for file in filenames:
        img = cv2.imread(file)
        img = cv2.resize(img, (32,32))
        images.append(img.astype(np.float32) / 255.0)
    images = np.asarray(images)

    label_dict = load_labels()

    for filename in filenames:
        label = np.zeros(len(label_dict))
        for k, v in label_dict.items():
           if k in filename:
                label[v] = 1.
    labels = np.asarray(labels)

    return images, labels

if __name__=="__main__":


    imgs, labels = load_data("train")
    print(imgs.shape, len(labels))

    imgs, labels = load_data("test")
    print(imgs.shape, len(labels))

学習用スクリプト実行 on 学習用PC


from load_face_image import *

import tensorflow as tf
import numpy as np

def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)

def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

def max_pool_3x3(x):
    return tf.nn.max_pool(x, ksize=[1, 3, 3, 1], strides=[1, 3, 3, 1], padding='SAME')

def batch_norm_wrapper(inputs, phase_train=None, decay=0.99):
    epsilon = 1e-5
    out_dim = inputs.get_shape()[-1]
    scale = tf.Variable(tf.ones([out_dim]))
    beta = tf.Variable(tf.zeros([out_dim]))
    pop_mean = tf.Variable(tf.zeros([out_dim]), trainable=False)
    pop_var = tf.Variable(tf.ones([out_dim]), trainable=False)
    if phase_train == None:
        return tf.nn.batch_normalization(inputs, pop_mean, pop_var, beta, scale, epsilon)
    rank = len(inputs.get_shape())
    axes = list(range(rank - 1))
    batch_mean, batch_var = tf.nn.moments(inputs, axes)
    ema = tf.train.ExponentialMovingAverage(decay=decay)

    def update():
        ema_apply_op = ema.apply([batch_mean, batch_var])
        with tf.control_dependencies([ema_apply_op]):
            return tf.nn.batch_normalization(inputs, tf.identity(batch_mean), tf.identity(batch_var), beta, scale,

    def average():
        train_mean = pop_mean.assign(ema.average(batch_mean))
        train_var = pop_var.assign(ema.average(batch_var))
        with tf.control_dependencies([train_mean, train_var]):
            return tf.nn.batch_normalization(inputs, train_mean, train_var, beta, scale, epsilon)

    return tf.cond(phase_train, update, average)

def train(fine_tune, all_x, all_y):

    with tf.Graph().as_default():

        phase_train = tf.placeholder(tf.bool)

        x = tf.placeholder('float', [None, 32, 32, 3], name="input")
        y = tf.placeholder('float', [None, len(all_y[0])])

        keep_prob = tf.placeholder(tf.float32)

        x_image = tf.reshape(x, [-1, 32, 32, 3])
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

        # Pooling layer
        h_pool1 = max_pool_2x2(h_conv1)

        # Convolution layer
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

        # Pooling layer
        h_pool2 = max_pool_2x2(h_conv2)

        # Full connected layer
        W_fc1 = weight_variable([8 * 8 * 64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

        # Dropout
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

        # Full connected layer
        W_fc2 = weight_variable([1024, len(label_dict)])
        b_fc2 = bias_variable([len(label_dict)])

        y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")

        cross_entropy = -tf.reduce_sum(y * tf.log(y_conv))
        train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
        correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        with tf.Session() as sess:

            saver = tf.train.Saver()

            if fine_tune:
                last_model = "./model/cnn-model-99"
                print("load " + last_model)
                saver.restore(sess, last_model)

            all_imgs = np.shape(all_x)[0]

            train_indices = np.random.choice(all_imgs, round(all_imgs * 0.9), replace=False)
            test_indices = np.array(list(set(range(all_imgs)) - set(train_indices)))

            x_train = all_x[train_indices]
            x_test = all_x[test_indices]
            y_train = all_y[train_indices]
            y_test = all_y[test_indices]

            num_epoch = 100
            num_data = x_train.shape[0]
            print(num_data, x_train.shape)

            batch_size = 32
            for epoch in range(num_epoch):
                for idx in range(0, num_data, batch_size):
                    rand_index = np.random.choice(num_data, size=batch_size)
                    batch_x = x_train[rand_index]
                    batch_y = y_train[rand_index]
                    _, loss = sess.run(
                        [train_step, cross_entropy],
                        feed_dict={phase_train: True, x: batch_x, y: batch_y, keep_prob: 0.5})
                    print("epoch %d, images %d, loss %g" % (epoch, idx, loss / batch_size))
                test_accuracy, test_loss = sess.run(
                    [accuracy, cross_entropy],
                    feed_dict={phase_train: False, x: x_test, y: y_test, keep_prob: 1.0})
                print("epoch %d, test accuracy %g" % (epoch, test_accuracy))

            saver.save(sess, "./model/" + 'cnn-model', global_step=epoch)

if __name__=="__main__":

    label_dict = make_labels()

    train_images, train_labels = load_data('train')
    print("train_images", len(train_images))
    print(train_images.shape, train_labels.shape)

    train(False, train_images.astype(np.float16), train_labels)

学習データの変換 on 学習用PC


import tensorflow as tf
from train import weight_variable, bias_variable
from train import conv2d, max_pool_2x2, max_pool_3x3
from train import batch_norm_wrapper

def predict(input):

    x_image = input
    label_dict = make_labels()

    W_conv1 = weight_variable([5, 5, 3, 32])
    b_conv1 = bias_variable([32])
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    # Pooling layer
    h_pool1 = max_pool_2x2(h_conv1)

    # Convolution layer
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    # Pooling layer
    h_pool2 = max_pool_2x2(h_conv2)

    # Full connected layer
    W_fc1 = weight_variable([8 * 8 * 64, 1024])
    b_fc1 = bias_variable([1024])
    h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

    # Dropout
    keep_prob = 1.0
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # Full connected layer
    W_fc2 = weight_variable([1024, len(label_dict)])
    b_fc2 = bias_variable([len(label_dict)])

    output = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")

    return output

def run(name):
    with tf.Graph().as_default():
        input = tf.placeholder('float', [1, 32, 32, 3], name="input")
        output = predict(input)

        with tf.Session() as sess:
            saver = tf.train.Saver(tf.global_variables())
            last_model = "./model/cnn-model-99"
            saver.restore(sess, last_model)
            saver.save(sess, name)



python create_pred_model.py

モデルのコンパイル on 識別用Raspi

学習結果の保存されているmodelフォルダをRaspberry Piにコピーする。

コピーした後、Raspberry Piで以下のコマンドを実行。

cd model
mvNCCompile -s 12 pred_model.meta -in=input -on=output

識別テスト on 識別用Raspi

収集した顔画像のうちtestフォルダに入っているものをRaspberry Piのfacesフォルダにコピー。


from load_face_image import *

import mvnc.mvncapi as mvnc
import numpy as np

def predict(input, labels):
    devices = mvnc.EnumerateDevices()
    device = mvnc.Device(devices[0])

    with open('./model/graph', 'rb') as f:
        blob = f.read()
    graph = device.AllocateGraph(blob)

    for i in range(len(input)):
        graph.LoadTensor(input[i], 'user object')
        output, userobj = graph.GetResult()
        print(np.argmax(output), labels[i])


    return output

if __name__ == "__main__":

    test_images, test_labels = load_data('test')
    print("test_images", len(test_images))

    labels = []
    for label in test_labels:

    input = test_images
    predict(input.astype(np.float16), labels)

以下のコマンドをRaspberry Piで実行。

python3 prediction.py



