Update: In May 2021 this article was brought up to date for TensorFlow 2.5, OpenCV 4.5.1, and Debian 11 Bullseye, and the programs were re-tested.
This article describes how to connect a USB camera to a Raspberry Pi, capture its video with OpenCV on Python 3, and run real-time object detection with TensorFlow. It uses modified versions of the TensorFlow Hub sample and the TensorFlow Lite sample, each of which displays detection results like those shown below. Nothing in the programs is Raspberry Pi specific, so they also run fine on, say, a laptop with an Intel CPU, at least as long as it is running Linux ~~(that said, the last two currently fail on Intel Ubuntu 20.04 and I have not been able to fix them… 😭)~~. One ARM-specific pitfall: `import cv2` must appear before any TensorFlow-related imports, or you get an error (a minimal sketch of the correct order follows the list below). This does not happen on Intel CPUs, so keep it in mind when writing your own Python programs. So far, the following has been confirmed to work on:
- Raspberry Pi OS Buster 32bit
- Raspberry Pi OS Buster 64bit
- Raspberry Pi OS Bullseye 64bit
- Ubuntu Mate 20.04 (Focal) Beta1 64bit, Raspberry Pi
- Ubuntu Server 20.10 (Groovy) 64bit, Raspberry Pi
- Ubuntu Mate 20.04 (Focal) 64bit, Intel laptop
- Vanilla Debian Bullseye 64bit
On the Raspberry Pi, with identical hardware, neural-network inference on the 64-bit OS is ~~about twice as fast~~ (not quite that much faster…) as on the 32-bit OS, though memory usage also increases.
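The ARM-specific pitfall mentioned above amounts to the following import order (a minimal sketch; the TensorFlow Hub import only matters if you actually use it):

```python
# On ARM, import cv2 before any TensorFlow-related package, otherwise the
# import fails (see https://github.com/opencv/opencv/issues/14884).
import cv2
import tensorflow as tf
import tensorflow_hub as hub
```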
With the program at the end of this article, cropping a 1024x1024 image from the camera and feeding it to EfficientDet D4 produces detection results like the following.
Each image then takes about 20 seconds to process, and the memory situation looks like this:
kakinagu@raspi-mate:~$ top
top - 16:47:48 up 1:04, 2 users, load average: 8.23, 7.74, 7.45
Tasks: 158 total, 3 running, 155 sleeping, 0 stopped, 0 zombie
%Cpu(s): 92.7 us, 4.1 sy, 2.5 ni, 0.3 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
MiB Mem : 7759.4 total, 2330.7 free, 4420.6 used, 1008.1 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 3223.9 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1753 kakinagu 20 0 6299756 4.2g 286584 S 354.0 55.8 143:53.77 python3
1754 kakinagu 39 19 715216 136896 66040 S 5.6 1.7 10:03.56 python3
1752 kakinagu 39 19 925524 129412 81648 R 5.6 1.6 14:45.44 python3
Tested environment
This was tested on a Raspberry Pi 4B with 8 GB of RAM, but 2 GB should be enough if you use TensorFlow Lite, and 4 GB if you use TensorFlow Hub. Operation was confirmed on both the 32-bit and 64-bit versions of Raspberry Pi OS Buster. Note that an ordinary USB camera, the kind usable on any PC, is used here rather than the dedicated Raspberry Pi camera module. If you do have the official camera module, load the bcm2835-v4l2.ko kernel module as described in Raspberry Piと純正カメラモジュールで監視カメラを作る、おそらく正しい方法 jessie版 (motion + v4l2ドライバ), and it can then be treated just like a USB camera.
Preparation
- Following a guide such as ラズパイでUSBカメラの映像をPythonのOpenCVで表示する, set up the Raspberry Pi so it can capture and display video from the USB camera
- Following a guide such as ラズパイへのTensorFlow 2.4簡単インストールして物体検出, install TensorFlow 2.2 or later and TensorFlow Hub (a quick sanity check is shown below)
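As a quick sanity check that the preparation steps worked, something like the following can be run (the version numbers in the comments are just examples):

```python
# Minimal sanity check of the installed packages (keep cv2 first on ARM).
import cv2
import tensorflow as tf
import tensorflow_hub as hub

print(cv2.__version__)   # e.g. 4.5.1
print(tf.__version__)    # should be 2.2 or later
print(hub.__version__)
```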
Object detection programs
What follows is essentially (or rather, functionally) the same as TensorFlowでの物体検出が超手軽にできる「Object Detection Tools」をTensorFlow 2.xに対応しました.
Modified version of the TensorFlow Hub sample
The sample at https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb downloads images from the internet and runs object detection on them; it has been modified here to capture images from a USB camera instead. The TensorFlow Hub variants take quite a long time to start: on 64-bit Raspberry Pi OS, about 8 minutes for the EfficientDet D0 used at the end of this article and about 16 minutes for D4. If you need faster startup, converting the model to TensorFlow Lite format should help; a rough sketch of such a conversion is given below.
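The conversion itself might look roughly like this (a sketch only: the model handle and output filename are assumptions, and models with ops the TFLite converter does not support, non-max suppression in particular, may fail to convert without extra flags):

```python
# Rough sketch: convert a cached TF Hub SavedModel to TensorFlow Lite format.
import cv2                  # keep cv2 before TensorFlow on ARM
import tensorflow as tf
import tensorflow_hub as hub

module_handle = "https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1"
saved_model_dir = hub.resolve(module_handle)   # path of the locally cached model

converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()             # may fail on unsupported ops

with open("mobilenet_v2_ssd.tflite", "wb") as f:
    f.write(tflite_model)
```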
- The camera capture resolution is set to 800x600; change it as appropriate. The available resolutions can be listed with `v4l2-ctl --list-formats-ext`; if the `v4l2-ctl` command is missing, install it with `sudo apt-get install v4l-utils`.
- Start the program with `python3 tfhub.py`.
- The neural network used for detection is MobileNetV2 SSD; it can be switched to Inception ResNet, but that raises the required virtual memory above 10 GB, so it only runs on a Raspberry Pi 4B with 8 GB of RAM and the 64-bit Raspberry Pi OS. MobileNetV2 needs roughly 3 GB and just barely runs on the 32-bit Raspberry Pi OS. Object detection with MobileNetV2 took about 0.7 seconds per frame.
# Modified from https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb
# OpenCV 3.2 must be loaded before TensorFlow, see
# https://github.com/opencv/opencv/issues/14884
import cv2
#@title Imports and function definitions
# For running inference on the TF-Hub module.
import tensorflow as tf
import tensorflow_hub as hub
# For drawing onto the image.
import numpy as np
from PIL import Image
from PIL import ImageColor
from PIL import ImageDraw
from PIL import ImageFont
from PIL import ImageOps
# For measuring the inference time.
import time
# Print Tensorflow version
print(tf.__version__)
# Check available GPU devices.
print("The following GPU devices are available: %s" % tf.test.gpu_device_name())
def draw_bounding_box_on_image(image,
ymin,
xmin,
ymax,
xmax,
color,
font,
thickness=4,
display_str_list=()):
"""Adds a bounding box to an image."""
draw = ImageDraw.Draw(image)
im_width, im_height = image.size
(left, right, top, bottom) = (xmin * im_width, xmax * im_width,
ymin * im_height, ymax * im_height)
draw.line([(left, top), (left, bottom), (right, bottom), (right, top),
(left, top)],
width=thickness,
fill=color)
# If the total height of the display strings added to the top of the bounding
# box exceeds the top of the image, stack the strings below the bounding box
# instead of above.
display_str_heights = [font.getsize(ds)[1] for ds in display_str_list]
# Each display_str has a top and bottom margin of 0.05x.
total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)
if top > total_display_str_height:
text_bottom = top
else:
text_bottom = top + total_display_str_height
# Reverse list and print from bottom to top.
for display_str in display_str_list[::-1]:
text_width, text_height = font.getsize(display_str)
margin = np.ceil(0.05 * text_height)
draw.rectangle([(left, text_bottom - text_height - 2 * margin),
(left + text_width, text_bottom)],
fill=color)
draw.text((left + margin, text_bottom - text_height - margin),
display_str,
fill="black",
font=font)
text_bottom -= text_height - 2 * margin
def draw_boxes(image, boxes, class_names, scores, max_boxes=10, min_score=0.1):
"""Overlay labeled boxes on an image with formatted scores and label names."""
colors = list(ImageColor.colormap.values())
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSansNarrow-Regular.ttf",
25)
except IOError:
print("Font not found, using default font.")
font = ImageFont.load_default()
for i in range(min(boxes.shape[0], max_boxes)):
if scores[i] >= min_score:
ymin, xmin, ymax, xmax = tuple(boxes[i])
display_str = "{}: {}%".format(class_names[i].decode("ascii"),
int(100 * scores[i]))
color = colors[hash(class_names[i]) % len(colors)]
#image_pil = Image.fromarray(np.uint8(image)).convert("RGB")
image_pil = Image.fromarray(image)
draw_bounding_box_on_image(
image_pil,
ymin,
xmin,
ymax,
xmax,
color,
font,
display_str_list=[display_str])
np.copyto(image, np.array(image_pil))
return image
module_handle = "https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]
# The model on the next line needs over 10 GB of virtual memory and is too heavy for a Raspberry Pi
#module_handle = "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]
detector = hub.load(module_handle).signatures['default']
capture = cv2.VideoCapture(0)
if capture.isOpened() is False:
    raise IOError("IO Error")
capture.set(cv2.CAP_PROP_FRAME_WIDTH, 800)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)
while(True):
try:
ret, frame = capture.read()
if ret is False:
            raise IOError("IO Error")
rgb_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
converted_img = tf.image.convert_image_dtype(rgb_img, tf.float32)[tf.newaxis, ...]
start_time = time.time()
result = detector(converted_img)
end_time = time.time()
result = {key:value.numpy() for key,value in result.items()}
print("Found %d objects." % len(result["detection_scores"]))
print("Inference time: ", end_time-start_time)
image_with_boxes = draw_boxes(
rgb_img, result["detection_boxes"],
result["detection_class_entities"], result["detection_scores"])
cv2.imshow('frame', cv2.cvtColor(image_with_boxes, cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
except KeyboardInterrupt:
        # Press CTRL + C to quit
break
capture.release()
cv2.destroyAllWindows()
Modified version of the TensorFlow Lite sample
The sample at https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi runs object detection on video from the Raspberry Pi camera module; it has been modified here to capture from a USB camera instead. On Raspberry Pi Ubuntu 20.04 it failed with an error, and I filed an issue on GitHub for the record (a bug in my own program could not be ruled out either). It turned out to be a more elementary mistake on my part… 💦
- The camera capture resolution is set to the author's camera resolution of 800x600; adjust the part of `detect_usbcamera.py` below that sets the resolution as appropriate.
- Run https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi/download.sh to download the required files.
- Run the program with `python3 detect_usbcamera.py --model /tmp/detect.tflite --labels /tmp/coco_labels.txt`. It needs a little under 1 GB of memory, and the neural network took about 0.2 seconds per frame for object detection.
- The `detect.tflite` used above has since disappeared, so use `ssd_mobilenet_v1_1_metadata_1.tflite`, downloadable from https://www.tensorflow.org/lite/examples/object_detection/overview, in its place. `coco_labels.txt` could still be reused as-is. A short sanity check for the downloaded model is shown below.
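Before running the full program, the downloaded model can be checked with a snippet like this (the path is an assumption; point it at wherever you saved the file):

```python
# Check that the .tflite file loads and what input it expects.
import tensorflow.lite as tflite

interpreter = tflite.Interpreter(model_path='/tmp/detect.tflite')
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()[0]
print(input_details['shape'])   # e.g. [  1 300 300   3] for SSD MobileNet v1
print(input_details['dtype'])   # typically uint8 for this model
```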
# python3
#
# The following is the original copyright notice.
# This is modified from detect_picamera.py to handle a USB camera
# by OpenCV. The original is from
# https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi
#
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example using TF Lite to detect objects with a USB camera."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import cv2
import argparse
import io
import re
import time
from annotation import Annotator
import numpy as np
from PIL import Image
import tensorflow.lite as tflite
def load_labels(path):
"""Loads the labels file. Supports files with or without index numbers."""
with open(path, 'r', encoding='utf-8') as f:
lines = f.readlines()
labels = {}
for row_number, content in enumerate(lines):
pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
if len(pair) == 2 and pair[0].strip().isdigit():
labels[int(pair[0])] = pair[1].strip()
else:
labels[row_number] = pair[0].strip()
return labels
def set_input_tensor(interpreter, image):
"""Sets the input tensor."""
tensor_index = interpreter.get_input_details()[0]['index']
input_tensor = interpreter.tensor(tensor_index)()[0]
input_tensor[:, :] = image
def get_output_tensor(interpreter, index):
"""Returns the output tensor at the given index."""
output_details = interpreter.get_output_details()[index]
tensor = np.squeeze(interpreter.get_tensor(output_details['index']))
return tensor
def detect_objects(interpreter, image, threshold):
"""Returns a list of detection results, each a dictionary of object info."""
set_input_tensor(interpreter, image)
interpreter.invoke()
# Get all output details
boxes = get_output_tensor(interpreter, 0)
classes = get_output_tensor(interpreter, 1)
scores = get_output_tensor(interpreter, 2)
count = int(get_output_tensor(interpreter, 3))
results = []
for i in range(count):
if scores[i] >= threshold:
result = {
'bounding_box': boxes[i],
'class_id': classes[i],
'score': scores[i]
}
results.append(result)
return results
def annotate_objects(annotator, results, labels, CAMERA_WIDTH, CAMERA_HEIGHT):
"""Draws the bounding box and label for each object in the results."""
for obj in results:
# Convert the bounding box figures from relative coordinates
# to absolute coordinates based on the original resolution
ymin, xmin, ymax, xmax = obj['bounding_box']
xmin = int(xmin * CAMERA_WIDTH)
xmax = int(xmax * CAMERA_WIDTH)
ymin = int(ymin * CAMERA_HEIGHT)
ymax = int(ymax * CAMERA_HEIGHT)
# Overlay the box, label, and score on the camera preview
annotator.bounding_box([xmin, ymin, xmax, ymax])
annotator.text([xmin, ymin],
'%s\n%.2f' % (labels[obj['class_id']], obj['score']))
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'--model', help='File path of .tflite file.', required=True)
parser.add_argument(
'--labels', help='File path of labels file.', required=True)
parser.add_argument(
'--threshold',
help='Score threshold for detected objects.',
required=False,
type=float,
default=0.4)
args = parser.parse_args()
labels = load_labels(args.labels)
interpreter = tflite.Interpreter(args.model)
interpreter.allocate_tensors()
_, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
camera = cv2.VideoCapture(0)
try:
if camera.isOpened() is False:
            raise IOError("IO Error")
camera.set(cv2.CAP_PROP_FRAME_WIDTH, 800)
camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)
CAMERA_WIDTH = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
CAMERA_HEIGHT = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
annotator = Annotator(camera)
while True:
annotator.update()
image = annotator.buffer.resize(
(input_width, input_height), Image.BICUBIC)
start_time = time.monotonic()
results = detect_objects(interpreter, image, args.threshold)
elapsed_ms = (time.monotonic() - start_time) * 1000
# annotator.clear()
annotate_objects(annotator, results, labels, CAMERA_WIDTH, CAMERA_HEIGHT)
annotator.text([5, 0], '%.1fms' % (elapsed_ms))
cv2.imshow('frame',cv2.cvtColor(np.asarray(annotator.buffer), cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
del image
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
# python3
# The following is the original copyright notice.
# This is modified from the original for capturing from the first USB camera with OpenCV.
# The original is from
# https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi
#
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An annotation library that draws overlays on the USB camera preview.
Annotations include bounding boxes and text overlays.
Annotations support partial opacity, however only with respect to the content in
the preview. A transparent fill value will cover up previously drawn overlay
under it, but not the camera content under it. A color of None can be given,
which will then not cover up overlay content drawn under the region.
Note: Overlays do not persist through to the storage layer so images saved from
the camera, will not contain overlays.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import cv2
from PIL import Image
from PIL import ImageDraw
class Annotator:
"""Utility for managing annotations on the camera preview."""
def __init__(self, camera, default_color=None):
"""Initializes Annotator parameters.
Args:
camera: cv2.VideoCapture camera object to overlay on top of.
default_color: PIL.ImageColor (with alpha) default for the drawn content.
"""
self._camera = camera
self._dims = (int(camera.get(cv2.CAP_PROP_FRAME_WIDTH)), int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
self.buffer = Image.new('RGB', self._dims)
self._draw = ImageDraw.Draw(self.buffer)
self._default_color = default_color or (0xFF, 0, 0)
def update(self):
"""Draws any changes to the image buffer onto the overlay."""
ret, captured_frame = self._camera.read()
if ret is False:
            raise IOError("IO Error")
self.buffer = Image.fromarray(cv2.cvtColor(captured_frame, cv2.COLOR_BGR2RGB), 'RGB')
self._draw = ImageDraw.Draw(self.buffer)
def clear(self):
"""Clears the contents of the overlay, leaving only the plain background."""
self._draw.rectangle((0, 0) + self._dims, fill=(0, 0, 0, 0x00))
def bounding_box(self, rect, outline=None, fill=None):
"""Draws a bounding box around the specified rectangle.
Args:
rect: (x1, y1, x2, y2) rectangle to be drawn, where (x1, y1) and (x2, y2)
are opposite corners of the desired rectangle.
outline: PIL.ImageColor with which to draw the outline (defaults to the
Annotator default_color).
fill: PIL.ImageColor with which to fill the rectangle (defaults to None,
which will *not* cover up drawings under the region).
"""
outline = outline or self._default_color
self._draw.rectangle(rect, fill=fill, outline=outline)
def text(self, location, text, color=None):
"""Draws the given text at the given location.
Args:
location: (x, y) point at which to draw the text (upper left corner).
text: string to be drawn.
color: PIL.ImageColor to draw the string in (defaults to the Annotator
default_color).
"""
color = color or self._default_color
self._draw.text(location, text, fill=color)
Speeding things up with parallel processing
The object-detection processing consists of three stages:
- capturing images from the camera
- inference by the neural network
- drawing the inference results in the GUI
These three stages can run in parallel. Below is a faster version of the TensorFlow Hub program, built with the technique from Python 3のmultiprocessingでプロセス間で大量のデータを受け渡しつつnumpyで処理する. Note that this version does not work with the python3-opencv package (version 4.2) shipped with Ubuntu 20.04: reading from the camera always fails with an error 😭. Mysteriously, the error goes away if multiprocessing is not used, and I have not found a proper fix yet. When it failed, `import cv2` was originally done only once at the top and inherited by the child processes created with `multiprocessing.Process` via `fork`; doing a separate `import cv2` inside each child process made it work even with OpenCV 4.2. Completely baffling… 😭 ← After moving to Ubuntu 20.10 the error appeared even with that workaround, which is how tfhub-faster4.py and tfhub8.py ended up in their current form.
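Before the full program, here is a minimal sketch of the shared-buffer handshake it uses (not part of the program itself; the frame size is an assumption): one process writes a frame into a multiprocessing.sharedctypes.RawArray and sets an Event, the other waits on the Event and copies the frame out.

```python
# Minimal sketch of the RawArray + Event handshake used by the programs below.
import multiprocessing
import multiprocessing.sharedctypes
import numpy as np

H, W = 600, 800  # assumed frame size

def producer(buf, ready):
    frame = np.zeros((H, W, 3), dtype=np.uint8)  # stands in for capture.read()
    ready.clear()
    np.asarray(buf)[:] = frame.reshape(-1)       # copy the frame into shared memory
    ready.set()                                  # tell the consumer a frame is ready

def consumer(buf, ready):
    ready.wait()                                 # block until a frame is available
    frame = np.array(np.reshape(buf, (H, W, 3)), copy=True)
    ready.clear()
    print(frame.shape)

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    buf = multiprocessing.sharedctypes.RawArray('B', H * W * 3)
    ready = multiprocessing.Event()
    p1 = multiprocessing.Process(target=producer, args=(buf, ready), daemon=True)
    p2 = multiprocessing.Process(target=consumer, args=(buf, ready), daemon=True)
    p2.start()
    p1.start()
    p1.join()
    p2.join()
```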
# Modified from https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb
# OpenCV 3.2 must be loaded before TensorFlow, see
# https://github.com/opencv/opencv/issues/14884
import cv2
# For measuring the inference time.
import time
initialization_start_time = time.time()
import multiprocessing
import multiprocessing.sharedctypes
import os
# Use v4l2-ctl --list-formats-ext
MY_CAMERA_WIDTH=800
MY_CAMERA_HEIGHT=600
MY_FPS=6
#@title Imports and function definitions
# For drawing onto the image.
import numpy as np
from PIL import Image
from PIL import ImageColor
from PIL import ImageDraw
from PIL import ImageFont
from PIL import ImageOps
def draw_bounding_box_on_image(image,
ymin,
xmin,
ymax,
xmax,
color,
font,
thickness=4,
display_str_list=()):
"""Adds a bounding box to an image."""
draw = ImageDraw.Draw(image)
im_width, im_height = image.size
(left, right, top, bottom) = (xmin * im_width, xmax * im_width,
ymin * im_height, ymax * im_height)
draw.line([(left, top), (left, bottom), (right, bottom), (right, top),
(left, top)],
width=thickness,
fill=color)
# If the total height of the display strings added to the top of the bounding
# box exceeds the top of the image, stack the strings below the bounding box
# instead of above.
display_str_heights = [font.getsize(ds)[1] for ds in display_str_list]
# Each display_str has a top and bottom margin of 0.05x.
total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)
if top > total_display_str_height:
text_bottom = top
else:
text_bottom = top + total_display_str_height
# Reverse list and print from bottom to top.
for display_str in display_str_list[::-1]:
text_width, text_height = font.getsize(display_str)
margin = np.ceil(0.05 * text_height)
draw.rectangle([(left, text_bottom - text_height - 2 * margin),
(left + text_width, text_bottom)],
fill=color)
draw.text((left + margin, text_bottom - text_height - margin),
display_str,
fill="black",
font=font)
text_bottom -= text_height - 2 * margin
def draw_boxes(image, boxes, class_names, scores, max_boxes=10, min_score=0.1):
"""Overlay labeled boxes on an image with formatted scores and label names."""
colors = list(ImageColor.colormap.values())
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSansNarrow-Regular.ttf",
25)
except IOError:
print("Font not found, using default font.")
font = ImageFont.load_default()
for i in range(min(boxes.shape[0], max_boxes)):
if scores[i] >= min_score:
ymin, xmin, ymax, xmax = tuple(boxes[i])
display_str = "{}: {}%".format(class_names[i].decode("ascii"),
int(100 * scores[i]))
color = colors[hash(class_names[i]) % len(colors)]
#image_pil = Image.fromarray(np.uint8(image)).convert("RGB")
image_pil = Image.fromarray(image)
draw_bounding_box_on_image(
image_pil,
ymin,
xmin,
ymax,
xmax,
color,
font,
display_str_list=[display_str])
np.copyto(image, np.array(image_pil))
return image
def camera_reader(out_buf, buf1_ready):
os.nice(19) # Make the priority of this process the lowest.
try:
capture = cv2.VideoCapture(0, cv2.CAP_V4L2)
except TypeError:
capture = cv2.VideoCapture(0)
if capture.isOpened() is False:
raise IOError
has_cap_buffer_size = True
try: cv2.CAP_PROP_BUFFERSIZE
except NameError: has_cap_buffer_size = False
if has_cap_buffer_size: capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('Y', 'U', 'Y', 'V'))
capture.set(cv2.CAP_PROP_FRAME_WIDTH, MY_CAMERA_WIDTH)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, MY_CAMERA_HEIGHT)
#capture.set(cv2.CAP_PROP_FPS, MY_FPS)
while(True):
try:
ret, frame = capture.read()
if ret is False:
raise IOError
buf1_ready.clear()
np.asarray(out_buf)[:] = np.reshape(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))
buf1_ready.set()
except KeyboardInterrupt:
break
capture.release()
def annotator(in_buf, out_buf, buf1_ready):
# For running inference on the TF-Hub module.
import tensorflow as tf
import tensorflow_hub as hub
# Print Tensorflow version
print(tf.__version__)
# Check available GPU devices.
print("The following GPU devices are available: %s" % tf.test.gpu_device_name())
module_handle = "https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]
    # The model on the next line needs over 10 GB of virtual memory and is too heavy for a Raspberry Pi
#module_handle = "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]
detector = hub.load(module_handle).signatures['default']
print("Initialization time: ", time.time()-initialization_start_time)
end_time = time.time()
while True:
buf1_ready.wait()
rgb_img = np.array(np.reshape(in_buf, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3)), copy=True)
buf1_ready.clear()
converted_img = tf.image.convert_image_dtype(rgb_img, tf.float32)[tf.newaxis, ...]
start_time = time.time()
result = detector(converted_img)
last_end_time = end_time
end_time = time.time()
result = {key:value.numpy() for key,value in result.items()}
print("Found %d objects." % len(result["detection_scores"]))
print("Inference time: ", end_time-start_time, end="")
print(" Total time: ", end_time-last_end_time)
image_with_boxes = draw_boxes(
rgb_img, result["detection_boxes"],
result["detection_class_entities"], result["detection_scores"])
np.asarray(out_buf)[:] = np.reshape(cv2.cvtColor(image_with_boxes, cv2.COLOR_RGB2BGR) , (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))
multiprocessing.set_start_method('fork')
buf1 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf2 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf1_ready = multiprocessing.Event()
buf1_ready.clear()
p1=multiprocessing.Process(target=camera_reader, args=(buf1,buf1_ready), daemon=True)
p2=multiprocessing.Process(target=annotator, args=(buf1,buf2,buf1_ready), daemon=True)
p2.start()
p1.start()
image_with_boxes = np.empty((MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3), dtype=np.uint8)
os.nice(19) # Make the priority of this process the lowest
while True:
try:
np.asarray(image_with_boxes)[:,:,:] = np.reshape(buf2, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3))
cv2.imshow('frame', image_with_boxes)
cv2.waitKey(10)
except KeyboardInterrupt:
        # Press CTRL + C to quit
print("Waiting camera reader to finish.")
p1.join(10)
break
cv2.destroyAllWindows()
Using the roughly 40 pretrained models on TF Hub
As introduced in TensorFlow 2 meets the Object Detection API, there are roughly 40 pretrained object-detection neural networks at https://tfhub.dev/tensorflow/collections/object_detection/1. This section describes what is needed to use them. It is basically the same as the previous section, but the detection results come back in a slightly different format, so minor adjustments are required. Strictly speaking, the correct approach is to install the Python packages as described in https://github.com/tensorflow/models/blob/master/research/object_detection/colab_tutorials/inference_from_saved_model_tf2_colab.ipynb, but there is no TensorFlow Addons build for ARM64, so this section instead cherry-picks only the pieces of the package it needs.
EfficientDet D0 (https://tfhub.dev/tensorflow/efficientdet/d0/1) seems like a good choice: it is slow to start (around 10 minutes), but inference takes a little under 2 seconds per image and it occupies just over 2 GB of resident memory. With D7, the strongest EfficientDet, one image takes about 85 seconds on a Raspberry Pi 4B and uses just over 5 GB of resident memory. A small sketch for inspecting what these models return is shown below.
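For reference, a quick way to see what one of these models returns (the dummy input size is an assumption, and the first call downloads the model, which takes a while on a Raspberry Pi):

```python
# Sketch: load EfficientDet D0 from TF Hub and inspect its output dictionary.
import cv2                   # keep cv2 before TensorFlow on ARM
import tensorflow as tf
import tensorflow_hub as hub

detector = hub.load("https://tfhub.dev/tensorflow/efficientdet/d0/1")
dummy = tf.zeros([1, 512, 512, 3], dtype=tf.uint8)   # these models take uint8 batches
detections = detector(dummy)

print(sorted(detections.keys()))
# expect keys such as 'detection_boxes', 'detection_classes', 'detection_scores'
print(detections['detection_boxes'].shape)           # e.g. (1, 100, 4)
```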
- Download https://github.com/tensorflow/models/blob/master/research/object_detection/utils/visualization_utils.py and delete the following lines from it:
from object_detection.core import keypoint_ops
from object_detection.core import standard_fields as fields
from object_detection.utils import shape_utils
- Save the code below as, for example, `tfhub8.py`, and run it with `python3 tfhub8.py`.
# Modified from https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb
import cv2
# For measuring the inference time.
import time
initialization_start_time = time.time()
import multiprocessing
import multiprocessing.sharedctypes
import os
# Use v4l2-ctl --list-formats-ext
MY_CAMERA_WIDTH_ORIG=800 # width of the frame captured from the camera
MY_CAMERA_WIDTH=512 # width of the image passed to the neural network
MY_CAMERA_HEIGHT_ORIG=600 # height of the frame captured from the camera
MY_CAMERA_HEIGHT=512 # height of the image passed to the neural network
# The central part of the captured frame is cropped and passed to the NN
MY_FPS=15
# The capture format is hard-coded to YUYV further below
import numpy as np
# Load the COCO Label Map
category_index = {
1: {'id': 1, 'name': 'person'},
2: {'id': 2, 'name': 'bicycle'},
3: {'id': 3, 'name': 'car'},
4: {'id': 4, 'name': 'motorcycle'},
5: {'id': 5, 'name': 'airplane'},
6: {'id': 6, 'name': 'bus'},
7: {'id': 7, 'name': 'train'},
8: {'id': 8, 'name': 'truck'},
9: {'id': 9, 'name': 'boat'},
10: {'id': 10, 'name': 'traffic light'},
11: {'id': 11, 'name': 'fire hydrant'},
13: {'id': 13, 'name': 'stop sign'},
14: {'id': 14, 'name': 'parking meter'},
15: {'id': 15, 'name': 'bench'},
16: {'id': 16, 'name': 'bird'},
17: {'id': 17, 'name': 'cat'},
18: {'id': 18, 'name': 'dog'},
19: {'id': 19, 'name': 'horse'},
20: {'id': 20, 'name': 'sheep'},
21: {'id': 21, 'name': 'cow'},
22: {'id': 22, 'name': 'elephant'},
23: {'id': 23, 'name': 'bear'},
24: {'id': 24, 'name': 'zebra'},
25: {'id': 25, 'name': 'giraffe'},
27: {'id': 27, 'name': 'backpack'},
28: {'id': 28, 'name': 'umbrella'},
31: {'id': 31, 'name': 'handbag'},
32: {'id': 32, 'name': 'tie'},
33: {'id': 33, 'name': 'suitcase'},
34: {'id': 34, 'name': 'frisbee'},
35: {'id': 35, 'name': 'skis'},
36: {'id': 36, 'name': 'snowboard'},
37: {'id': 37, 'name': 'sports ball'},
38: {'id': 38, 'name': 'kite'},
39: {'id': 39, 'name': 'baseball bat'},
40: {'id': 40, 'name': 'baseball glove'},
41: {'id': 41, 'name': 'skateboard'},
42: {'id': 42, 'name': 'surfboard'},
43: {'id': 43, 'name': 'tennis racket'},
44: {'id': 44, 'name': 'bottle'},
46: {'id': 46, 'name': 'wine glass'},
47: {'id': 47, 'name': 'cup'},
48: {'id': 48, 'name': 'fork'},
49: {'id': 49, 'name': 'knife'},
50: {'id': 50, 'name': 'spoon'},
51: {'id': 51, 'name': 'bowl'},
52: {'id': 52, 'name': 'banana'},
53: {'id': 53, 'name': 'apple'},
54: {'id': 54, 'name': 'sandwich'},
55: {'id': 55, 'name': 'orange'},
56: {'id': 56, 'name': 'broccoli'},
57: {'id': 57, 'name': 'carrot'},
58: {'id': 58, 'name': 'hot dog'},
59: {'id': 59, 'name': 'pizza'},
60: {'id': 60, 'name': 'donut'},
61: {'id': 61, 'name': 'cake'},
62: {'id': 62, 'name': 'chair'},
63: {'id': 63, 'name': 'couch'},
64: {'id': 64, 'name': 'potted plant'},
65: {'id': 65, 'name': 'bed'},
67: {'id': 67, 'name': 'dining table'},
70: {'id': 70, 'name': 'toilet'},
72: {'id': 72, 'name': 'tv'},
73: {'id': 73, 'name': 'laptop'},
74: {'id': 74, 'name': 'mouse'},
75: {'id': 75, 'name': 'remote'},
76: {'id': 76, 'name': 'keyboard'},
77: {'id': 77, 'name': 'cell phone'},
78: {'id': 78, 'name': 'microwave'},
79: {'id': 79, 'name': 'oven'},
80: {'id': 80, 'name': 'toaster'},
81: {'id': 81, 'name': 'sink'},
82: {'id': 82, 'name': 'refrigerator'},
84: {'id': 84, 'name': 'book'},
85: {'id': 85, 'name': 'clock'},
86: {'id': 86, 'name': 'vase'},
87: {'id': 87, 'name': 'scissors'},
88: {'id': 88, 'name': 'teddy bear'},
89: {'id': 89, 'name': 'hair drier'},
90: {'id': 90, 'name': 'toothbrush'},
}
def camera_reader(out_buf, buf1_ready):
# OpenCV 3.2 must be loaded before the TensorFlow, see
# https://github.com/opencv/opencv/issues/14884
#import cv2
os.nice(19) # Make the priority of this process the lowest.
try:
capture = cv2.VideoCapture(0, cv2.CAP_V4L2)
except TypeError:
capture = cv2.VideoCapture(0)
if capture.isOpened() is False:
raise IOError
if isinstance(capture.get(cv2.CAP_PROP_CONVERT_RGB), float):
capture.set(cv2.CAP_PROP_CONVERT_RGB, 0.0)
else:
capture.set(cv2.CAP_PROP_CONVERT_RGB, False)
has_cap_buffer_size = True
try: cv2.CAP_PROP_BUFFERSIZE
except NameError: has_cap_buffer_size = False
if has_cap_buffer_size: capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('Y', 'U', 'Y', 'V'))
capture.set(cv2.CAP_PROP_FRAME_WIDTH, MY_CAMERA_WIDTH_ORIG)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, MY_CAMERA_HEIGHT_ORIG)
capture.set(cv2.CAP_PROP_FPS, MY_FPS)
while(True):
try:
ret, frame = capture.read()
if ret is False:
print("Check v4l2-ctl --list-formats-ext\a")
raise IOError
#print(np.shape(frame))
cropped_image = cv2.cvtColor(frame[MY_CAMERA_HEIGHT_ORIG//2 - MY_CAMERA_HEIGHT//2 : MY_CAMERA_HEIGHT_ORIG//2 + MY_CAMERA_HEIGHT//2, MY_CAMERA_WIDTH_ORIG//2 - MY_CAMERA_WIDTH//2 : MY_CAMERA_WIDTH_ORIG//2 + MY_CAMERA_WIDTH//2, :], cv2.COLOR_YUV2RGB_YUYV)
buf1_ready.clear()
np.asarray(out_buf)[:] = np.reshape(cropped_image, (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))
buf1_ready.set()
except KeyboardInterrupt:
break
capture.release()
def annotator(in_buf, out_buf, buf1_ready):
# OpenCV 3.2 must be loaded before the TensorFlow, see
# https://github.com/opencv/opencv/issues/14884
#import cv2
# For running inference on the TF-Hub module.
import tensorflow as tf
import tensorflow_hub as hub
# the next is from https://github.com/tensorflow/models/blob/master/research/object_detection/utils/visualization_utils.py
import visualization_utils as viz_utils
# Print Tensorflow version
print(tf.__version__)
# Check available GPU devices.
print("The following GPU devices are available: %s" % tf.test.gpu_device_name())
    # Any model from https://tfhub.dev/tensorflow/collections/object_detection/1 can be used here
#module_handle = "https://tfhub.dev/tensorflow/centernet/hourglass_512x512/1"
#module_handle = "https://tfhub.dev/tensorflow/ssd_mobilenet_v1/fpn_640x640/1"
#module_handle = "https://tfhub.dev/tensorflow/ssd_mobilenet_v2/fpnlite_640x640/1"
module_handle = "https://tfhub.dev/tensorflow/efficientdet/d0/1"
#module_handle = "https://tfhub.dev/tensorflow/efficientdet/d4/1"
detector = hub.load(module_handle)
print("Initialization time: \a", time.time() - initialization_start_time)
end_time = time.time()
while True:
buf1_ready.wait()
rgb_img = np.array(np.reshape(in_buf, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3)), copy=True)
buf1_ready.clear()
converted_img = tf.image.convert_image_dtype(rgb_img, tf.uint8)[tf.newaxis, ...]
start_time = time.time()
detections = detector(converted_img)
last_end_time = end_time
end_time = time.time()
#result = {key:value.numpy() for key,value in result.items()}
#print("Found %d objects." % len(result["detection_scores"]))
print("Inference time: ", end_time-start_time, end="")
print(" Total time: ", end_time-last_end_time)
image_np_with_detections = rgb_img.copy()
viz_utils.visualize_boxes_and_labels_on_image_array(
image_np_with_detections,
detections['detection_boxes'][0].numpy(),
detections['detection_classes'][0].numpy().astype(np.int32),
detections['detection_scores'][0].numpy(),
category_index,
use_normalized_coordinates=True,
max_boxes_to_draw=200,
min_score_thresh=.40,
agnostic_mode=False)
np.asarray(out_buf)[:] = np.reshape(cv2.cvtColor(image_np_with_detections, cv2.COLOR_RGB2BGR) , (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))
multiprocessing.set_start_method('fork')
buf1 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf2 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf1_ready = multiprocessing.Event()
buf1_ready.clear()
p1=multiprocessing.Process(target=camera_reader, args=(buf1,buf1_ready), daemon=True)
p2=multiprocessing.Process(target=annotator, args=(buf1,buf2,buf1_ready), daemon=True)
p2.start()
p1.start()
image_with_boxes = np.empty((MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3), dtype=np.uint8)
os.nice(19) # Make the priority of this process the lowest.
while True:
try:
np.asarray(image_with_boxes)[:,:,:] = np.reshape(buf2, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3))
cv2.imshow('frame', image_with_boxes)
cv2.waitKey(10)
except KeyboardInterrupt:
        # Press CTRL + C to quit
print("Waiting camera reader to finish.")
p1.join(10)
break
cv2.destroyAllWindows()