大学の授業関係でラズパイ3B+とpicameraをゲット。暇なので、ラズパイに深層学習を用いた分類をさせてみようと思い立ちました。ただ、前もって撮った写真を分類させるのではなく、picameraからのリアルタイムの映像内の物体を分類させ、いい感じに表示させます。
学生レベルかもしれませんが、一部分でも参考になれば幸いです。
思い描いたこと
「固定されたpicameraの視野内に複数の私物を置くと、それをリアルタイムに分類し、表示する機能」をラズパイ内に作ってみようと思いました。
具体的には、背景差分(背景画像と変化した部分を抜き出す手法)で物体を抽出し、PyTorch [パイトーチ](Keras, TensorFlowと似たようなもの)で深層学習にかけて分類する方針をとります。
(※YOLO, SSDなどは扱いません!)
ということで次のステップで実装しました。
Step1: 自前の訓練データを用意し
Step2: ニューラルネットワークを構築して学習させ
Step3: picameraの映像から物体を抜き出し、学習済みパラメータで分類した結果を表示する仕組みを実装
ラズパイは処理が遅いので、自前PC上で学習を行い、得られたパラメータファイルを使ってラズパイ上で分類するようにしました。なのでPCにもラズパイにもPyTorchを入れました。
以下、一連の過程を記していきます。
苦戦した箇所は、**【⚠注】**記号と共にメモしておきます。
まず準備
PCとラズパイ上で実行環境を整えます。
実行環境
PCとラズパイで、同じパッケージでもバージョンが違いますがお気になさらず。
自前PCは学習用に。
自前PC (Windows 10)
- Python 3.6.4
- PyTorch 1.4.0+cpu
- Torchvision 0.5.0+cpu
※Torchvision [トーチビジョン] はPyTorchとセットで、画像の前処理、データセット作成に用いるライブラリです。
Raspberry Pi 3 Model B+ (Raspbian Stretch)
- Python 3.5.3
- PyTorch 1.3.0+cpu
- Torchvision 0.5.0+cpu
- OpenCV 3.4.7
ラズパイに差し込むカメラは Raspberry Pi Camera Module V2 を使いました。
また、PCにVNC Viewerを入れ、SSH接続でラズパイを操作しました。
構築方法
それぞれのコンピュータに上のバージョンのパッケージを入れます。
詳細は省きますが、リンクのサイト辺りを参考にさせて頂きました。
PyTorch / Torchvision
PCへはPyTorch公式から、環境を選んでインストール。
**【⚠注】**GPUはNVIDIA製のものが入っていないと使えないので、「intel入ってる」方は CUDA→Noneを選択(普通にCPUを使います)。
ラズパイへは "Raspberry Pi 3にPyTorch v1.3.0を入れる" や、"Raspberry Piに PyTorch Deep Learning Frameworkをソースコードからビルドする方法" をありがたく参考にさせて頂きビルド。
**【⚠注】**バージョンはgit clone ~~~ -b v1.3.0
などと指定
**【⚠注】**PyTorch 1.4.0はfatal error: immintrin.h
が存在しません的なものが出てビルドが80%程度で止まりました。謎。 (2020/3/20)
OpenCV
"Raspberry Pi + Python 3 に OpenCV 3 をなるべく簡単にインストールする" などを参考にさせて頂きラズパイへインストール。
どちらもビルドに数時間......
さっそく実装
試行錯誤を重ね、ひたすらPythonスクリプトを作成。
Step1: 学習に使う画像の訓練データを自分で作成
学習に用いる、私物の画像データを作成しました。
ラズパイにpicameraを差し、picameraが動かないように固定して使うことを想定しています。
作成するプログラム
"r" キーで画面を回転させた後、何も写さない状態で "p" を押して背景を撮影。
撮影したい私物を置いて再び "p" で撮影すると、背景差分をして、緑の枠の部分の写真を保存するものです。
今回は、**「某Phone」「腕時計」「財布」**の3つを分類させようと思うので、その3つの写真をひたすら撮ります。
# coding: utf-8
import cv2
from datetime import datetime
import picamera
import picamera.array
MIN_LEN = 50 # 物体検出枠の1辺の最小長さ
GRAY_THR = 20 # 濃度変化の閾値
CUT_MODE = True # True:検出物体を切り取って保存, False:画像全体をそのまま保存
def imshow_rect(img, contour, minlen=0):
"""
取得画像中の物体検出箇所全てを四角枠で囲む
引数:
img: カメラ画像
contour: コンター
minlen: 検出の大きさの閾値(これより枠の1辺が短い箇所は除く)
"""
for pt in contour:
x, y, w, h = cv2.boundingRect(pt)
if w < minlen and h < minlen: continue
cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
cv2.imshow('Preview', img)
def save_cutimg(img, contour, minlen=0):
"""
取得画像中の物体検出箇所を全て切り抜き保存
引数:
同上
"""
# 日時を取得しファイル名に使用
dt = datetime.now()
f_name = '{}.jpg'.format(dt.strftime('%y%m%d%H%M%S'))
imgs_cut = []
for pt in contour:
x, y, w, h = cv2.boundingRect(pt)
if w < minlen and h < minlen: continue
imgs_cut.append(img[y:y+h, x:x+w])
# 物体を切り抜いて保存
if not imgs_cut: return -1
if len(imgs_cut) > 1:
for i in range(len(imgs_cut)):
cv2.imwrite(f_name[:-4]+'_'+str(i+1)+f_name[-4:], imgs_cut[i])
else:
cv2.imwrite(f_name, imgs_cut[0])
return len(imgs_cut)
def save_img(img):
"""
取得画像をそのまま保存
引数:
同上
"""
dt = datetime.now()
fname = '{}.jpg'.format(dt.strftime('%y%m%d%H%M%S'))
cv2.imwrite(fname, img)
def take_photo():
"""
背景撮影->物体撮影, 保存
キー入力:
"p": 写真を撮る
"q": やめる
"r": 画面を回転(背景撮影時)
"i": 初めからやり直す(物体撮影時)
"""
cnt = 0
# picamera起動
with picamera.PiCamera() as camera:
camera.resolution = (480, 480) # 解像度
camera.rotation = 0 # カメラの回転角(度)
# ストリーミング開始
with picamera.array.PiRGBArray(camera) as stream:
print('Set background ... ', end='', flush=True)
# 初めに背景を撮影
while True:
# ストリーミング画像を取得、表示
camera.capture(stream, 'bgr', use_video_port=True)
cv2.imshow('Preview', stream.array)
wkey = cv2.waitKey(5) & 0xFF # キー入力受付
stream.seek(0) # 新しくcaptureするための呪文x2
stream.truncate()
if wkey == ord('q'):
cv2.destroyAllWindows()
return print()
elif wkey == ord('r'):
camera.rotation += 90
elif wkey == ord('p'):
camera.exposure_mode = 'off' # ホワイトバランス固定
save_img(stream.array)
# グレースケール化して背景画像に設定
back_gray = cv2.cvtColor(stream.array,
cv2.COLOR_BGR2GRAY)
print('done')
break
# 背景を設定し終えたら, カメラを動かさないように対象物撮影
print('Take photos!')
while True:
camera.capture(stream, 'bgr', use_video_port=True)
# 現在のフレームをグレースケール化
stream_gray = cv2.cvtColor(stream.array,
cv2.COLOR_BGR2GRAY)
# 差分の絶対値を計算し二値化, マスク作成
diff = cv2.absdiff(stream_gray, back_gray)
mask = cv2.threshold(diff, GRAY_THR, 255,
cv2.THRESH_BINARY)[1]
cv2.imshow('mask', mask)
# 物体検出のためのコンター, マスク作成
contour = cv2.findContours(mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)[1]
# 検出された物体全てを四角で囲み表示
stream_arr = stream.array.copy()
imshow_rect(stream_arr, contour, MIN_LEN)
wkey = cv2.waitKey(5) & 0xFF
stream.seek(0)
stream.truncate()
if wkey == ord('q'):
cv2.destroyAllWindows()
return
elif wkey == ord('i'):
break
elif wkey == ord('p'):
if CUT_MODE:
num = save_cutimg(stream.array, contour, MIN_LEN)
if num > 0:
cnt += num
print(' Captured: {} (sum: {})'.format(num, cnt))
else:
save_img(stream.array)
cnt += 1
print(' Captured: 1 (sum: {})'.format(cnt))
print('Initialized')
take_photo()
if __name__ == '__main__':
take_photo()
実行
ひたすら写真を撮ります。
こんな感じで緑枠ごとの切り抜き画像が保存されます。
【⚠注】写真が少なすぎるとうまく学習しません。
自分は訓練データ用に各クラス50枚以上撮りましたが、それでも少ないかと...
一応学習時に様々なノイズを加え、データ量はかさ増しします。
写真をフォルダにまとめ、Slackとかを使って自分のPCに移します。(半アナログ)
そして各私物の写真を下のフォルダ構造で格納します。
image_data
├─train
│ ├─phone
│ │ 191227013419.jpg
│ │ 191227013424.jpg
│ │ :
│ ├─wallet
│ │ 191227013300.jpg
│ │ 191227013308.jpg
│ │ :
│ └─watch
│ 191227013345.jpg
│ 191227013351.jpg
| :
└─val
├─phone
│ 191227013441.jpg
│ 191227013448.jpg
| :
├─wallet
│ 191227013323.jpg
│ 191227013327.jpg
| :
└─watch
191227013355.jpg
191227013400.jpg
:
Step2: PC上のPyTorchで深層学習
ネットワークを構築し、先ほどの画像で学習させます。
作成するプログラム
実行すると、先ほどのフォルダから画像を読み込んで学習を始め、途中経過ファイル、損失と精度の推移図、最終的なパラメータファイルを出力するものです。
作成にあたり、"PyTorchニューラルネットワーク 実装ハンドブック"(秀和システム) をかなり参考にしました。
"Ctrl+C" で中断しても、それまでの学習経過が**"train_process.ckpt"**として保存され、次回実行時に続きから学習できます。
途中でハイパーパラメータを変更しても大丈夫です。
ちなみにtorchvsionのImageFolderは、写真が入っているフォルダ名をそのままクラス名にしたデータセットを作ってくれるんです。ラク!!
学習には先ほどのtrainフォルダ内、評価にはvalフォルダ内の写真が使われます。
# coding: utf-8
import os
import re
import torch.nn as nn
import torch.optim as optim
import torch.utils
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
DATA_DIR = 'image_data' # 画像フォルダ名
CKPT_PROCESS = 'train_process.ckpt' # 学習経過保存ファイル名
CKPT_NET = 'trained_net.ckpt' # 学習済みパラメータファイル名
NUM_CLASSES = 3 # クラス数
NUM_EPOCHS = 100 # 学習回数
# よく変更するハイパーパラメータ
LEARNING_RATE = 0.001 # 学習率
MOMENTUM = 0.5 # 慣性
checkpoint = {} # 途中経過保存用変数
# 画像データ変換定義(かさ増し)
# Resizeのサイズと, classifierの最初のLinear入力サイズが関連
data_transforms = transforms.Compose([
transforms.Resize((112, 112)), # リサイズ
transforms.RandomRotation(30), # ランダムに回転
transforms.Grayscale(), # 2値化
transforms.ToTensor(), # テンソル化
transforms.Normalize(mean=[0.5], std=[0.5]) # 正規化(数字はテキトー)
])
val_transforms = transforms.Compose([
transforms.Resize((112, 112)),
transforms.Grayscale(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5], std=[0.5])
])
# データセット作成
train_dataset = datasets.ImageFolder(
root=os.path.join(DATA_DIR, 'train'),
transform=train_transforms
)
val_dataset = datasets.ImageFolder(
root=os.path.join(DATA_DIR, 'val'),
transform=val_transforms
)
# ミニバッチ取得
train_loader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=10, # 学習時のバッチサイズ
shuffle=True # 訓練データをシャッフル
)
val_loader = torch.utils.data.DataLoader(
dataset=val_dataset,
batch_size=10,
shuffle=True
)
class NeuralNet(nn.Module):
"""ネットワーク定義. nn.Module継承"""
def __init__(self, num_classes):
super(NeuralNet, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(1, 8, kernel_size=11, stride=4, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(8, 16, kernel_size=5, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.classifier = nn.Sequential(
nn.Dropout(p=0.5),
nn.Linear(400, 200),
nn.ReLU(inplace=True),
nn.Dropout(p=0.5),
nn.Linear(200, num_classes)
)
def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
return x
def main():
"""訓練途中データ読み込み->学習(->訓練途中データの保存)->結果の図示"""
global checkpoint
print('[Settings]')
# デバイスの設定
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# ネットワーク, 評価関数, 最適化関数設定
net = NeuralNet(NUM_CLASSES).to(device)
criterion = nn.CrossEntropyLoss() # 評価関数
optimizer = optim.SGD( # 最適化アルゴリズム
net.parameters(),
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=5e-4
)
# 設定の表示
# print(' Device :', device)
# print(' Dataset Class-Index :', train_dataset.class_to_idx)
# print(' Network Model :', re.findall('(.*)\(', str(net))[0])
# print(' Criterion :', re.findall('(.*)\(', str(criterion))[0])
# print(' Optimizer :', re.findall('(.*)\(', str(optimizer))[0])
# print(' -Learning Rate :', LEARNING_RATE)
# print(' -Momentum :', MOMENTUM)
t_loss_list = []
t_acc_list = []
v_loss_list = []
v_acc_list = []
epoch_pre = -1
# 訓練(途中)データ取得
if os.path.isfile(CKPT_PROCESS):
checkpoint = torch.load(CKPT_PROCESS)
net.load_state_dict(checkpoint['net'])
optimizer.load_state_dict(checkpoint['optimizer'])
t_loss_list = checkpoint['t_loss_list']
t_acc_list = checkpoint['t_acc_list']
v_loss_list = checkpoint['v_loss_list']
v_acc_list = checkpoint['v_acc_list']
epoch_pre = checkpoint['epoch']
print("Progress until last time = {}/{} epochs"\
.format(epoch_pre+1, NUM_EPOCHS))
print('[Main process]')
for epoch in range(epoch_pre+1, NUM_EPOCHS):
t_loss, t_acc, v_loss, v_acc = 0, 0, 0, 0
# 学習 ---------------------------------------------------------
net.train() # 学習モード
for _, (images, labels) in enumerate(train_loader):
images, labels = images.to(device), labels.to(device)
optimizer.zero_grad()
outputs = net(images)
loss = criterion(outputs, labels)
t_loss += loss.item()
t_acc += (outputs.max(1)[1] == labels).sum().item()
loss.backward()
optimizer.step()
avg_t_loss = t_loss / len(train_loader.dataset)
avg_t_acc = t_acc / len(train_loader.dataset)
# 評価 ---------------------------------------------------------
net.eval() # 評価モード
with torch.no_grad(): # 勾配の更新を停止
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
images = images.to(device)
labels = labels.to(device)
outputs = net(images)
loss = criterion(outputs, labels)
v_loss += loss.item()
v_acc += (outputs.max(1)[1] == labels).sum().item()
avg_v_loss = v_loss / len(val_loader.dataset)
avg_v_acc = v_acc / len(val_loader.dataset)
# --------------------------------------------------------------
print('\rEpoch [{}/{}] | Train [oss:{:.3f}, acc:{:.3f}] | Val [loss:{:.3f}, acc:{:.3f}]'\
.format(epoch+1, NUM_EPOCHS, avg_t_loss, avg_t_acc, avg_v_loss, avg_v_acc), end='')
# 損失, 精度記録
t_loss_list.append(avg_t_loss)
t_acc_list.append(avg_t_acc)
v_loss_list.append(avg_v_loss)
v_acc_list.append(avg_v_acc)
# 途中経過保存用処理
checkpoint['net'] = net.state_dict()
checkpoint['optimizer'] = optimizer.state_dict()
checkpoint['t_loss_list'] = t_loss_list
checkpoint['t_acc_list'] = t_acc_list
checkpoint['v_loss_list'] = v_loss_list
checkpoint['v_acc_list'] = v_acc_list
checkpoint['epoch'] = epoch
graph()
save_process()
save_net()
def save_process():
"""途中経過を保存"""
global checkpoint
if not checkpoint: return
torch.save(checkpoint, CKPT_PROCESS)
def save_net():
"""ネットワーク情報のみ保存"""
global checkpoint
if not checkpoint: return
torch.save(checkpoint['net'], CKPT_NET)
def graph():
"""損失, 精度のグラフ化"""
global checkpoint
if not checkpoint: return
t_loss_list = checkpoint['t_loss_list']
t_acc_list = checkpoint['t_acc_list']
v_loss_list = checkpoint['v_loss_list']
v_acc_list = checkpoint['v_acc_list']
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.plot(range(len(t_loss_list)), t_loss_list,
color='blue', linestyle='-', label='t_loss')
plt.plot(range(len(v_loss_list)), v_loss_list,
color='green', linestyle='--', label='v_loss')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Training and validation loss')
plt.grid()
plt.subplot(1, 2, 2)
plt.plot(range(len(t_acc_list)), t_acc_list,
color='blue', linestyle='-', label='t_acc')
plt.plot(range(len(v_acc_list)), v_acc_list,
color='green', linestyle='--', label='v_acc')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('acc')
plt.title('Training and validation accuracy')
plt.grid()
plt.show()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print()
graph()
save_process()
【⚠注】ネットワークの規模はほどほどに。
層やノード数を増やし過ぎると、のちにラズパイで分類する際、膨大な量のパラメータがメモリを食い尽すのかDefaultCPUAllocator: can't allocate memory: you tried to allocate 685198800 bytes.
のエラーが出ます。
実行
学習の経過がコチラ。
左が損失、右が精度です。また青線が訓練データ、緑破線が検証データに対するものです。
検証データで精度は72%程度。改善の余地があります...
学習を終えると、学習済みパラメータのみを保存した**""trained_net.ckpt""ファイルが出来るので、それをまたSlackかなんかでラズパイに送ります**。
Step3: ラズパイでカメラ映像をリアルタイム分類、結果表示
ゴールとして、カメラ映像内の物体をリアルタイムで分類させ、いい感じに表示します。
作成するプログラム
初めに背景を撮影、その後のフレームからは背景を除算し、浮かび上がった物体を切り取り、定義された前処理を通して4次元のtensorバッチにします。
バッチまるごとネットワークに通して各クラスの確率に変換し、最も確率の高いクラス(物の名前)を重ね描きしてウィンドウに表示します。
先ほど出来た "trained_net.ckpt" を読み込みます。
【⚠注】バッチサイズ(一度に検出する物体の数)に上限を設けないと、検出した大量の領域を一度に処理しようとして、ラズパイがフリーズする恐れがあります。
# coding: utf-8
import os
from PIL import Image
from time import sleep
import cv2
import picamera
import picamera.array
import torch
# pytorchディレクトリで "export OMP_NUM_THREADS=1 or 2 or 3" 必須(デフォルトは4)
# 並列処理コア数は "print(torch.__config__.parallel_info())" で確認
import torch.nn as nn
import torch.utils
from torchvision import transforms
CKPT_NET = 'trained_net.ckpt' # 学習済みパラメータファイル
OBJ_NAMES = ['Phone', 'Wallet', 'Watch'] # 各クラスの表示名
MIN_LEN = 50
GRAY_THR = 20
CONTOUR_COUNT_MAX = 3 # バッチサイズ(一度に検出する物体の数)の上限
SHOW_COLOR = (255, 191, 0) # 枠の色(B,G,R)
NUM_CLASSES = 3
PIXEL_LEN = 112 # Resize後のサイズ(1辺)
CHANNELS = 1 # 色のチャンネル数(BGR:3, グレースケール:1)
# 画像データ変換定義
# Resizeと, classifierの最初のLinear入力が関連
data_transforms = transforms.Compose([
transforms.Resize((PIXEL_LEN, PIXEL_LEN)),
transforms.Grayscale(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5], std=[0.5])
])
class NeuralNet(nn.Module):
"""ネットワーク定義. 学習に用いたものと同一である必要"""
def __init__(self, num_classes):
super(NeuralNet, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(1, 8, kernel_size=11, stride=4, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Conv2d(8, 16, kernel_size=5, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
)
self.classifier = nn.Sequential(
nn.Dropout(p=0.5),
nn.Linear(400, 200),
nn.ReLU(inplace=True),
nn.Dropout(p=0.5),
nn.Linear(200, num_classes)
)
def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
return x
def detect_obj(back, target):
"""
OpenCVの背景差分処理で, 検出された物体のタプルを作成
引数:
back: 入力背景画像
カラー画像
target: 背景差分対象の画像
カラー画像. 複数の物体を切り抜き, カラー画像タプルにまとめる
"""
print('Detecting objects ...')
# 2値化
b_gray = cv2.cvtColor(back, cv2.COLOR_BGR2GRAY)
t_gray = cv2.cvtColor(target, cv2.COLOR_BGR2GRAY)
# 差分を計算
diff = cv2.absdiff(t_gray, b_gray)
# 閾値に従ってコンター, マスクを作成, 物体を抽出
# findContoursのインデックスは, cv2.__version__ == 4.2.0->[0], 3.4.7->[1]
mask = cv2.threshold(diff, GRAY_THR, 255, cv2.THRESH_BINARY)[1]
cv2.imshow('mask', mask)
contour = cv2.findContours(mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)[1]
# 一定の縦横幅以上で検出された変化領域の座標, サイズバッチを作成
pt_list = list(filter(
lambda x: x[2] > MIN_LEN and x[3] > MIN_LEN,
[cv2.boundingRect(pt) for pt in contour]
))[:CONTOUR_COUNT_MAX]
# 位置情報に従ってフレーム切り抜き, PIL画像のタプルに変換して返す
obj_imgaes = tuple(map(
lambda x: Image.fromarray(target[x[1]:x[1]+x[3], x[0]:x[0]+x[2]]),
pt_list
))
return (obj_imgaes, pt_list)
def batch_maker(tuple_images, transform):
"""
PIL形式の画像のタプルをtransformし, ネットワークで処理可能なtensorバッチに変換
引数:
tuple_images: PIL画像タプル
transform: torchvision画像変換定義
"""
return torch.cat([transform(img) for img
in tuple_images]).view(-1, CHANNELS, PIXEL_LEN, PIXEL_LEN)
def judge_what(img, probs_list, pos_list):
"""
各クラスに属する確率から物体を決定し, その位置に枠と名前を表示, クラスのインデックスを返す
引数:
probs_list: 確率の二次配列. バッチ形式
pos_list: 位置の二次配列. バッチ形式
"""
print('Judging objects ...')
# 最も高い確率とそのインデックスのリストに変換
ip_list = list(map(lambda x: max(enumerate(x), key = lambda y:y[1]),
F.softmax(probs_list, dim=-1))) # <- 4/30修正
# インデックスを物体名に変換, 物体の位置に物体名と確信度を書き込み表示
for (idx, prob), pos in zip(ip_list, pos_list):
cv2.rectangle(img, (pos[0], pos[1]), (pos[0]+pos[2], pos[1]+pos[3]), SHOW_COLOR, 2)
cv2.putText(img, '%s:%.1f%%'%(OBJ_NAMES[idx], prob*100), (pos[0]+5, pos[1]+20),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, SHOW_COLOR, thickness=2)
return ip_list
def realtime_classify():
"""学習済みモデル読み込み->テストデータ読み込み->分類->結果を画像に重ねて表示"""
# デバイス設定
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# ネットワーク設定
net = NeuralNet(NUM_CLASSES).to(device)
# 訓練済みデータ取得
if os.path.isfile(CKPT_NET):
checkpoint = torch.load(CKPT_NET)
net.load_state_dict(checkpoint)
else:
raise FileNotFoundError('No trained network file: {}'.format(CKPT_NET))
# 評価モード
net.eval()
# picamera起動
with picamera.PiCamera() as camera:
camera.resolution = (480, 480)
# ストリーミング開始
with picamera.array.PiRGBArray(camera) as stream:
print('Setting background ...')
sleep(2)
camera.exposure_mode = 'off' # ホワイトバランス固定
camera.capture(stream, 'bgr', use_video_port=True)
# 背景に設定
img_back = stream.array
stream.seek(0)
stream.truncate()
print('Start!')
with torch.no_grad():
while True:
camera.capture(stream, 'bgr', use_video_port=True)
# これからの入力画像に対して背景差分
img_target = stream.array
# 物体とその位置を検出
obj_imgs, positions = detect_obj(img_back, img_target)
if obj_imgs:
# 検出物体をネットワークの入力形式に変換
obj_batch = batch_maker(obj_imgs, data_transforms)
# 分類
outputs = net(obj_batch)
# 判定
result = judge_what(img_target, outputs, positions)
print(' Result:', result)
# 表示
cv2.imshow('detection', img_target)
if cv2.waitKey(200) == ord('q'):
cv2.destroyAllWindows()
return
stream.seek(0)
stream.truncate()
if __name__ == "__main__":
try:
realtime_classify()
except KeyboardInterrupt:
cv2.destroyAllWindows()
実行
先ほどの"trained_net.ckpt"をラズパイ上に持ってきて、同ディレクトリ内で実行。
検知した物体名とその確信度が表示されます。
実行結果は......置いた瞬間から高精度で分類してくれて満足!!
【⚠注】実行に使用するコア数(デフォルト4)を変更することをお勧めします。
4コアフルで使うとフリーズする恐れ大です。
pytorchディレクトリで export OMP_NUM_THREADS=2
(2コア使用)とコマンドを打ち変更します。コア数はprint(torch.__config__.parallel_info())
で確認できます。
ただしシェルを閉じると変更が破棄されるので、永続化するには/home/pi
にある**".profile"**の最下段...~ fi
の下に、export OMP_NUM_THREADS=2
と書き込みrebootします。
まとめ
やりたかったことはできました!(可読性に欠ける箇所はすみません...)
OpenCVの顔検出を使えば、とても簡単な顔認証にはすぐ応用できそうです。
もともとはSSDを実装しようと思ってましたが、位置情報付きのデータセットを作るのが大変そうだと思ったうえ、サンプルデータで学習させようとして出たエラーが解決できなかったので諦めました...
SSDなどと違い、今回の背景差分では重なり合った物を分離できず、一体だと判断されてしまうのが欠点です。
いい勉強になりました~