LoginSignup
90
97

More than 3 years have passed since last update.

ラズパイとUSBカメラとTensorFlowで物体検出

Last updated at Posted at 2020-08-22

更新情報: 2021年5月に TensorFlow 2.5, OpenCV 4.5.1, Debian 11 Bullseyeに合わせて更新しプログラムを動作確認した

ラズパイにUSBカメラを繋ぎ、Python 3上のOpenCVを用いて映像を取り込み、リアルタイムにTensorFlowで物体検出する手順です。Tensorflow HubのサンプルとTensorFlow Lite のサンプルを改変して用いて、それぞれ以下のような検出結果を表示します。ラズパイ依存部分は無いので、インテルCPUを積んだノードパソコンとかでも以下のプログラムは実は問題無く動作します(少なくともLinux稼働していれば(などといいつつ後半2つがインテルUbuntu 20.04で動作していなくて直せていない…😭))。ARM特有の話として import cv2 をTensorFlow関連パッケージのimportよりも前に置かないとエラーになるという落とし穴があります。これはインテルCPUだと起きない現象で、pythonのプログラムを自分で書くときは注意が必要です。今までのところ以下の話は

で動作確認しています。ラズパイでは同じハードウェアでも、32bitよりも64ビットのほうがニューラルネットの推論が約2倍速いです(そこまでは速くない…)。しかし、使用メモリ量も増えます。


本記事第一のプログラムによる物体検出
tfhub2.png


本記事第二のプログラムによる物体検出
tflite4.png


記事末尾のプログラムで、カメラから1024x1024の画像を切り出して、EfficientDet D4をに入力すると以下のような検出結果になります。

スクリーンショット 2020-08-24 16-20-41.png

このとき画像1枚の物体検出に必要な時間は約20秒で、そのときのメモリ使用状況は下記のような感じです。

kakinagu@raspi-mate:~$ top

top - 16:47:48 up  1:04,  2 users,  load average: 8.23, 7.74, 7.45
Tasks: 158 total,   3 running, 155 sleeping,   0 stopped,   0 zombie
%Cpu(s): 92.7 us,  4.1 sy,  2.5 ni,  0.3 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
MiB Mem :   7759.4 total,   2330.7 free,   4420.6 used,   1008.1 buff/cache
MiB Swap:      0.0 total,      0.0 free,      0.0 used.   3223.9 avail Mem

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
   1753 kakinagu  20   0 6299756   4.2g 286584 S 354.0  55.8 143:53.77 python3
   1754 kakinagu  39  19  715216 136896  66040 S   5.6   1.7  10:03.56 python3
   1752 kakinagu  39  19  925524 129412  81648 R   5.6   1.6  14:45.44 python3

動作確認環境

Raspberry Pi 4B メモリ8 GBモデルで確認したが、メモリはTensorflow Liteを使うなら 2GB、Tensorflow Hubを用いるなら4 GBで十分だと思われる。Raspberry Pi OS Buster の32ビット版と64ビット版で動作確認した。ラズパイ専用のカメラではなくて、そこらへんのパソコンでも使えるUSB接続のカメラを使っていることに注意。ラズパイ専用カメラの場合は Raspberry Piと純正カメラモジュールで監視カメラを作る、おそらく正しい方法 jessie版 (motion + v4l2ドライバ) などに従って bcm2835-v4l2.ko モジュールをカーネルに読み込むとUSBカメラと同様に扱えます

準備

  1. ラズパイでUSBカメラの映像をPythonのOpenCVで表示する などに従って、USBカメラの映像をラズパイで取り込み・表示できるように設定する
  2. ラズパイへのTensorFlow 2.4簡単インストールして物体検出 などに従って、Tensorflow 2.2以上、Tensorflow Hubをインストールする

物体検出プログラム

この後の話は TensorFlowでの物体検出が超手軽にできる「Object Detection Tools」をTensorFlow 2.xに対応しました と本質的(というか機能的に)に同じです

Tensorflow Hubのサンプルの改造版

https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb のサンプルはインターネットから画像をダウンロードして物体検出しているが、それをUSBカメラから取り込むように変更した。TensorFlow Hub系のものは起動にかなり時間がかかります。本記事末尾のEfficientDet D0で約8分、D4で約16分程度Raspberry Pi OS 64bit上でかかります。高速化したい場合は TensorFlow Lite形式への変換 を行うと起動が速くなると思います。

  1. カメラの撮影解像度を 800x600 にしてあるので、それを適切に変更する。利用可能な解像度などは v4l2-ctl --list-formats-ext で調べられる。v4l2-ctl コマンドが無ければ sudo apt-get install v4l-utils でインストールできる。
  2. python3 tfhub.py で起動する
  3. 検出するためのニューラルネットワークはMobileNetV2 SSDであるがこれをInception Resnetに変更できる。しかし、そうすると必要な仮想メモリーが10 GB以上になり、ラズパイ4B メモリ8 GBモデルで64ビット版Raspberry Pi OSを用いないと動作しなくなる。MobileNetV2の必要メモリはだいたい3 GBくらいで、32ビット版Raspberry Pi OSでギリギリ動作する。フレームの物体検出にMobileNetV2が掛かる時間は0.7秒くらいであった。
tfhub.py
# Modified from https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb

# OpenCV 3.2 must be loaded before the TensorFlow, see
# https://github.com/opencv/opencv/issues/14884
import cv2

#@title Imports and function definitions

# For running inference on the TF-Hub module.
import tensorflow as tf

import tensorflow_hub as hub

# For drawing onto the image.
import numpy as np
from PIL import Image
from PIL import ImageColor
from PIL import ImageDraw
from PIL import ImageFont
from PIL import ImageOps

# For measuring the inference time.
import time


# Print Tensorflow version
print(tf.__version__)

# Check available GPU devices.
print("The following GPU devices are available: %s" % tf.test.gpu_device_name())


def draw_bounding_box_on_image(image,
                               ymin,
                               xmin,
                               ymax,
                               xmax,
                               color,
                               font,
                               thickness=4,
                               display_str_list=()):
  """Adds a bounding box to an image."""
  draw = ImageDraw.Draw(image)
  im_width, im_height = image.size
  (left, right, top, bottom) = (xmin * im_width, xmax * im_width,
                                ymin * im_height, ymax * im_height)
  draw.line([(left, top), (left, bottom), (right, bottom), (right, top),
             (left, top)],
            width=thickness,
            fill=color)

  # If the total height of the display strings added to the top of the bounding
  # box exceeds the top of the image, stack the strings below the bounding box
  # instead of above.
  display_str_heights = [font.getsize(ds)[1] for ds in display_str_list]
  # Each display_str has a top and bottom margin of 0.05x.
  total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)

  if top > total_display_str_height:
    text_bottom = top
  else:
    text_bottom = top + total_display_str_height
  # Reverse list and print from bottom to top.
  for display_str in display_str_list[::-1]:
    text_width, text_height = font.getsize(display_str)
    margin = np.ceil(0.05 * text_height)
    draw.rectangle([(left, text_bottom - text_height - 2 * margin),
                    (left + text_width, text_bottom)],
                   fill=color)
    draw.text((left + margin, text_bottom - text_height - margin),
              display_str,
              fill="black",
              font=font)
    text_bottom -= text_height - 2 * margin


def draw_boxes(image, boxes, class_names, scores, max_boxes=10, min_score=0.1):
  """Overlay labeled boxes on an image with formatted scores and label names."""
  colors = list(ImageColor.colormap.values())

  try:
    font = ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSansNarrow-Regular.ttf",
                              25)
  except IOError:
    print("Font not found, using default font.")
    font = ImageFont.load_default()

  for i in range(min(boxes.shape[0], max_boxes)):
    if scores[i] >= min_score:
      ymin, xmin, ymax, xmax = tuple(boxes[i])
      display_str = "{}: {}%".format(class_names[i].decode("ascii"),
                                     int(100 * scores[i]))
      color = colors[hash(class_names[i]) % len(colors)]
      #image_pil = Image.fromarray(np.uint8(image)).convert("RGB")
      image_pil = Image.fromarray(image)
      draw_bounding_box_on_image(
          image_pil,
          ymin,
          xmin,
          ymax,
          xmax,
          color,
          font,
          display_str_list=[display_str])
      np.copyto(image, np.array(image_pil))
  return image


module_handle = "https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]
# 次行のモデルは仮想メモリを10ギガバイト占有するのでラズパイには重すぎる
#module_handle = "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]

detector = hub.load(module_handle).signatures['default']

capture = cv2.VideoCapture(0)
if capture.isOpened() is False:
  raise("IO Error")

capture.set(cv2.CAP_PROP_FRAME_WIDTH, 800)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)


while(True):
  try:
    ret, frame = capture.read()
    if ret is False:
      raise("IO Error")
    rgb_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    converted_img  = tf.image.convert_image_dtype(rgb_img, tf.float32)[tf.newaxis, ...]
    start_time = time.time()
    result = detector(converted_img)
    end_time = time.time()

    result = {key:value.numpy() for key,value in result.items()}

    print("Found %d objects." % len(result["detection_scores"]))
    print("Inference time: ", end_time-start_time)

    image_with_boxes = draw_boxes(
      rgb_img, result["detection_boxes"],
      result["detection_class_entities"], result["detection_scores"])

    cv2.imshow('frame', cv2.cvtColor(image_with_boxes, cv2.COLOR_RGB2BGR))
    cv2.waitKey(1)
  except KeyboardInterrupt:
    # 終わるときは CTRL + C を押す
    break

capture.release()
cv2.destroyAllWindows()

Tensorflow Liteのサンプルの改造版

https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi のサンプルはラズパイ専用カメラで撮影した映像の物体検出しているが、それをUSBカメラから取り込むように変更した。なお、ラズパイUbuntu 20.04ではエラーが出て動かなくて、一応github にイシューを上げてあります(本記事著者のプログラムの問題点も否定しきれない) もっと初歩的なミスだった…💦

  1. カメラの撮影解像度を記事著者のカメラの解像度 800x600 にしてあるので、以下のdetect_usbcamera.pyの解像度を設定している部分を適切に変更する
  2. https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi/download.sh を実行して必要なファイルをダウンロードする
  3. python3 detect_usbcamera.py --model /tmp/detect.tflite --labels /tmp/coco_labels.txt で実行する。必要メモリ量はだいたい1 GB弱である。1フレームの物体検出にニューラルネットワークが掛かる時間は0.2秒くらいであった。
  4. 上記で detect.tflite がいつの間にか消滅しているので、代わりに https://www.tensorflow.org/lite/examples/object_detection/overview からダウンロードできるデータ ssd_mobilenet_v1_1_metadata_1.tflitedetect.tflite の代わりに使用して下さい。 coco_labels.txt は取り敢えずそのまま流用できました。
detect_usbcamera.py
# python3
#
# The following is the original copyright notice.
# This is modified from detect_picamera.py to handle a USB camera
# by OpenCV. The original is from
# https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi
#
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example using TF Lite to detect objects with a USB camera."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import cv2
import argparse
import io
import re
import time

from annotation import Annotator

import numpy as np


from PIL import Image
import tensorflow.lite as tflite

def load_labels(path):
  """Loads the labels file. Supports files with or without index numbers."""
  with open(path, 'r', encoding='utf-8') as f:
    lines = f.readlines()
    labels = {}
    for row_number, content in enumerate(lines):
      pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
      if len(pair) == 2 and pair[0].strip().isdigit():
        labels[int(pair[0])] = pair[1].strip()
      else:
        labels[row_number] = pair[0].strip()
  return labels


def set_input_tensor(interpreter, image):
  """Sets the input tensor."""
  tensor_index = interpreter.get_input_details()[0]['index']
  input_tensor = interpreter.tensor(tensor_index)()[0]
  input_tensor[:, :] = image


def get_output_tensor(interpreter, index):
  """Returns the output tensor at the given index."""
  output_details = interpreter.get_output_details()[index]
  tensor = np.squeeze(interpreter.get_tensor(output_details['index']))
  return tensor


def detect_objects(interpreter, image, threshold):
  """Returns a list of detection results, each a dictionary of object info."""
  set_input_tensor(interpreter, image)
  interpreter.invoke()

  # Get all output details
  boxes = get_output_tensor(interpreter, 0)
  classes = get_output_tensor(interpreter, 1)
  scores = get_output_tensor(interpreter, 2)
  count = int(get_output_tensor(interpreter, 3))

  results = []
  for i in range(count):
    if scores[i] >= threshold:
      result = {
          'bounding_box': boxes[i],
          'class_id': classes[i],
          'score': scores[i]
      }
      results.append(result)
  return results


def annotate_objects(annotator, results, labels, CAMERA_WIDTH, CAMERA_HEIGHT):
  """Draws the bounding box and label for each object in the results."""
  for obj in results:
    # Convert the bounding box figures from relative coordinates
    # to absolute coordinates based on the original resolution
    ymin, xmin, ymax, xmax = obj['bounding_box']
    xmin = int(xmin * CAMERA_WIDTH)
    xmax = int(xmax * CAMERA_WIDTH)
    ymin = int(ymin * CAMERA_HEIGHT)
    ymax = int(ymax * CAMERA_HEIGHT)

    # Overlay the box, label, and score on the camera preview
    annotator.bounding_box([xmin, ymin, xmax, ymax])
    annotator.text([xmin, ymin],
                   '%s\n%.2f' % (labels[obj['class_id']], obj['score']))


def main():
  parser = argparse.ArgumentParser(
      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  parser.add_argument(
      '--model', help='File path of .tflite file.', required=True)
  parser.add_argument(
      '--labels', help='File path of labels file.', required=True)
  parser.add_argument(
      '--threshold',
      help='Score threshold for detected objects.',
      required=False,
      type=float,
      default=0.4)
  args = parser.parse_args()

  labels = load_labels(args.labels)
  interpreter = tflite.Interpreter(args.model)
  interpreter.allocate_tensors()
  _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']

  camera = cv2.VideoCapture(0)
  try:
    if camera.isOpened() is False:
      raise("IO Error")
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 800)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)
    CAMERA_WIDTH = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
    CAMERA_HEIGHT = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
    annotator = Annotator(camera)
    while True:
      annotator.update()
      image = annotator.buffer.resize(
        (input_width, input_height), Image.BICUBIC)
      start_time = time.monotonic()
      results = detect_objects(interpreter, image, args.threshold)
      elapsed_ms = (time.monotonic() - start_time) * 1000

#      annotator.clear()
      annotate_objects(annotator, results, labels, CAMERA_WIDTH, CAMERA_HEIGHT)
      annotator.text([5, 0], '%.1fms' % (elapsed_ms))
      cv2.imshow('frame',cv2.cvtColor(np.asarray(annotator.buffer), cv2.COLOR_RGB2BGR))
      cv2.waitKey(1)
      del image

  except KeyboardInterrupt:
    camera.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
  main()
annotation.py
# python3
# The following is the original copyright notice.
# This is modified from the original for capturing from the first USB camera with OpenCV.
# The original is from
# https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi
#
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An annotation library that draws overlays on the USB camera preview.

Annotations include bounding boxes and text overlays.
Annotations support partial opacity, however only with respect to the content in
the preview. A transparent fill value will cover up previously drawn overlay
under it, but not the camera content under it. A color of None can be given,
which will then not cover up overlay content drawn under the region.
Note: Overlays do not persist through to the storage layer so images saved from
the camera, will not contain overlays.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import cv2

from PIL import Image
from PIL import ImageDraw


class Annotator:
  """Utility for managing annotations on the camera preview."""

  def __init__(self, camera, default_color=None):
    """Initializes Annotator parameters.

    Args:
      camera: cv2.VideoCapture camera object to overlay on top of.
      default_color: PIL.ImageColor (with alpha) default for the drawn content.
    """
    self._camera = camera
    self._dims = (int(camera.get(cv2.CAP_PROP_FRAME_WIDTH)), int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    self.buffer = Image.new('RGB', self._dims)
    self._draw = ImageDraw.Draw(self.buffer)
    self._default_color = default_color or (0xFF, 0, 0)

  def update(self):
    """Draws any changes to the image buffer onto the overlay."""
    ret, captured_frame = self._camera.read()
    if ret is False:
        raise("IO Error")
    self.buffer = Image.fromarray(cv2.cvtColor(captured_frame, cv2.COLOR_BGR2RGB), 'RGB')
    self._draw = ImageDraw.Draw(self.buffer)

  def clear(self):
    """Clears the contents of the overlay, leaving only the plain background."""
    self._draw.rectangle((0, 0) + self._dims, fill=(0, 0, 0, 0x00))

  def bounding_box(self, rect, outline=None, fill=None):
    """Draws a bounding box around the specified rectangle.

    Args:
      rect: (x1, y1, x2, y2) rectangle to be drawn, where (x1, y1) and (x2, y2)
        are opposite corners of the desired rectangle.
      outline: PIL.ImageColor with which to draw the outline (defaults to the
        Annotator default_color).
      fill: PIL.ImageColor with which to fill the rectangle (defaults to None,
        which will *not* cover up drawings under the region).
    """
    outline = outline or self._default_color
    self._draw.rectangle(rect, fill=fill, outline=outline)

  def text(self, location, text, color=None):
    """Draws the given text at the given location.

    Args:
      location: (x, y) point at which to draw the text (upper left corner).
      text: string to be drawn.
      color: PIL.ImageColor to draw the string in (defaults to the Annotator
        default_color).
    """
    color = color or self._default_color
    self._draw.text(location, text, fill=color)

並行処理のよる高速化

物体検出の処理は

  • カメラからの画像取り込み
  • ニューラルネットワークによる推論
  • 推論結果のGUIへの書き出し

3つからなるが、これらは並行して処理できる。Python 3のmultiprocessingでプロセス間で大量のデータを受け渡しつつnumpyで処理する のテクニックを用いて、Tensorflow Hubを用いるバージョンを高速化したものを以下に紹介する。なお、以下のバージョンはUbuntu 20.04附属の python3-opencv のバージョン4.2だとカメラからの読み出しで必ずエラーが起きるので動きません😭 multiprocess の使用を止めるとエラーが起きなくなるという謎の現象がおきていて、対応法が今の所謎です。 うまく動作しないときは、最初に一度だけimport cv2 してそれがmultiprocessing.Processforkされた子プロセスに受け継がれていたが、 multiprocessing.Process で作られた子プロセスの中で個別に import cv2 したらOpenCV4.2でも動作するようになった。全く意味不明である…😭Ubuntu 20.10にしたらそれでもエラーが出たので、tfhub-faster4.py, tfhub8.pyのようになった

tfhub-faster4.py
# Modified from https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb

# OpenCV 3.2 must be loaded before the TensorFlow, see
# https://github.com/opencv/opencv/issues/14884
import cv2

# For measuring the inference time.
import time
initialization_start_time = time.time()

import multiprocessing
import multiprocessing.sharedctypes
import os

# Use v4l2-ctl --list-formats-ext
MY_CAMERA_WIDTH=800
MY_CAMERA_HEIGHT=600
MY_FPS=6

#@title Imports and function definitions


# For drawing onto the image.
import numpy as np
from PIL import Image
from PIL import ImageColor
from PIL import ImageDraw
from PIL import ImageFont
from PIL import ImageOps

def draw_bounding_box_on_image(image,
                               ymin,
                               xmin,
                               ymax,
                               xmax,
                               color,
                               font,
                               thickness=4,
                               display_str_list=()):
  """Adds a bounding box to an image."""
  draw = ImageDraw.Draw(image)
  im_width, im_height = image.size
  (left, right, top, bottom) = (xmin * im_width, xmax * im_width,
                                ymin * im_height, ymax * im_height)
  draw.line([(left, top), (left, bottom), (right, bottom), (right, top),
             (left, top)],
            width=thickness,
            fill=color)

  # If the total height of the display strings added to the top of the bounding
  # box exceeds the top of the image, stack the strings below the bounding box
  # instead of above.
  display_str_heights = [font.getsize(ds)[1] for ds in display_str_list]
  # Each display_str has a top and bottom margin of 0.05x.
  total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)

  if top > total_display_str_height:
    text_bottom = top
  else:
    text_bottom = top + total_display_str_height
  # Reverse list and print from bottom to top.
  for display_str in display_str_list[::-1]:
    text_width, text_height = font.getsize(display_str)
    margin = np.ceil(0.05 * text_height)
    draw.rectangle([(left, text_bottom - text_height - 2 * margin),
                    (left + text_width, text_bottom)],
                   fill=color)
    draw.text((left + margin, text_bottom - text_height - margin),
              display_str,
              fill="black",
              font=font)
    text_bottom -= text_height - 2 * margin


def draw_boxes(image, boxes, class_names, scores, max_boxes=10, min_score=0.1):
  """Overlay labeled boxes on an image with formatted scores and label names."""
  colors = list(ImageColor.colormap.values())

  try:
    font = ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSansNarrow-Regular.ttf",
                              25)
  except IOError:
    print("Font not found, using default font.")
    font = ImageFont.load_default()

  for i in range(min(boxes.shape[0], max_boxes)):
    if scores[i] >= min_score:
      ymin, xmin, ymax, xmax = tuple(boxes[i])
      display_str = "{}: {}%".format(class_names[i].decode("ascii"),
                                     int(100 * scores[i]))
      color = colors[hash(class_names[i]) % len(colors)]
      #image_pil = Image.fromarray(np.uint8(image)).convert("RGB")
      image_pil = Image.fromarray(image)
      draw_bounding_box_on_image(
          image_pil,
          ymin,
          xmin,
          ymax,
          xmax,
          color,
          font,
          display_str_list=[display_str])
      np.copyto(image, np.array(image_pil))
  return image


def camera_reader(out_buf, buf1_ready):

  os.nice(19) # Make the priority of this process the lowest.
  try:
    capture = cv2.VideoCapture(0, cv2.CAP_V4L2)
  except TypeError:
    capture = cv2.VideoCapture(0)
  if capture.isOpened() is False:
    raise IOError

  has_cap_buffer_size = True
  try: cv2.CAP_PROP_BUFFERSIZE
  except NameError: has_cap_buffer_size = False
  if has_cap_buffer_size: capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)

  capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('Y', 'U', 'Y', 'V'))
  capture.set(cv2.CAP_PROP_FRAME_WIDTH, MY_CAMERA_WIDTH)
  capture.set(cv2.CAP_PROP_FRAME_HEIGHT, MY_CAMERA_HEIGHT)
  #capture.set(cv2.CAP_PROP_FPS, MY_FPS)

  while(True):
    try:
      ret, frame = capture.read()
      if ret is False:
          raise IOError
      buf1_ready.clear()
      np.asarray(out_buf)[:] = np.reshape(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))
      buf1_ready.set()
    except KeyboardInterrupt:
        break
  capture.release()

def annotator(in_buf, out_buf, buf1_ready):
  # For running inference on the TF-Hub module.
  import tensorflow as tf
  import tensorflow_hub as hub
  # Print Tensorflow version
  print(tf.__version__)
  # Check available GPU devices.
  print("The following GPU devices are available: %s" % tf.test.gpu_device_name())

  module_handle = "https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]
  # 次行のモデルは仮想メモリを10ギガバイト占有するのでラズパイには重すぎる
  #module_handle = "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1" #@param ["https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1", "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"]

  detector = hub.load(module_handle).signatures['default']
  print("Initialization time: ", time.time()-initialization_start_time)
  end_time = time.time()
  while True:
    buf1_ready.wait()
    rgb_img = np.array(np.reshape(in_buf, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3)), copy=True)
    buf1_ready.clear()
    converted_img  = tf.image.convert_image_dtype(rgb_img, tf.float32)[tf.newaxis, ...]
    start_time = time.time()
    result = detector(converted_img)
    last_end_time = end_time
    end_time = time.time()
    result = {key:value.numpy() for key,value in result.items()}

    print("Found %d objects." % len(result["detection_scores"]))
    print("Inference time: ", end_time-start_time, end="")
    print("  Total time: ", end_time-last_end_time)

    image_with_boxes = draw_boxes(
      rgb_img, result["detection_boxes"],
      result["detection_class_entities"], result["detection_scores"])
    np.asarray(out_buf)[:] = np.reshape(cv2.cvtColor(image_with_boxes, cv2.COLOR_RGB2BGR) , (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))


multiprocessing.set_start_method('fork')
buf1 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf2 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf1_ready = multiprocessing.Event()
buf1_ready.clear()
p1=multiprocessing.Process(target=camera_reader, args=(buf1,buf1_ready), daemon=True)
p2=multiprocessing.Process(target=annotator, args=(buf1,buf2,buf1_ready), daemon=True)
p2.start()
p1.start()

image_with_boxes = np.empty((MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3), dtype=np.uint8)
os.nice(19) # Make the priority of this process the lowest
while True:
  try:
    np.asarray(image_with_boxes)[:,:,:] = np.reshape(buf2, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3))
    cv2.imshow('frame', image_with_boxes)
    cv2.waitKey(10)
  except KeyboardInterrupt:
    # 終わるときは CTRL + C を押す
    print("Waiting camera reader to finish.")
    p1.join(10)
    break

cv2.destroyAllWindows()

TF Hub にある40個の訓練済みモデルを使えるようにする

TensorFlow 2 meets the Object Detection API で紹介されたように https://tfhub.dev/tensorflow/collections/object_detection/1 に 40 個くらいの物体検出の訓練済みニューラルネットワークがある。これを使えるようにするための作業を紹介する。 基本的には前節と同じだが、検出結果のデータ型が若干違うため微調整が必要である。本当は https://github.com/tensorflow/models/blob/master/research/object_detection/colab_tutorials/inference_from_saved_model_tf2_colab.ipynb に書いてあるようにPythonパッケージをインストールするのが正しい方法だが ARM64 用のTensorFlow Addonsが無くてどうしようもないので、パッケージをつまみ食いするやり方を紹介する。
EfficientDet D0 https://tfhub.dev/tensorflow/efficientdet/d0/1 が、起動に時間がかかる(10分前後)が、約2秒弱で画像一つの推論が終わり占有実メモリが2 GB強で良いと思う。EfficientDet最強のD7を用いると、ラズパイ4Bで画像1枚あたり85秒、実メモリは5 GB強使用します。

from object_detection.core import keypoint_ops
from object_detection.core import standard_fields as fields
from object_detection.utils import shape_utils
  • 以下を例えば tfhub8.py という名前で保存し、 python3 tfhub8.py で実行する
tfhub8.py
# Modified from https://github.com/tensorflow/hub/blob/master/examples/colab/object_detection.ipynb
import cv2

# For measuring the inference time.
import time
initialization_start_time = time.time()

import multiprocessing
import multiprocessing.sharedctypes
import os


# Use v4l2-ctl --list-formats-ext
MY_CAMERA_WIDTH_ORIG=800 # カメラからキャプチャする画像幅
MY_CAMERA_WIDTH=512 # ニューラルネットに渡す画像幅
MY_CAMERA_HEIGHT_ORIG=600 # カメラからキャプチャする画像高さ
MY_CAMERA_HEIGHT=512 # ニューラルネットに渡す画像高さ
# 画像の中央の部分を取り出してNNに渡す
MY_FPS=15
# キャプチャ映像形式は YUYV に下のほうで決め打ちです

import numpy as np



# Load the COCO Label Map
category_index = {
    1: {'id': 1, 'name': 'person'},
    2: {'id': 2, 'name': 'bicycle'},
    3: {'id': 3, 'name': 'car'},
    4: {'id': 4, 'name': 'motorcycle'},
    5: {'id': 5, 'name': 'airplane'},
    6: {'id': 6, 'name': 'bus'},
    7: {'id': 7, 'name': 'train'},
    8: {'id': 8, 'name': 'truck'},
    9: {'id': 9, 'name': 'boat'},
    10: {'id': 10, 'name': 'traffic light'},
    11: {'id': 11, 'name': 'fire hydrant'},
    13: {'id': 13, 'name': 'stop sign'},
    14: {'id': 14, 'name': 'parking meter'},
    15: {'id': 15, 'name': 'bench'},
    16: {'id': 16, 'name': 'bird'},
    17: {'id': 17, 'name': 'cat'},
    18: {'id': 18, 'name': 'dog'},
    19: {'id': 19, 'name': 'horse'},
    20: {'id': 20, 'name': 'sheep'},
    21: {'id': 21, 'name': 'cow'},
    22: {'id': 22, 'name': 'elephant'},
    23: {'id': 23, 'name': 'bear'},
    24: {'id': 24, 'name': 'zebra'},
    25: {'id': 25, 'name': 'giraffe'},
    27: {'id': 27, 'name': 'backpack'},
    28: {'id': 28, 'name': 'umbrella'},
    31: {'id': 31, 'name': 'handbag'},
    32: {'id': 32, 'name': 'tie'},
    33: {'id': 33, 'name': 'suitcase'},
    34: {'id': 34, 'name': 'frisbee'},
    35: {'id': 35, 'name': 'skis'},
    36: {'id': 36, 'name': 'snowboard'},
    37: {'id': 37, 'name': 'sports ball'},
    38: {'id': 38, 'name': 'kite'},
    39: {'id': 39, 'name': 'baseball bat'},
    40: {'id': 40, 'name': 'baseball glove'},
    41: {'id': 41, 'name': 'skateboard'},
    42: {'id': 42, 'name': 'surfboard'},
    43: {'id': 43, 'name': 'tennis racket'},
    44: {'id': 44, 'name': 'bottle'},
    46: {'id': 46, 'name': 'wine glass'},
    47: {'id': 47, 'name': 'cup'},
    48: {'id': 48, 'name': 'fork'},
    49: {'id': 49, 'name': 'knife'},
    50: {'id': 50, 'name': 'spoon'},
    51: {'id': 51, 'name': 'bowl'},
    52: {'id': 52, 'name': 'banana'},
    53: {'id': 53, 'name': 'apple'},
    54: {'id': 54, 'name': 'sandwich'},
    55: {'id': 55, 'name': 'orange'},
    56: {'id': 56, 'name': 'broccoli'},
    57: {'id': 57, 'name': 'carrot'},
    58: {'id': 58, 'name': 'hot dog'},
    59: {'id': 59, 'name': 'pizza'},
    60: {'id': 60, 'name': 'donut'},
    61: {'id': 61, 'name': 'cake'},
    62: {'id': 62, 'name': 'chair'},
    63: {'id': 63, 'name': 'couch'},
    64: {'id': 64, 'name': 'potted plant'},
    65: {'id': 65, 'name': 'bed'},
    67: {'id': 67, 'name': 'dining table'},
    70: {'id': 70, 'name': 'toilet'},
    72: {'id': 72, 'name': 'tv'},
    73: {'id': 73, 'name': 'laptop'},
    74: {'id': 74, 'name': 'mouse'},
    75: {'id': 75, 'name': 'remote'},
    76: {'id': 76, 'name': 'keyboard'},
    77: {'id': 77, 'name': 'cell phone'},
    78: {'id': 78, 'name': 'microwave'},
    79: {'id': 79, 'name': 'oven'},
    80: {'id': 80, 'name': 'toaster'},
    81: {'id': 81, 'name': 'sink'},
    82: {'id': 82, 'name': 'refrigerator'},
    84: {'id': 84, 'name': 'book'},
    85: {'id': 85, 'name': 'clock'},
    86: {'id': 86, 'name': 'vase'},
    87: {'id': 87, 'name': 'scissors'},
    88: {'id': 88, 'name': 'teddy bear'},
    89: {'id': 89, 'name': 'hair drier'},
    90: {'id': 90, 'name': 'toothbrush'},
}


def camera_reader(out_buf, buf1_ready):
  # OpenCV 3.2 must be loaded before the TensorFlow, see
  # https://github.com/opencv/opencv/issues/14884
  #import cv2

  os.nice(19) # Make the priority of this process the lowest.
  try:
    capture = cv2.VideoCapture(0, cv2.CAP_V4L2)
  except TypeError:
    capture = cv2.VideoCapture(0)
  if capture.isOpened() is False:
    raise IOError

  if isinstance(capture.get(cv2.CAP_PROP_CONVERT_RGB), float):
    capture.set(cv2.CAP_PROP_CONVERT_RGB, 0.0)
  else:
    capture.set(cv2.CAP_PROP_CONVERT_RGB, False)

  has_cap_buffer_size = True
  try: cv2.CAP_PROP_BUFFERSIZE
  except NameError: has_cap_buffer_size = False
  if has_cap_buffer_size: capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)

  capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('Y', 'U', 'Y', 'V'))
  capture.set(cv2.CAP_PROP_FRAME_WIDTH, MY_CAMERA_WIDTH_ORIG)
  capture.set(cv2.CAP_PROP_FRAME_HEIGHT, MY_CAMERA_HEIGHT_ORIG)
  capture.set(cv2.CAP_PROP_FPS, MY_FPS)
  while(True):
    try:
      ret, frame = capture.read()
      if ret is False:
        print("Check v4l2-ctl --list-formats-ext\a")
        raise IOError
      #print(np.shape(frame))
      cropped_image = cv2.cvtColor(frame[MY_CAMERA_HEIGHT_ORIG//2 - MY_CAMERA_HEIGHT//2 : MY_CAMERA_HEIGHT_ORIG//2 + MY_CAMERA_HEIGHT//2, MY_CAMERA_WIDTH_ORIG//2 - MY_CAMERA_WIDTH//2 : MY_CAMERA_WIDTH_ORIG//2 + MY_CAMERA_WIDTH//2, :], cv2.COLOR_YUV2RGB_YUYV)
      buf1_ready.clear()
      np.asarray(out_buf)[:] = np.reshape(cropped_image, (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))
      buf1_ready.set()
    except KeyboardInterrupt:
        break
  capture.release()


def annotator(in_buf, out_buf, buf1_ready):
  # OpenCV 3.2 must be loaded before the TensorFlow, see
  # https://github.com/opencv/opencv/issues/14884
  #import cv2
  # For running inference on the TF-Hub module.
  import tensorflow as tf
  import tensorflow_hub as hub
  # the next is from https://github.com/tensorflow/models/blob/master/research/object_detection/utils/visualization_utils.py 
  import visualization_utils as viz_utils
  # Print Tensorflow version
  print(tf.__version__)
  # Check available GPU devices.
  print("The following GPU devices are available: %s" % tf.test.gpu_device_name())

  # https://tfhub.dev/tensorflow/collections/object_detection/1 のモデルを使える
  #module_handle = "https://tfhub.dev/tensorflow/centernet/hourglass_512x512/1"
  #module_handle = "https://tfhub.dev/tensorflow/ssd_mobilenet_v1/fpn_640x640/1"
  #module_handle = "https://tfhub.dev/tensorflow/ssd_mobilenet_v2/fpnlite_640x640/1"
  module_handle = "https://tfhub.dev/tensorflow/efficientdet/d0/1"
  #module_handle = "https://tfhub.dev/tensorflow/efficientdet/d4/1"

  detector = hub.load(module_handle)
  print("Initialization time: \a", time.time() - initialization_start_time)
  end_time = time.time()
  while True:
    buf1_ready.wait()
    rgb_img = np.array(np.reshape(in_buf, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3)), copy=True)
    buf1_ready.clear()
    converted_img  = tf.image.convert_image_dtype(rgb_img, tf.uint8)[tf.newaxis, ...]
    start_time = time.time()
    detections = detector(converted_img)
    last_end_time = end_time
    end_time = time.time()
    #result = {key:value.numpy() for key,value in result.items()}

    #print("Found %d objects." % len(result["detection_scores"]))
    print("Inference time: ", end_time-start_time, end="")
    print("  Total time: ", end_time-last_end_time)

    image_np_with_detections = rgb_img.copy()
    viz_utils.visualize_boxes_and_labels_on_image_array(
        image_np_with_detections,
        detections['detection_boxes'][0].numpy(),
        detections['detection_classes'][0].numpy().astype(np.int32),
        detections['detection_scores'][0].numpy(),
        category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=200,
        min_score_thresh=.40,
        agnostic_mode=False)
    np.asarray(out_buf)[:] = np.reshape(cv2.cvtColor(image_np_with_detections, cv2.COLOR_RGB2BGR) , (MY_CAMERA_WIDTH*MY_CAMERA_HEIGHT*3))


multiprocessing.set_start_method('fork')
buf1 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf2 = multiprocessing.sharedctypes.RawArray('B', MY_CAMERA_HEIGHT*MY_CAMERA_WIDTH*3)
buf1_ready = multiprocessing.Event()
buf1_ready.clear()
p1=multiprocessing.Process(target=camera_reader, args=(buf1,buf1_ready), daemon=True)
p2=multiprocessing.Process(target=annotator, args=(buf1,buf2,buf1_ready), daemon=True)
p2.start()
p1.start()


image_with_boxes = np.empty((MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3), dtype=np.uint8)
os.nice(19) # Make the priority of this process the lowest.
while True:
  try:
    np.asarray(image_with_boxes)[:,:,:] = np.reshape(buf2, (MY_CAMERA_HEIGHT, MY_CAMERA_WIDTH, 3))
    cv2.imshow('frame', image_with_boxes)
    cv2.waitKey(10)
  except KeyboardInterrupt:
    # 終わるときは CTRL + C を押す
    print("Waiting camera reader to finish.")
    p1.join(10)
    break

cv2.destroyAllWindows()
90
97
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
90
97