はじめに
小売店舗の経営者さんから「監視カメラの映像を使って来店客の分析をしたいけど、何か方法ないかな?」と質問があり、詳しく聞いてみるとカメラ画像とレジデータの突合をし、商品を購入されたお客さんの性別や年齢と購入されたものを分析して、仕入れの計画を立てる参考資料にしたいとのこと。
ということで、手始めに監視カメラのことを調べ、監視カメラの画像をパソコンに取り込めるようにし、カメラの画像からレジでお会計をされた来店客を抽出して、時間ごとに表示することで、レジの記録と突合しやすくなるのかなと考え、試してみたら意外と簡単にできたので、以下に手順をまとめます。
なお、監視カメラはレジカウンターの斜め後ろ奥に設置され、レジカウンター越しに店内を映しています。
やったこと
- 監視カメラの画像をパソコンに取り込む
- 監視カメラの画像から人を抽出
- 抽出した人の状態を識別
動作環境
機器
- 監視カメラ(CVI方式)
- AHD/TVI/CVI to HDMI変換器
- HDMI to USBキャプチャ
- パソコン(Ubuntuをインストール)
ライブラリなど
ただし、ここで紹介する方法はTensorFlow 2.xでは動作しません。
概要図
監視カメラの画像の取込
以下のスクリプトを実行すると、tmpフォルダにカメラの画像が順次保存されます。
ちなみにtmpフォルダに保存される画像は書いては消してを繰り返すので、RAMディスクを作成して利用することを強く推奨します。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import cv2
from datetime import datetime as dt
import os
import threading
import logging
import sys
from time import sleep
# カレントディレクトリの移動
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# 営業時間にあわせてプロセスを止める時刻を指定(24時間表記)
OPERATING_HOUR_SINCE = 20
# ログ出力設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
get_handler = logging.FileHandler(
"{}/logs/{}-fetch_image.log".format(IMAGES_DIR, dt.now().strftime('%Y%m%d')))
logger.addHandler(get_handler)
formatter = logging.Formatter('[%(levelname)s]%(asctime)s: %(message)s')
get_handler.setFormatter(formatter)
os.makedirs('./tmp', exist_ok=True)
# カメラ画像の取得
def fetch_camera_image():
# カメラの設定
DEVICE_ID = 0
cap = cv2.VideoCapture(DEVICE_ID)
cap.set(3, 1280)
cap.set(4, 720)
# 初期フレームの読込
end_flag, c_frame = cap.read()
height, width, channels = c_frame.shape
while end_flag == True:
# フレーム画像の取得と保存
img = c_frame
tdatetime = dt.now()
tstr = tdatetime.strftime('%Y%m%d%H%M%S%f')
filename = './tmp/{}.jpg'.format(tstr)
cv2.imwrite(filename, c_frame)
# 次のフレーム読み込み
sleep(3.0)
end_flag, c_frame = cap.read()
if dt.now().hour == OPERATING_HOUR_SINCE:
cap.release()
logger.info('exit fetch process')
sys.exit()
p_camera = threading.Thread(target=fetch_camera_image)
p_camera.start()
logger.info('start fetch process')
監視カメラの画像から人を抽出
以下のスクリプトを実行すると、tmpフォルダの中にある画像からYOLOを使って人の抽出を行い、抽出された画像をdetectedの日付フォルダに保存します。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import cv2
import os
import threading
from glob import glob
import logging
import sys
import tensorflow as tf
import keras
from darkflow.net.build import TFNet
# カレントディレクトリの移動
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# 営業時間にあわせてプロセスを止める時刻を指定(24時間表記)
OPERATING_HOUR_SINCE = 21
# 画像保存の基本ディレクトリ
IMAGES_DIR = "images"
# ログ出力設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
get_handler = logging.FileHandler(
"{}/logs/{}-detect.log".format(IMAGES_DIR, dt.now().strftime('%Y%m%d')))
logger.addHandler(get_handler)
formatter = logging.Formatter('[%(levelname)s]%(asctime)s: %(message)s')
get_handler.setFormatter(formatter)
# Yolo
options = {
"model": "cfg/yolo.cfg",
"load": "bin/yolo.weights",
"threshold": 0.4,
"gpu": 0.0
}
tfnet = TFNet(options)
def detect_person():
while True:
# ファイル一覧の取得
files = glob("./tmp/*.jpg")
files.sort()
for file in files[:-1]:
# 保存先の設定
tdatetime = dt.now()
dstr = tdatetime.strftime('%Y%m%d')
detected_pics_dir = '{}/detected/{}'.format(IMAGES_DIR, dstr)
os.makedirs(detected_faces_dir, exist_ok=True)
# 画像読込
try:
image = cv2.imread(file)
img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
print(file)
except Exception:
continue
# 物体検出
result = tfnet.return_predict(image)
num = 0
for rs in result:
# 人を検出した場合に切り抜いて保存
if rs["label"] == "person":
top = rs["topleft"]["y"]
left = rs["topleft"]["x"]
bottom = rs["bottomright"]["y"]
right = rs["bottomright"]["x"]
img = image[top:bottom, left:right]
num += 1
filename = '{}-{}.jpg'.format(tstr, num)
cv2.imwrite('{}/{}'.format(detected_pics_dir, filename), img)
if num > 0:
print(num)
logger.info('detected.')
# 元のファイルを削除
os.remove(file)
if dt.now().hour == OPERATING_HOUR_SINCE:
logger.info('exit detect process')
sys.exit()
p_detect = threading.Thread(target=detect_person)
p_detect.start()
logger.info('start detect process')
抽出した人の状態を選別
抽出した人の画像を確認し、状態ごとにimages/train以下のフォルダに手作業で選別します。
フォルダ名 | 内容 |
---|---|
0-店内 | 店内で商品を確認している人 |
1-カウンター | カウンター内で作業をしている人 |
2-会計 | レジカウンターにて支払いをしている人 |
今回は、それぞれ2000〜4000個程度の画像を使用しました。
人の状態を学習
選別された写真をもとに抽出画像における人の状態を学習します。
なお、実機で動作させると他の作業ができないくらい遅くなったので、実際の学習はColaboratoryで実行しています。
#!/usr/bin/env python3
# coding: utf-8
import glob
import os
import cv2
import random
import numpy as np
from sklearn import model_selection
import tensorflow as tf
import keras
from keras.utils.np_utils import to_categorical
from keras.layers import Activation, Conv2D, Dense, Flatten, MaxPooling2D, Dropout
from keras.models import Sequential
# カレントディレクトリの移動
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# ファイル読込準備
base_dir = "./images/train"
# 簡易ラベル名の取得
labels = []
for i in range(3):
labels.append("{}-".format(i))
# 学習データの数を確認
n = []
for l in labels:
files = glob.glob("{}/{}*/*.jpg".format(base_dir, l))
n.append(len(files))
print("{} {}".format(l, n[-1])
# 学習データの作成
imgX = []
y = []
k = 0
for l in labels:
files = glob.glob("{}/{}*/*.jpg".format(base_dir, l))
files.sort()
j = int(min(n) * 1.5)
if j > len(files):
j = len(files)
files = random.sample(files, j)
for f in files:
img = cv2.imread(f)
img = cv2.resize(img, (100, 100))
imgX.append(img)
y.append(k)
k += 1
X = np.array(imgX)
X = X / 255
X.shape
# 学習用、検証用、テスト用に分割
test_size = 0.2
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=test_size, random_state=42)
X_valid, X_test, y_valid, y_test = model_selection.train_test_split(X_test, y_test, test_size=.5, random_state=42)
y_train = to_categorical(y_train)
y_valid = to_categorical(y_valid)
y_test = to_categorical(y_test)
# 学習モデル作成
input_shape = X[0].shape
model = Sequential()
model.add(Conv2D(
input_shape=input_shape, filters=64, kernel_size=(5, 5),
strides=(1, 1), padding="same", activation='relu'))
model.add(MaxPooling2D(pool_size=(4, 4)))
model.add(Conv2D(
filters=32, kernel_size=(5, 5),
strides=(1, 1), padding="same", activation='relu'))
model.add(Conv2D(
filters=32, kernel_size=(5, 5),
strides=(1, 1), padding="same", activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(
filters=16, kernel_size=(5, 5),
strides=(1, 1), padding="same", activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(1024, activation='sigmoid'))
model.add(Dense(2048, activation='sigmoid'))
model.add(Dense(len(labels), activation='softmax'))
print(model.summary())
model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])
# 学習
history = model.fit(
X_train, y_train, batch_size=200, epochs=60,
verbose=1, shuffle=True,
validation_data=(X_valid, y_valid))
# 汎化制度の評価・表示
score = model.evaluate(X_test, y_test, batch_size=32, verbose=0)
print('validation loss:{0[0]}\nvalidation accuracy:{0[1]}'.format(score))
# 学習結果の保存
model.save("models/status.h5")
学習結果
実行すると、概ね以下のように表示されます。
Model: "sequential_1"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv2d_1 (Conv2D) (None, 100, 100, 64) 4864
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 25, 25, 64) 0
_________________________________________________________________
conv2d_2 (Conv2D) (None, 25, 25, 32) 51232
_________________________________________________________________
conv2d_3 (Conv2D) (None, 25, 25, 32) 25632
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 12, 12, 32) 0
_________________________________________________________________
conv2d_4 (Conv2D) (None, 12, 12, 16) 12816
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 6, 6, 16) 0
_________________________________________________________________
flatten_1 (Flatten) (None, 576) 0
_________________________________________________________________
dense_1 (Dense) (None, 1024) 590848
_________________________________________________________________
dense_2 (Dense) (None, 2048) 2099200
_________________________________________________________________
dense_3 (Dense) (None, 3) 6147
=================================================================
Total params: 2,790,739
Trainable params: 2,790,739
Non-trainable params: 0
_________________________________________________________________
Train on 9365 samples, validate on 1171 samples
Epoch 1/60
9365/9365 [==============================] - 198s 21ms/step - loss: 2.4131 - accuracy: 0.3665 - val_loss: 1.1107 - val_accuracy: 0.4073
Epoch 2/60
9365/9365 [==============================] - 198s 21ms/step - loss: 1.1838 - accuracy: 0.4173 - val_loss: 1.0917 - val_accuracy: 0.4065
*** 中略 ***
Epoch 59/60
9365/9365 [==============================] - 218s 23ms/step - loss: 0.0364 - accuracy: 0.9887 - val_loss: 0.2565 - val_accuracy: 0.9556
Epoch 60/60
9365/9365 [==============================] - 221s 24ms/step - loss: 0.0256 - accuracy: 0.9910 - val_loss: 0.3123 - val_accuracy: 0.9530
validation loss:0.24554101437335019
validation accuracy:0.9513236284255981
サンプルコードにはありませんが、学習結果を可視化すると以下のようになり、それなりに学習が進んでいることを確認。
人の状態を自動選別して保存
以下のスクリプトを実行すると人の状態を識別し、predicted以下のフォルダに画像を振り分けます。
#!/usr/bin/env python3
# coding: utf-8
from glob import glob
import cv2
import os
from datetime import datetime as dt
import numpy as np
import keras
import threading
# カレントディレクトリの移動
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# 営業時間にあわせてプロセスを止める時刻を指定(24時間表記)
OPERATING_HOUR_SINCE = 21
# ログ出力設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
get_handler = logging.FileHandler(
"{}/logs/{}-predict.log".format(IMAGES_DIR, dt.now().strftime('%Y%m%d')))
logger.addHandler(get_handler)
formatter = logging.Formatter('[%(levelname)s]%(asctime)s: %(message)s')
get_handler.setFormatter(formatter)
# 保存先の準備
dirnames = ["0-店内", "1-カウンター", "2-会計"]
for dirname in dirnames:
os.makedirs("./images/predicted/" + dirname, exist_ok=True)
os.makedirs("./images/predicted/unidentified", exist_ok=True)
# 識別用スクリプト
def predict():
# 学習済モデルの読込
model = keras.models.load_model("models/status.h5")
while True:
# 読込元フォルダの設定
tdatetime = dt.now()
dstr = tdatetime.strftime('%Y%m%d')
dirname = "./images/detected/{}/".format(dstr)
# ファイル一覧の取得
files = glob(dirname + "*.jpg")
files.sort()
# 画像データの読込
imgs = []
imgX = []
i = 1
for file in files[:-1]:
image = cv2.imread(file)
img = cv2.resize(image, (100, 100))
imgs.append(image)
imgX.append(img)
# 分類用データの作成
X = np.array(imgX)
X = X / 255
# 予測
pred = model.predict(X, batch_size=1)
for i in range(len(files)):
# ファイル名の取得
file = files[i].split("/")[-1]
# 予測結果の取得と確率の取得
j = np.argmax(pred[i])
k = pred[i][j]
# 予測確率が一定数以上であれば振り分け
if k > 0.95:
newfile = "{}/{}".format(dirnames[j], file)
# 会計の状態を検出時の処理を記述
# 予測確率が低い場合は別のフォルダに保存
else:
newfile = "unidentified/{}".format(file)
# ファイルの移動
shutil.move(files[i], "./images/predicted/" + newfile)
if dt.now().hour == OPERATING_HOUR_SINCE:
logger.info('exit predict process')
sys.exit()
p_predict = threading.Thread(target=predict)
p_predict.start()
logger.info('start predict process')
できた!
今回利用したカメラの画像には一般の人が写っていることから、ここに画像付きで紹介できないのがとても残念です。
ただ、店内にいる人、会計をしている人がパッと分類されてフォルダに分けられていくのがすごく心地良い。
この心地良さをみんなに感じてもらいたいなと思っています。
応用先の検討
色々ありすぎて、次に何しようか考えるのが楽しすぎる!!