この記事は、「留守でも安心!異常検知システムを自作してみた」の後編です。
留守中の侵入者を検知する異常検知システムを作っていきます。
まだ前編を読んでない方は先に前編をご覧ください。
システム作成手順(後編)
後編では、異常検知モデルの作成とGUIアプリを作成します。
異常検知モデルの作成
異常検知モデルの作成は、コーディング無しで簡単に異常検知モデルを作成できるプラットフォーム「ADFI」を利用します。
無料プランでも、異常検知モデルの作成と性能検証、さらに、100回までのAPI利用が可能です。
異常検知モデルを作成するまでの詳しい操作手順は、過去記事を参照してください。
今回異常検知モデルは、ADFIの無料プランでMSPCの異常検知モデルを作成しました。
作成時のポイントを箇条書きにします。
- データセットは「PCA-MSPC」で作成する。
- 前編で取得した数十枚の正常画像(人が映っていない状態)の8割を「Normal Train image」でアップロードする。
- 正常画像の残りの2割を「Normal Test image」でアップロードする。
- 人間の身体の一部(腕とか)が映った画像をcamera.pyで数枚取得し、異常画像として「Abnormal Test image」にアップロードする。
- 「Create AI Model」ボタンを押してモデルを作成後、「AI MODEL SETTING」タブを開き、閾値を大きめ(下記例では「9」に設定しているが、「50」や「100」など、もっと大きい値の方が誤検知を減らせる)に設定して、「Save Threshold」ボタンで保存。その後「Test」ボタンを押して、性能を確認する。
API情報の確認
ADFIで作成した異常検知モデルはAPIで呼ぶことで利用することができます。
ADFIを終了する前に、「API」タブを開き、「API KEY」「AI Model ID」「Model Type」の値をコピーして、控えておいてください。
これらの情報を、後で作成するGUIアプリで入力します。
app.pyの作成
最後に、GUIアプリ本体であるapp.pyの作成です。
完成コードは下記の通りです。
import glob
import json
import os
import requests
import shutil
import threading
import time
import tkinter as tk
from io import BytesIO
from PIL import Image, ImageTk
# アプリケーション本体
def app_main():
global message_label, canvas, item, status, key_txtBox, id_txtBox, type_txtBox, dir1_txtBox, dir2_txtBox, api_count, interval_txtBox, max_txtBox
# 開始ボタンを押した時の処理
def start_click():
if not os.path.isdir(dir1_txtBox.get()):
message_label['text'] = '処理対象フォルダが存在しません'
message_label['background'] = 'red'
return
if not os.path.isdir(dir2_txtBox.get()):
message_label['text'] = '処理済みフォルダが存在しません'
message_label['background'] = 'red'
return
if dir1_txtBox.get() == dir2_txtBox.get():
message_label['text'] = '処理対象フォルダと処理済みフォルダのパスが同じです'
message_label['background'] = 'red'
return
status.set(True)
message_label['text'] = '実行中'
message_label['background'] = 'blue'
# 停止ボタンを押した時の処理
def stop_click():
status.set(False)
message_label['text'] = '停止中'
message_label['background'] = 'green'
canvas.delete('p1')
# アプリ終了ボタンを押した時の処理
def end_click():
canvas.delete('p1')
baseGround.destroy()
# メインウィンドウを作成
baseGround = tk.Tk()
# ウィンドウのサイズを設定
baseGround.geometry('1000x500')
# ウィンドウのタイトルを設定
baseGround.title('アプリ')
# API実行数の変数
api_count = tk.IntVar(value=0)
# 処理状態(Trueの場合、実行中)
status = tk.BooleanVar(value=False)
# メッセージ用ラベル
message_label = tk.Label(text='停止中', foreground='white', background='green')
message_label.place(x=0, y=0, width=1000)
# GUI上にボタンを配置
start_button = tk.Button(
baseGround, text = '処理開始', command=start_click).place(x=600, y=400)
stop_button = tk.Button(
baseGround, text = '処理停止', command=stop_click).place(x=700, y=400)
end_button = tk.Button(
baseGround, text = 'アプリ終了', command=end_click).place(x=860, y=400)
# API Key
# ラベル
key_label = tk.Label(text='API Key', foreground='black')
key_label.place(x=30, y=70)
# テキストボックス
key_txtBox = tk.Entry(width=40)
key_txtBox.place(x=200, y=70)
# AI Model ID
# ラベル
id_label = tk.Label(text='AI Model ID', foreground='black')
id_label.place(x=30, y=120)
# テキストボックス
id_txtBox = tk.Entry(width=40)
id_txtBox.place(x=200, y=120)
# Model Type
# ラベル
type_label = tk.Label(text='Model Type', foreground='black')
type_label.place(x=30, y=170)
# テキストボックス
type_txtBox = tk.Entry(width=40)
type_txtBox.place(x=200, y=170)
# 処理対象フォルダ
# ラベル
dir1_label = tk.Label(text='処理対象フォルダ', foreground='black')
dir1_label.place(x=30, y=220)
# テキストボックス
dir1_txtBox = tk.Entry(width=40)
dir1_txtBox.insert(0,'camera')
dir1_txtBox.place(x=200, y=220)
# 処理済みフォルダ
# ラベル
dir2_label = tk.Label(text='処理済みフォルダ', foreground='black')
dir2_label.place(x=30, y=270)
# テキストボックス
dir2_txtBox = tk.Entry(width=40)
dir2_txtBox.insert(0,'done')
dir2_txtBox.place(x=200, y=270)
# API送信数
# ラベル
send_label = tk.Label(text='API送信数:', foreground='black')
send_label.place(x=30, y=400)
# 数
sendnum_label = tk.Label(textvariable=api_count, foreground='black')
sendnum_label.place(x=140, y=400)
# 実行間隔
# ラベル
interval_label = tk.Label(text='実行間隔(秒)', foreground='black')
interval_label.place(x=300, y=400)
# テキストボックス
interval_txtBox = tk.Entry(width=10)
interval_txtBox.insert(0,'10')
interval_txtBox.place(x=450, y=400)
# API送信数の上限
# ラベル
max_label = tk.Label(text='API送信数の上限', foreground='black')
max_label.place(x=300, y=450)
# テキストボックス
max_txtBox = tk.Entry(width=10)
max_txtBox.insert(0,'100')
max_txtBox.place(x=450, y=450)
# キャンバス作成
canvas = tk.Canvas(baseGround, bg="#000000", height=240, width=320)
# キャンバス表示
canvas.place(x=650, y=70)
# 初期画像表示
img = tk.PhotoImage()
item = canvas.create_image(2, 2, image=img, anchor=tk.NW, tag='p1')
# 画面を表示し続ける
baseGround.mainloop()
# main
def main():
# ADFI(https://adfi.jp/ja)のAPIを利用
url = "https://us.adfi.karakurai.com/ap/api/apidata/"
#スレッドを立ててtkinterの画像表示を開始する
thread1 = threading.Thread(target=app_main)
thread1.start()
time.sleep(1)
max_count = 0
interval_second = 10
# 実行中の場合、画像をAPIに送信
while True:
# 実行間隔をセット
if interval_txtBox.get().isdecimal():
interval_second = int(interval_txtBox.get())
# API送信数の上限をセット
if max_txtBox.get().isdecimal():
max_count = int(max_txtBox.get())
# API送信数が上限未満の場合、処理を実行
if api_count.get() < max_count:
# ステータスが実行中の場合
if status.get() and dir1_txtBox.get() and dir2_txtBox.get():
# 処理対象フォルダ内の画像ファイルのリストを取得
images = glob.glob(dir1_txtBox.get() + "/*.png") + glob.glob(dir1_txtBox.get() + "/*.jp*g")
if len(images) > 0:
# 1枚目の画像を表示
img = Image.open(images[0])
resized_img = img.resize((320, 240))
tkimg = ImageTk.PhotoImage(resized_img)
item = canvas.create_image(2, 2, image=tkimg, anchor=tk.NW, tag='p1')
# ADFIのAPI呼び出し
if key_txtBox.get() and id_txtBox.get() and type_txtBox.get():
filename = images[0]
apikey = key_txtBox.get()
aimodel_id = id_txtBox.get()
model_type = type_txtBox.get()
# ADFIのAPIサンプルコードを流用
MAX_SIZE = 800
height = img.height
width = img.width
if (img.height > MAX_SIZE):
height = MAX_SIZE
if (img.width > MAX_SIZE):
width = MAX_SIZE
img = img.resize((width, height), Image.ANTIALIAS)
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
img_bytes = img_bytes.getvalue()
files = {"image_data": (filename, img_bytes, "image/png")}
data = {"apikey": apikey, "aimodel_id": aimodel_id, "model_type": model_type}
# APIリクエストを送信
response = requests.post(url, files=files, data=data)
# APIリクエストが成功した場合
if(response.status_code == 200):
result_json = response.json()
print("response:", result_json)
# 異常の場合、メッセージを表示
if result_json["result"] == 'Anomaly':
message_label['text'] = '実行中:異常を検知しました'
message_label['background'] = 'red'
# メールで通知する場合は、ここにメール送信処理を追加する
# 正常の場合、メッセージを表示
else:
message_label['text'] = '実行中:正常です'
message_label['background'] = 'blue'
# API送信数を増やす
api_count.set(api_count.get() + 1)
if dir1_txtBox.get() != dir2_txtBox.get():
# 画像を処理済みフォルダに移動
shutil.move(images[0], dir2_txtBox.get())
# APIリクエストが失敗した場合
else:
message_label['text'] = '実行中:APIリクエストが失敗しました'
message_label['background'] = 'red'
# API情報がない場合
else:
message_label['text'] = '実行中:API情報が不足しています'
message_label['background'] = 'red'
# 時間間隔分スリープ
time.sleep(interval_second)
# thread1の処理が終了した場合、While処理を終了
if threading.active_count() == 1:
thread1.join()
break
if __name__ == '__main__':
main()
上記コードには、メール送信する部分は含まれていないため、メール送信したい場合には、「# メールで通知する場合は、ここにメール送信処理を追加する」の行の後に、コードを追加してください。
また、必要なライブラリをインストールしていない場合は、下記コマンド等でインストールしてください。
pip install requests
pip install threading
pip install Pillow
app.pyを実行すると、GUIアプリが起動して、下記画面が表示されます。
「API KEY」「AI Model ID」「Model Type」には、先ほどADFIでコピーした値を入力してください。
「処理対象フォルダ」と「処理済みフォルダ」に任意のパスを入力してください。
ただし、「処理対象フォルダ」は、camera.pyの画像保存先のフォルダと同じになるようにしてください。
「実行間隔(秒)」は、APIに画像を送信する間隔(秒)です。整数値で入力してください。
「API送信数の上限」は、API送信数が指定した回数に達すると自動でAPI送信を停止します。ADFIの有料プランを使う時などに役立つと思います。
異常検知システムを起動してみる
GUIアプリの各項目に値を設定したら、さっそく使ってみましょう。
「処理開始」ボタンを押すと、app.pyが「処理対象フォルダ」内に画像があるか監視し始めます。
次に、GUIアプリは起動したまま、camera.pyを実行してください。
すると、カメラ画像が定期的に「処理対象フォルダ」に保存され、GUIアプリに表示されるようになります。
下記のように「実行中:正常です」と表示されたら、成功です!
今度は、カメラに手や腕などを映してみてください。
下記のように「実行中:異常を検知しました」と表示されたら、成功です!
もし、正常な状態(人が映っていない状態)にもかからわず、「異常」と判定されてしまった場合は、閾値の値が小さすぎることが原因です。
その場合は、ADFIの画面にて閾値をもっと大きな値に変更して「Save Threshold」と「Test」ボタンを押してから、再度試してみてください。
反対に、異常な状態(人が映っている状態)でも「正常」と判定される場合は、閾値の値が大きすぎることが原因です。上記同様の方法で、閾値を調整してください。
所感
Pythonで侵入者を検知する異常検知システムを自作できました!
侵入者検知以外にも「駐車場の空き状況の検知」や「立ち入り禁止区域の監視」など、他の用途にも使えそうな気がします。
もっと役立ちそうな用途を見つけたら、このコードを改造して作ってみたいと思います。
関連記事