はじめに
Movidius™ Neural Compute Stickで自作CNNを使った識別を行う手順を整理してみます。
ここでは、以前作成した記事にある顔識別のデータを流用して速度と精度を確認します。
複数のRaspberry PiとMacを使って入室監視システムを作る
また、この記事は以下の記事をかなり参考にさせて頂きました。
Movidius NCS + Raspberrypi で自作DNNモデルを動かす
準備
項目 | 備考 |
---|---|
学習用PC | MacBook Air |
識別用Raspi | Raspberry Pi 3 |
外部計算機器 | Movidius™ Neural Compute Stick |
プログラミング言語 | Python 3 |
深層学習用ライブラリ | TensorFlow |
必要なソフトウェアやMovidius™ Neural Compute Stick SDKのインストール手順については参考サイトをご参照下さい。
顔画像の取得
以下の記事にあるスクリプトを使って顔画像を収集します。
複数のRaspberry PiとMacを使って入室監視システムを作る
学習用ライブラリ作成 on 学習用PC and 識別用Raspi
事前に以下のスクリプトを「load_face_image.py」という名前で作成し、学習時と識別時の両方で流用できるようにしておきます。
import os
import random
import cv2
import numpy as np
def make_labels():
    """Assign an integer label to every sub-directory of ./faces/train.

    The mapping is also written to "label.csv" in the current working
    directory, one "<directory name>,<index>" row per label.

    Returns:
        dict mapping directory name -> label index (0, 1, 2, ...).
    """
    path = "./faces/train"
    # os.listdir returns entries in arbitrary order; sort so the same set of
    # directories always yields the same name -> index assignment.
    dirs = sorted(
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )
    label_dict = {}
    with open("label.csv", "w") as f:
        for index, name in enumerate(dirs):
            label_dict[name] = index
            f.write("{},{}\n".format(name, index))
    return label_dict
def load_labels():
    """Read the label mapping back from "label.csv".

    Each non-empty row is "<name>,<index>".

    Returns:
        dict mapping name -> integer label index.
    """
    label_dict = {}
    with open("label.csv", "r") as f:
        for row in f:
            row = row.strip()
            if not row:
                # Tolerate blank lines (e.g. a trailing newline added by an editor).
                continue
            # Split on the LAST comma only, so a name that itself contains a
            # comma still parses into exactly (name, index).
            key, val = row.rsplit(",", 1)
            label_dict[key] = int(val)
    return label_dict
def _one_hot_for_path(filename, label_dict):
    """Build the label vector for one image path.

    A label whose name equals one of the path's directory components wins.
    Only when no directory matches exactly does this fall back to plain
    substring matching on the whole path.
    """
    label = np.zeros(len(label_dict))
    directories = set(filename.replace(os.sep, '/').split('/')[:-1])
    hits = [v for k, v in label_dict.items() if k in directories]
    if not hits:
        hits = [v for k, v in label_dict.items() if k in filename]
    for v in hits:
        label[v] = 1.
    return label


def load_data(data_type):
    """Load the images of one data split together with their label vectors.

    Walks ./faces and collects the files of every directory that has no
    sub-directories and whose path contains `data_type`; hidden files (names
    starting with ".") are ignored. The files are shuffled, read with OpenCV,
    resized to 32x32 and divided by 255 as float32.

    Args:
        data_type: Substring selecting the split directory, e.g. "train" or "test".

    Returns:
        (images, labels): two numpy arrays in the same (shuffled) order.
        Labels are built from the mapping stored in label.csv.
    """
    filenames = []
    for root, dirs, files in os.walk('faces'):
        if dirs or data_type not in root:
            continue
        filenames += ['{}/{}'.format(root, name) for name in files if not name.startswith('.')]

    # Shuffle files
    random.shuffle(filenames)

    label_dict = load_labels()
    images, labels = [], []
    for filename in filenames:
        img = cv2.imread(filename)
        if img is None:
            # cv2.imread reports an unreadable or non-image file by returning
            # None; skip it (and its label) instead of crashing in cv2.resize.
            print("skip unreadable image: {}".format(filename))
            continue
        img = cv2.resize(img, (32, 32))
        images.append(img.astype(np.float32) / 255.0)
        labels.append(_one_hot_for_path(filename, label_dict))
    return np.asarray(images), np.asarray(labels)
if __name__ == "__main__":
    # Smoke test: regenerate label.csv, read it back, then report the size of
    # each data split.
    make_labels()
    print(load_labels())
    for split in ("train", "test"):
        imgs, labels = load_data(split)
        print(imgs.shape, len(labels))
学習用スクリプト実行 on 学習用PC
以下のスクリプトを「train.py」という名前で作成し、学習用PCで実行します。
from load_face_image import *
import tensorflow as tf
import numpy as np
def weight_variable(shape):
    """Create a variable of `shape` initialised from a truncated normal (stddev 0.1)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.1))
def bias_variable(shape):
    """Create a variable of `shape` with every element initialised to 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape))
def conv2d(x, W):
    """Convolve `x` with filter `W` using stride 1 and 'SAME' padding."""
    unit_strides = [1, 1, 1, 1]
    return tf.nn.conv2d(x, W, strides=unit_strides, padding='SAME')
def max_pool_2x2(x):
    """Max-pool `x` with a 2x2 window, stride 2 and 'SAME' padding."""
    window = [1, 2, 2, 1]
    step = [1, 2, 2, 1]
    return tf.nn.max_pool(x, ksize=window, strides=step, padding='SAME')
def max_pool_3x3(x):
    """Max-pool `x` with a 3x3 window, stride 3 and 'SAME' padding."""
    window = [1, 3, 3, 1]
    step = [1, 3, 3, 1]
    return tf.nn.max_pool(x, ksize=window, strides=step, padding='SAME')
def batch_norm_wrapper(inputs, phase_train=None, decay=0.99):
    """Batch-normalize `inputs` over every axis except the last.

    Args:
        inputs: Tensor to normalize. Its last dimension sizes the scale,
            offset and population-statistic variables.
        phase_train: Boolean tensor handed to tf.cond. When it evaluates true
            the batch statistics are used and their moving averages updated;
            otherwise the moving averages are copied into the population
            variables and used. When None, the population variables are used
            as they are (initialised to mean 0, variance 1).
        decay: Decay rate of the exponential moving average.

    Returns:
        The normalized tensor.
    """
    epsilon = 1e-5
    out_dim = inputs.get_shape()[-1]
    scale = tf.Variable(tf.ones([out_dim]))
    beta = tf.Variable(tf.zeros([out_dim]))
    pop_mean = tf.Variable(tf.zeros([out_dim]), trainable=False)
    pop_var = tf.Variable(tf.ones([out_dim]), trainable=False)

    # Identity test: `== None` would go through the tensor's own __eq__.
    if phase_train is None:
        return tf.nn.batch_normalization(inputs, pop_mean, pop_var, beta, scale, epsilon)

    rank = len(inputs.get_shape())
    axes = list(range(rank - 1))
    batch_mean, batch_var = tf.nn.moments(inputs, axes)

    ema = tf.train.ExponentialMovingAverage(decay=decay)

    def update():
        # Training branch: refresh the moving averages before normalizing
        # with the statistics of the current batch.
        ema_apply_op = ema.apply([batch_mean, batch_var])
        with tf.control_dependencies([ema_apply_op]):
            return tf.nn.batch_normalization(inputs, tf.identity(batch_mean), tf.identity(batch_var), beta, scale,
                                             epsilon)

    def average():
        # Inference branch: copy the moving averages into the population
        # variables and normalize with those.
        train_mean = pop_mean.assign(ema.average(batch_mean))
        train_var = pop_var.assign(ema.average(batch_var))
        with tf.control_dependencies([train_mean, train_var]):
            return tf.nn.batch_normalization(inputs, train_mean, train_var, beta, scale, epsilon)

    return tf.cond(phase_train, update, average)
def train(fine_tune, all_x, all_y):
    """Train the face-classification CNN and checkpoint it after every epoch.

    Network: conv 5x5x32 -> 2x2 max-pool -> conv 5x5x64 -> 2x2 max-pool ->
    fully connected 1024 -> dropout -> fully connected softmax. The input
    placeholder is named "input" and the softmax "output".

    A random 90% of the samples is used for training; the remaining 10% is
    evaluated after each epoch. Checkpoints go to ./model/cnn-model-<epoch>.

    Args:
        fine_tune: When true, restore ./model/cnn-model-99 before training.
        all_x: numpy array of images, fed to a [None, 32, 32, 3] placeholder.
        all_y: numpy array of label rows; len(all_y[0]) is the class count.
    """
    import os  # function-scope: this script gets `os` only indirectly via a star import

    # Size the output layer from the labels themselves rather than from a
    # module-level `label_dict`, which exists only when run as a script.
    num_classes = len(all_y[0])

    with tf.Graph().as_default():
        # Fed in every sess.run below, but no op in this graph consumes it.
        phase_train = tf.placeholder(tf.bool)
        x = tf.placeholder('float', [None, 32, 32, 3], name="input")
        y = tf.placeholder('float', [None, num_classes])
        keep_prob = tf.placeholder(tf.float32)
        x_image = tf.reshape(x, [-1, 32, 32, 3])

        # NOTE: keep the creation order of the variables below in step with
        # the inference-graph builder, which recreates them in the same
        # sequence before restoring this checkpoint.
        # Convolution layer
        W_conv1 = weight_variable([5, 5, 3, 32])
        b_conv1 = bias_variable([32])
        h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
        # Pooling layer
        h_pool1 = max_pool_2x2(h_conv1)
        # Convolution layer
        W_conv2 = weight_variable([5, 5, 32, 64])
        b_conv2 = bias_variable([64])
        h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
        # Pooling layer
        h_pool2 = max_pool_2x2(h_conv2)
        # Fully connected layer (two 2x2 poolings reduce 32x32 to 8x8)
        W_fc1 = weight_variable([8 * 8 * 64, 1024])
        b_fc1 = bias_variable([1024])
        h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
        h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
        # Dropout
        h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
        # Fully connected layer
        W_fc2 = weight_variable([1024, num_classes])
        b_fc2 = bias_variable([num_classes])
        y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")

        # Clip before the log: a probability that underflows to 0 would give
        # log(0) = -inf and turn the loss (and every gradient) into NaN.
        cross_entropy = -tf.reduce_sum(y * tf.log(tf.clip_by_value(y_conv, 1e-10, 1.0)))
        train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
        correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            saver = tf.train.Saver()
            if fine_tune:
                last_model = "./model/cnn-model-99"
                print("load " + last_model)
                saver.restore(sess, last_model)

            # Random 90/10 split into training and hold-out samples.
            all_imgs = np.shape(all_x)[0]
            train_indices = np.random.choice(all_imgs, round(all_imgs * 0.9), replace=False)
            # Explicit integer dtype: an empty hold-out set would otherwise
            # become a float array, which numpy rejects as an index.
            test_indices = np.array(sorted(set(range(all_imgs)) - set(train_indices)), dtype=int)
            x_train = all_x[train_indices]
            x_test = all_x[test_indices]
            y_train = all_y[train_indices]
            y_test = all_y[test_indices]

            num_epoch = 100
            num_data = x_train.shape[0]
            print(num_data, x_train.shape)
            batch_size = 32

            # The saver is given a path inside ./model; make sure it exists.
            os.makedirs("./model", exist_ok=True)

            for epoch in range(num_epoch):
                for idx in range(0, num_data, batch_size):
                    # Each step draws a fresh random batch (with replacement).
                    rand_index = np.random.choice(num_data, size=batch_size)
                    batch_x = x_train[rand_index]
                    batch_y = y_train[rand_index]
                    _, loss = sess.run(
                        [train_step, cross_entropy],
                        feed_dict={phase_train: True, x: batch_x, y: batch_y, keep_prob: 0.5})
                    print("epoch %d, images %d, loss %g" % (epoch, idx, loss / batch_size))

                test_accuracy = sess.run(
                    accuracy,
                    feed_dict={phase_train: False, x: x_test, y: y_test, keep_prob: 1.0})
                print("epoch %d, test accuracy %g" % (epoch, test_accuracy))
                saver.save(sess, "./model/" + 'cnn-model', global_step=epoch)
if __name__ == "__main__":
    # `label_dict` is deliberately a module-level name; it also (re)writes label.csv.
    label_dict = make_labels()
    train_images, train_labels = load_data('train')
    print("train_images", len(train_images))
    print(train_images.shape, train_labels.shape)
    # Train from scratch on the images cast to float16.
    half_precision_images = train_images.astype(np.float16)
    train(False, half_precision_images, train_labels)
学習済みモデルの変換 on 学習用PC
以下のスクリプトを「create_pred_model.py」という名前で作成し、学習済みモデル（チェックポイント）をMovidiusのコンパイラで扱える推論専用のモデルに変換します。
import tensorflow as tf
from train import weight_variable, bias_variable
from train import conv2d, max_pool_2x2, max_pool_3x3
from train import batch_norm_wrapper
def predict(input):
    """Build the inference-only version of the trained CNN on top of `input`.

    Args:
        input: Image tensor that the first convolution is applied to.

    Returns:
        The softmax tensor, named "output".
    """
    # This script imports only layer helpers from `train`, so the label helper
    # has to be imported explicitly. label.csv holds the mapping written when
    # the model was trained, which fixes the size of the output layer.
    from load_face_image import load_labels

    num_classes = len(load_labels())
    x_image = input

    # NOTE: the variables are created in the same order as in the training
    # graph so they line up with the checkpoint that is restored afterwards.
    # Convolution layer
    W_conv1 = weight_variable([5, 5, 3, 32])
    b_conv1 = bias_variable([32])
    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
    # Pooling layer
    h_pool1 = max_pool_2x2(h_conv1)
    # Convolution layer
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
    # Pooling layer
    h_pool2 = max_pool_2x2(h_conv2)
    # Fully connected layer
    W_fc1 = weight_variable([8 * 8 * 64, 1024])
    b_fc1 = bias_variable([1024])
    h_pool2_flat = tf.reshape(h_pool2, [-1, 8 * 8 * 64])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
    # Dropout with a keep probability of 1.0, i.e. nothing is dropped at inference.
    keep_prob = 1.0
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
    # Fully connected layer
    W_fc2 = weight_variable([1024, num_classes])
    b_fc2 = bias_variable([num_classes])
    output = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2, name="output")
    return output
def run(name):
    """Rebuild the inference graph, load the trained weights and save them as `name`.

    The graph takes a single [1, 32, 32, 3] float placeholder named "input".
    Weights come from ./model/cnn-model-99.
    """
    trained_checkpoint = "./model/cnn-model-99"
    with tf.Graph().as_default():
        image_in = tf.placeholder('float', [1, 32, 32, 3], name="input")
        predict(image_in)
        with tf.Session() as sess:
            saver = tf.train.Saver(tf.global_variables())
            saver.restore(sess, trained_checkpoint)
            saver.save(sess, name)
# Write the inference-only checkpoint next to the training checkpoints.
run(name='./model/pred_model')
作成したら、以下のコマンドで実行。
# Run in the directory that contains ./model; saves the converted model as ./model/pred_model.
python create_pred_model.py
モデルのコンパイル on 識別用Raspi
学習結果の保存されているmodelフォルダをRaspberry Piにコピーする。
コピーした後、Raspberry Piで以下のコマンドを実行。
# -in / -on name the graph's input and output nodes ("input" and "output" in the scripts above).
# NOTE(review): -s presumably sets the number of SHAVE cores to use - confirm in the NCSDK docs.
# The prediction script later reads ./model/graph, assumed to be the compiler's default output name.
cd model
mvNCCompile -s 12 pred_model.meta -in=input -on=output
識別テスト on 識別用Raspi
収集した顔画像のうちtestフォルダに入っているものをRaspberry Piのfacesフォルダにコピー。
また、事前に作成した「load_face_image.py」というスクリプトと、学習時に生成された「label.csv」もコピーしておきます（load_data がラベルの読み込みに label.csv を使用するためです）。
コピーしたら、以下のスクリプトを「prediction.py」という名前で作成します。
from load_face_image import *
import mvnc.mvncapi as mvnc
import numpy as np
def predict(input, labels):
    """Classify every image in `input` on the first attached compute stick.

    Loads the compiled graph from ./model/graph and, for each image, prints
    the index of the largest output value followed by the expected label.

    Args:
        input: Sequence of image tensors; each is handed to graph.LoadTensor as-is.
        labels: Expected label of each image, in the same order as `input`.

    Returns:
        The raw result for the last image, or None when `input` is empty.

    Raises:
        IndexError: If no device is found.
    """
    # Read the graph file before touching the device, so a missing file does
    # not leave a device open.
    with open('./model/graph', 'rb') as f:
        blob = f.read()

    devices = mvnc.EnumerateDevices()
    if not devices:
        raise IndexError("no Movidius Neural Compute Stick found")
    device = mvnc.Device(devices[0])
    device.OpenDevice()

    output = None
    try:
        graph = device.AllocateGraph(blob)
        try:
            for i, image in enumerate(input):
                graph.LoadTensor(image, 'user object')
                output, _ = graph.GetResult()
                print(np.argmax(output), labels[i])
        finally:
            # Release the graph even if inference fails part-way through.
            graph.DeallocateGraph()
    finally:
        device.CloseDevice()
    return output
if __name__ == "__main__":
    test_images, test_labels = load_data('test')
    print("test_images", len(test_images))
    # Reduce each label vector to the index of its largest entry.
    labels = [np.argmax(label) for label in test_labels]
    # The images are cast to float16 before being sent to the stick.
    predict(test_images.astype(np.float16), labels)
以下のコマンドをRaspberry Piで実行。
# Prints one "<predicted index> <expected index>" line per test image.
python3 prediction.py
できた!