はじめに
とあるイベントで出展した仕組みに興味を持つ人が多かったので、作り方を以下に整理し、公開します。
材料
- Raspberry Pi 3 model B
- USBカメラ
- MacBook Air
Raspberry Piの設定
OSのインストールは終わっている状態とします。
MJPE-streamer
以下のコマンドでインストール。
cd ~/
sudo apt-get install -y cmake libjpeg8-dev build-essential git
git clone https://github.com/jacksonliam/mjpg-streamer.git
cd mjpg-streamer/mjpg-streamer-experimental
make
以下のコマンドを実行。
./start.sh
同じネットワーク上の別のPCからWebブラウザでアクセスして動作確認。
http://[RaspberryPiのIPアドレス]:8080/?action=stream
カメラの画像が表示されればOK.
起動設定
start.shに以下の行を追加。
cd /home/pi/mjpg-streamer/mjpg-streamer-experimental
rc.localに以下の行を追加。
/home/pi/mjpg-streamer/mjpg-streamer-experimental/start.sh &
Macの環境構築
手前味噌ですが、以下のURLに従ってMacの環境構築を行います。
Macで深層学習の環境をさくっと作る手順 with TensorFlow and OpenCV
Kerasをインストール
pip install keras
スクリプトを設置するフォルダに移動して以下のコマンドによりデータ用フォルダを作成。
mkdir faces
cd faces
mkdir train
mkdir test
顔画像収集スクリプト
Macのカメラを使って20個の顔画像を収集し、19個を学習用、1個をテスト用に分けて保存。
a001_make_face_data.py
# !/usr/bin/env python
import os
import sys
import cv2
import time
from datetime import datetime as dt
if len(sys.argv) < 2:
print("Usage: a001_make_face_data.py [name]")
exit()
name = sys.argv[-1]
print(name)
cascade_file = "haarcascade_frontalface_alt2.xml"
cascade = cv2.CascadeClassifier(cascade_file)
cnt_max = 21
cnt_face = 0
faces = []
DEVICE_ID = 0
cap = cv2.VideoCapture(DEVICE_ID)
time.sleep(1)
end_flag, c_frame = cap.read()
height, width, channels = c_frame.shape
while cnt_face < cnt_max:
img = c_frame
img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
face_list = cascade.detectMultiScale(img_gray, minSize=(100, 100))
for (pos_x, pos_y, w, h) in face_list:
img_face = img[pos_y:pos_y+h, pos_x:pos_x+w]
img_face = cv2.resize(img_face, (100, 100))
faces.append(img_face)
if len(face_list) > 0:
cnt_face += 1
print("\r", cnt_face, end="")
time.sleep(1)
end_flag, c_frame = cap.read()
cap.release()
print(len(faces))
num = 0
tdatetime = dt.now()
tstr = tdatetime.strftime('%Y%m%d%H%M')
path = '{}/faces/train/{}'.format(os.getcwd(), name)
print("学習データ", path)
os.makedirs(path)
for face in faces[:-1]:
filename = '{}-{}.jpg'.format(tstr, num)
print("\t", filename)
cv2.imwrite('{}/{}'.format(path, filename), face)
num += 1
path = '{}/faces/test/{}'.format(os.getcwd(), name)
print("テストデータ", path)
os.makedirs(path)
face = faces[-1]
filename = '{}-{}.jpg'.format(tstr, num)
print("\t", filename)
cv2.imwrite('{}/{}'.format(path, filename), face)
以下のコマンドで顔画像を収集。
chmod a+x a001_make_face_data.py
./a001_make_face_data.py [name]
顔画像学習用スクリプト
取得した顔画像を学習します。
a002_train_face_data.py
# !/usr/bin/env python
import os
import random
import cv2
import numpy as np
import pandas as pd
import gc
import keras
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.utils import np_utils
path = "./faces/train"
dirs = os.listdir(path)
dirs = [f for f in dirs if os.path.isdir(os.path.join(path, f))]
label_dict = {}
i = 0
for dirname in dirs:
label_dict[dirname] = i
i += 1
def load_data(data_type):
filenames, images, labels = [], [], []
walk = filter(lambda _: not len(_[1]) and data_type in _[0], os.walk('faces'))
for root, dirs, files in walk:
filenames += ['{}/{}'.format(root, _) for _ in files if not _.startswith('.')]
# Shuffle files
random.shuffle(filenames)
# Read, resize, and reshape images
images = []
for file in filenames:
img = cv2.imread(file)
img = cv2.resize(img, (32,32))
images.append(img.astype(np.float32) / 255.0)
images = np.asarray(images)
for filename in filenames:
label = np.zeros(len(label_dict))
for k, v in label_dict.items():
if k in filename:
label[v] = 1.
labels.append(label)
labels = np.asarray(labels)
return images, labels
def make_model(train_shape, label_shape):
model = Sequential()
model.add(Conv2D(32, (3, 3), padding="same", input_shape=train_shape[1:]))
model.add(Activation('relu'))
model.add(Conv2D(32, (3, 3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3)))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(label_shape[1]))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
model.summary()
return model
accuracys = []
train_images, train_labels = load_data('train')
test_images, test_labels = load_data('test')
print("train_images", len(train_images))
print("test_images", len(test_images))
print(train_images.shape, train_labels.shape)
model = make_model(train_images.shape, train_labels.shape)
model.fit(train_images, train_labels, batch_size=32, epochs=10)
# modelのテスト
score = model.evaluate(test_images, test_labels)
print('loss=', score[0])
print('accuracy=', score[1])
model_json_str = model.to_json()
open('./model_tf/face-model.json', 'w').write(model_json_str)
hdf5_file = "./model_tf/face-model.hdf5"
model.save_weights(hdf5_file)
gc.collect()
以下のコマンドで学習。
chmod a+x a002_train_face_data.py
./a002_train_face_data.py
顔識別用スクリプト
以下のスクリプトを作成。
a003_detect_face_data.py
# !/usr/bin/env python
import os
import sys
import cv2
import random
import numpy as np
import requests
from keras.models import model_from_json
import h5py
if len(sys.argv) < 2:
print("Usage: a003_detect_face_data.py [IP Address]")
exit()
ip = sys.argv[-1]
print(ip)
model = model_from_json(open('./model_tf/face-model.json').read())
model.load_weights('./model_tf/face-model.hdf5')
model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
model.summary();
path = "./faces/train"
dirs = os.listdir(path)
dirs = [f for f in dirs if os.path.isdir(os.path.join(path, f))]
label_dict = {}
names = dirs
if __name__ == '__main__':
# 定数定義
ESC_KEY = 27 # Escキー
INTERVAL= 33 # 待ち時間
WINDOW_NAME = "detect"
DEVICE_ID = 0
# 分類器の指定
cascade_file = "haarcascade_frontalface_alt2.xml"
cascade = cv2.CascadeClassifier(cascade_file)
img_url = 'http://{}:8080/?action=snapshot'.format(ip)
# ウィンドウの準備
cv2.namedWindow(WINDOW_NAME)
font = cv2.FONT_HERSHEY_DUPLEX
font_size = 1.0
# 変換処理ループ
while True:
req = requests.get(img_url)
img_buf = np.fromstring(req.content, dtype='uint8')
img = cv2.imdecode(img_buf, 1)
img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
face_list = cascade.detectMultiScale(img_gray, minSize=(50, 50))
for (pos_x, pos_y, w, h) in face_list:
img_face = img[pos_y:pos_y+h, pos_x:pos_x+w]
img_face = cv2.resize(img_face, (32, 32))
test_images = []
test_images.append(img_face / 255.0)
test_images = np.asarray(test_images)
results = model.predict(test_images)
text = names[np.argmax(results[0])]
color = (0, 0, 225)
pen_w = 2
cv2.putText(img,text,(pos_x,pos_y - 10), font, font_size, color)
cv2.rectangle(img, (pos_x, pos_y), (pos_x+w, pos_y+h), color, thickness = pen_w)
# フレーム表示
cv2.imshow(WINDOW_NAME, img)
# Escキーで終了
key = cv2.waitKey(INTERVAL)
if key == ESC_KEY:
break
cv2.destroyAllWindows()
以下のコマンドを実行。
chmod a+x a003_detect_face_data.py
./a003_detect_face_data.py [RaspberryPiのIPアドレス] &
複数のRaspberryPiがあれば、上記コマンドの2行目を繰り返し実行することで、複数カメラで監視を実行可能。
さいごに
これで識別した結果をLINEやMessengerなどで通知する仕組みを作れば、色々なところで利用できると考えます。