LoginSignup
1
2

More than 3 years have passed since last update.

Mask R-CNNをOpticalFlowで補間できないか試してみた

Posted at

概要

Mask R-CNNは、物体検出とインスタンスセグメンテーションを行うモデル。
ピクセル単位でセグメンテーションができるので、特定の人物だけをマスクするなんてことも可能。
ただ、1フレーム処理するのにやっぱり時間がかかるので、リアルタイム処理は厳しいですね。
そこで、物体検出と物体検出の間のフレームは、オプティカルフローでマスク画像の動きの変化を推定することで補間できないか試してみました。

方法

Mask R-CNNは、matterport版の実装を利用します。
コードは、AI Coordinatorさんの記事を参考にしました。
これに対して、補間処理を追加しました。
例えば、30fpsの入力動画に対して、10フレーム間隔で物体検出を行う場合、物体検出と物体検出の間の9フレームは、Opencvに実装された「密なオプティカルフロー」でピクセル毎の移動ベクトルを求め、マスク画像を更新します。

環境

だいたいこんな感じです。

Anaconda3 2019.10
Python 3.7.6
opencv 3.4.9
tensorflow-gpu 2.1.0

検証パラメータ

コード中の以下の値をいろいろと変更して動作を検証できます。

file = "input.mp4"
物体検出させる動画ファイルを指定します。

width=640, height=320
物体検出する際に、ここで指定したサイズに画像をサイズ変更します。
サイズが小さいほうが処理時間も短くなります。

detect_fps = 1
物体検出を行う間隔(fps)を指定します。

debug_restrict_class = 1
物体検出におけるクラス識別を制限することができます。
この値を1にすると、class_filterリストで指定したクラス以外の検出は無視されます。
また、制限を行う場合には、クラス毎に指定した色でマスク画像を描画するようになります。

debug_score_threshould = 0.900
物体検出における識別スコア値の閾値を指定できます。
閾値未満の検出結果は無視されます。

debug_max_instances = 100
物体検出における検出オブジェクトの最大数を指定できます。

debug_display_rect = 0
検出オブジェクトを矩形で囲む描画を行うか否か。

debug_display_text = 0
検出オブジェクトの矩形上部にクラス名とスコア値の描画を行うか否か。

debug_display_mask = 1
検出オブジェクトのピクセル単位でのマスク画像を描画するか否か。

debug_update_masks = 1
オプティカルフローによるマスク画像の更新を行うか否か。

debug_update_masks_adding = 0
オプティカルフローによるマスク画像の更新において、前フレームのマスク画像に重ねて次フレームのマスク画像を描画するか否か。

検証

  • オプティカルフローで補間できるのは、せいぜい2~3フレームまで。
  • 物体が手前に近づいたりする場合、物体を構成するピクセル数が増加するので、マスク画像をオプティカルフローで動かしても隙間が生まれてしまう。(当たり前ですね...)
  • 四肢を持つ人のように、動きとともに形が頻繁に変化する場合は、オプティカルフローでは追いきれない。

結果

例えば、車のように形がある程度固定しているものをオプティカルフローで補間する分には、少しは役に立つかもしれないけど、実用的とは言えませんでした。とほほ。

コード

mask_rcnn_with_tracking.py
"""
    Based on https://ai-coordinator.jp/mask-r-cnn
"""
import os
import sys
import random
import math
import numpy as np
#import skimage.io
#import matplotlib
#import matplotlib.pyplot as plt

sys.path.append(os.path.abspath("matterport/Mask_RCNN"))

from samples.coco import coco
from mrcnn import utils
import mrcnn.model as modellib

import cv2
import colorsys

ROOT_DIR = os.getcwd()
MODEL_DIR = os.path.join(ROOT_DIR, "logs")

COCO_MODEL_PATH = os.path.join(ROOT_DIR, "mask_rcnn_coco.h5")
if not os.path.exists(COCO_MODEL_PATH):
    utils.download_trained_weights(COCO_MODEL_PATH)

class InferenceConfig(coco.CocoConfig):
    GPU_COUNT = 1
    IMAGES_PER_GPU = 1

config = InferenceConfig()

model = modellib.MaskRCNN(mode="inference", model_dir=MODEL_DIR, config=config)
model.load_weights(COCO_MODEL_PATH, by_name=True)

class_names = ['BG', 'person', 'bicycle', 'car', 'motorcycle', 'airplane',
               'bus', 'train', 'truck', 'boat', 'traffic light',
               'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird',
               'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear',
               'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie',
               'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
               'kite', 'baseball bat', 'baseball glove', 'skateboard',
               'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
               'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
               'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
               'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed',
               'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote',
               'keyboard', 'cell phone', 'microwave', 'oven', 'toaster',
               'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors',
               'teddy bear', 'hair drier', 'toothbrush']

class_filter = ['person',
                'bicycle',
                'car',
                'motorcycle',
                'bus',
                'truck',
                'cat',
                'backpack']
class_colors = [[  0/255, 165/255, 255/255],    # orange
                [  0/255, 255/255,   0/255],    # lime
                [255/255, 255/255,   0/255],    # cyan
                [130/255,   0/255,  75/255],    # indigo
                [  0/255, 128/255, 128/255],    # olive
                [  0/255, 128/255,   0/255],    # green
                [140/255, 230/255, 240/255],    # khaki
                [255/255,   0/255,   0/255]]    # blue

file = "input.mp4"
cap = cv2.VideoCapture(file)

width  = 640    # 640, 320
height = 360    # 360, 180

input_fps = cap.get(cv2.CAP_PROP_FPS)
detect_fps = 1

debug_restrict_class = 1
debug_score_threshould = 0.900
debug_max_instances = 100
debug_display_rect = 0
debug_display_text = 0
debug_display_mask = 1
debug_update_masks = 1
debug_update_masks_adding = 0

input_fps = round(input_fps)
if input_fps < 1: input_fps = 1
if detect_fps > input_fps: detect_fps = input_fps

def random_colors(N, bright=True):
    brightness = 1.0 if bright else 0.7
    hsv = [(i / N, 1, brightness) for i in range(N)]
    colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))
    random.shuffle(colors)
    return colors

def apply_mask(image, mask, color, alpha=0.5):
    for c in range(3):
        image[:, :, c] = np.where(mask == 1,
                                  image[:, :, c] *
                                  (1 - alpha) + alpha * color[c] * 255,
                                  image[:, :, c])
    return image

def display_instances(image, boxes, masks, class_ids, class_names,
                      scores=None, title="",
                      figsize=(16, 16), ax=None):
    N = boxes.shape[0]
    if not N:
        print("\n*** No instances to display *** \n")
    else:
        assert boxes.shape[0] == masks.shape[-1] == class_ids.shape[0]

    colors = random_colors(N)

    masked_image = image.copy()
    total_instance = 0
    for i in range(N):
        label = class_names[class_ids[i]]
        score = scores[i] if scores is not None else None
        total_instance = total_instance + 1

        if check_ignore_instance(label, score, total_instance):
            break

        # Color
        if debug_restrict_class != 0:
            color = get_class_color(label)
        else:
            color = colors[i]

        # Bounding box
        if not np.any(boxes[i]):
            continue
        y1, x1, y2, x2 = boxes[i]
        camera_color = (color[0] * 255, color[1] * 255, color[2] * 255)
        if debug_display_rect != 0:
            cv2.rectangle(masked_image, (x1, y1), (x2, y2), camera_color , 1)

        # Label
        if debug_display_text != 0:
            x = random.randint(x1, (x1 + x2) // 2)
            caption = "{} {:.3f}".format(label, score) if score else label
            camera_font = cv2.FONT_HERSHEY_PLAIN
            cv2.putText(masked_image,caption,(x1, y1),camera_font, 1, camera_color)

        # Mask
        if debug_display_mask != 0:
            mask = masks[:, :, i]
            masked_image = apply_mask(masked_image, mask, color)

    return masked_image.astype(np.uint8)

def get_class_color(class_name):
    return class_colors[class_filter.index(class_name)]

def check_ignore_instance(label, score, total_instance):
    if (debug_restrict_class != 0) and (not label in class_filter):
        return True
    if score < debug_score_threshould:
        return True
    if total_instance > debug_max_instances:
        return True
    return False

def update_new_mask(new_mask, mask, width, height, flow):
    index_list = np.where(mask == 1)
    N = len(index_list[0])
    for i in range(N):
        x = index_list[1][i]
        y = index_list[0][i]
        index_list[1][i] = max(min(x + int(round(flow[y, x, 0])), (width - 1)), 0)
        index_list[0][i] = max(min(y + int(round(flow[y, x, 1])), (height - 1)), 0)
    new_mask[index_list] = 1

def update_masks_by_flow(masks, class_ids, class_names, scores, flow):
    N = masks.shape[-1]

    if debug_update_masks_adding == 0:
        new_masks = np.zeros_like(masks)
    else:
        new_masks = np.copy(masks)

    total_instance = 0
    for i in range(N):
        label = class_names[class_ids[i]]
        score = scores[i] if scores is not None else None
        total_instance = total_instance + 1

        if check_ignore_instance(label, score, total_instance):
            break

        update_new_mask(new_masks[:, :, i], masks[:, :, i], width, height, flow)

    return new_masks

def main():
    frame_no = 0
    image_base = []
    r = []
    while(True):

        # 動画ストリームからフレームを取得
        ret, frame = cap.read()
        if ret == False:
            break

        # カメラ画像をリサイズ
        image_cv2 = cv2.resize(frame,(width,height))

        frame_no = frame_no + 1
        if (input_fps == detect_fps) or frame_no % round(input_fps / detect_fps) == 1:
            image_base = cv2.cvtColor(image_cv2, cv2.COLOR_BGR2GRAY)

            results = model.detect([image_cv2])
            r = results[0]
        else:
            if debug_update_masks != 0:
                image_next = cv2.cvtColor(image_cv2, cv2.COLOR_BGR2GRAY)
                flow = cv2.calcOpticalFlowFarneback(image_base, image_next, None, 0.5, 3, 15, 3, 5, 1.1, 0)
                image_base = image_next

                r['masks'] = update_masks_by_flow(r['masks'], r['class_ids'],
                                class_names, r['scores'], flow)

        camera = display_instances(image_cv2, r['rois'], r['masks'], r['class_ids'], 
                            class_names, r['scores'])

        cv2.imshow("camera window", camera)

        # escを押したら終了。
        if cv2.waitKey(1) == 27:
            break

    #終了
    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    main()

補足情報

AttributeError: module 'tensorflow' has no attribute 'log'

以上

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2