16
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

WebRTCの映像を止めないように、別スレッドでYOLO v3の物体検出を行う

Posted at

はじめに

PythonのWebRTC実装aiortcと、物体検出のYOLO v3の連携を試みる」では、PythonのWebRTC実装であるaiortcとdarknetのYOLOv3を無理やり連携させました。フレームごとにYOLOv3を呼び出していますが、処理に1秒以上かあるため映像が止まってしまうという致命的な問題がありました。

今回は検出頻度を落としてYOLOv3をワーカースレッドで動かすことで、映像を止めないことを目指してみました。

Pythonでのスレッド処理、非同期処理

Pythonでスレッド処理はどうしたらよいのか途方に暮れていたところ、こちらの書籍を発見しました。さまざまな方法を紹介していてとても参考になりました。

この本によるとPythonによる並行処理は複数のモデルがあり、それぞれ特徴がことなるようです。

  • マルチスレッド
  • マルチプロセス
  • 非同期プログラミング
    • 非同期I/O (aio)
    • async/await (コルーチン)
    • Executor/Future

aiortcはすでに非同期I/Oをベースに作られているため、さらに別の並行処理を組み合わせるの苦労しました。試行錯誤の末、Executor/Futureを組みあせることでなんとか実現できました。

やったこと

  • 物体検出用のワーカースレッドのプール(スレッド数は1)を用意しておく
    • ステータスと結果を示すフラグ、配列も用意する
  • 映像フレームが届いたら、ワーカーの状態を示すフラグをチェックする
    • ワーカーが仕事中でなければ、物体検知の処理を開始する
    • ワーカーが仕事中だったら、物体検知はしない
  • 映像フレームに、前回の検出結果を使って枠とラベルを描画する
  • ワーカーは検出が終了したら、結果を保持する配列を更新する
    • 結果は1接続分しか保持していなので、複数のクライアントから接続すると正しく動かない

映像の検出は1秒以上間隔があいてしまいますが、別スレッドにすることで映像が止まらないようになりました。

実際にはDockerで動かしているせいか、動きがぎくしゃくしてしまいます。またしばらくすると画面が乱れて止まってしまいます。どこかでメモリーリークしているのかもしれません。

Dockefile

インストールするパッケージは前回と同じですが、サンプルのコードが追加になります。

# Ubuntu 18.04 and aiortc
#  aiortc: https://github.com/jlaine/aiortc

FROM ubuntu:18.04
MAINTAINER mganeko

#  
# -- if you are using docker behind proxy, please set ENV --
#

#ENV http_proxy "http://proxy.yourdomain.com:8080/"
#ENV https_proxy "http://proxy.yourdomain.com:8080/"

ENV DEBIAN_FRONTEND nonineractive

# -- build step --
RUN apt update
RUN apt upgrade -y

RUN apt install python3 -y
RUN apt install python3-pip -y
RUN apt install python3-dev -y
RUN python3 -V
RUN pip3 -V
RUN pip3 install --upgrade pip
RUN pip -V


RUN apt install libopus-dev -y
RUN apt install libvpx-dev -y
RUN apt install libffi-dev -y
RUN apt install libssl-dev -y

# -- timezone で止まる
RUN apt install libopencv-dev -y
# RUN DEBIAN_FRONTEND=nonineractive apt install libopencv-dev -y


RUN apt install git -y

RUN mkdir /root/work 
WORKDIR /root/work/
RUN git clone https://github.com/jlaine/aiortc.git

RUN pip install aiohttp
RUN pip install aiortc 
RUN pip install opencv-python

#-- copy ---
#COPY server2.py /root/work/aiortc/examples/server/
#COPY server_yolo.py /root/work/aiortc/examples/server/


#
# ------ yolo v3 ---
#

RUN apt install vim -y
RUN apt install wget -y
WORKDIR /root/work/
RUN git clone https://github.com/pjreddie/darknet.git
WORKDIR /root/work/darknet
RUN make
RUN wget https://pjreddie.com/media/files/yolov3.weights
RUN wget https://pjreddie.com/media/files/yolov3-tiny.weights

RUN ln -s /root/work/darknet/libdarknet.so /usr/lib/libdarknet.so

# --- link ---
RUN ln -s /root/work/darknet/cfg /root/work/aiortc/examples/server/
RUN ln -s /root/work/darknet/data /root/work/aiortc/examples/server/
RUN ln -s /root/work/darknet/yolov3-tiny.weights /root/work/aiortc/examples/server/

#-- copy ---
COPY index_w.html /root/work/aiortc/examples/server/index.html
COPY client_w.js /root/work/aiortc/examples/server/client.js
#COPY client.js /root/work/aiortc/examples/server/client.js
COPY server_yolo_worker.py /root/work/aiortc/examples/server/

# --- for running --
EXPOSE 8080

WORKDIR /root/work/aiortc/examples/server/
#CMD [ "python3", "server_yolo.py" ]
CMD [ "python3", "server_yolo_worker.py" ]

ビルド手順

いつものようにターミナルから docker build を実行します。(イメージ名は適宜付けてください)

$ docker build -t mganeko/aiortc-yolov3 -f Dockerfile .

実行手順

(1) ターミナルから docker run でコンテナを起動します。先ほどつけたイメージ名を指定してください。

docker run -d -p 8001:8080 mganeko/aiortc-yolov3
  • この例ではホストOSの8001ポートをコンテナの8080ポートに接続しています。
  • コンテナの内部では aiortc/examples/server/server_yolo_worker.py を起動しています。

(2) Chromeで http://localhost:8001/ にアクセス
Firefox, Safariでは接続できませんでした

(3) [Use video]をチェック、オプションを選択

今回のオプションは2つだけです。試したいものを選択してください。

  • use YOLO v3 worker ... ワーカースレッドでYOLO v3で物体検出、メインスレッドで検出結果の枠とラベルをopenCVで描画
  • None ... 画像処理なし。クライアント側のカメラの映像をそのまま返す

(4) [Start]ボタンをクリック

数秒経つとブラウザとaiortcの接続が確立します。"use YOLO v3 worker"を選んでいる場合は、ワーカースレッドでは1〜2秒間隔で物体検出を行います。メインスレッド側でリアルタイム映像に前回の検出結果(最大3個)の描画しています。

マシンが十分高速ならリアルタイム映像は安定して流れて、とびとびで物体検出が行われます。変化がゆっくりの映像であれば、ある程度物体検出ができます。

マシンの性能が足りない場合は、映像はギクシャクしたり止まったりして、徐々に乱れてしまうケースがあります。

ワーカースレッドでのYOLO v3の利用

スレッドプールの利用

concurrent.futures から ThreadPoolExecutor をインポートします。結果はFutureで受け取ります。
また、ワーカースレッドの状態と検出結果を管理するために、multiprocessing から Value, RawArray をインポートしています。
検出結果を保持するために ctypes の構造体をつかって、DETECT_RESULT を定義しています。

from ctypes import *

from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Value, RawArray

class DETECT_RESULT(Structure):
    _fields_ = [("left", c_int),
                ("top", c_int),
                ("right", c_int),
                ("bottom", c_int),
                ("cid", c_int)
            ]


# ワーカーの状態を管理するフラグ
workerBusy = Value('i', 0)

# 前回の検出結果を保持する
max_result_count = 3
lastResults = RawArray(DETECT_RESULT, max_result_count)
for i in range(max_result_count):
    lastResults[i] = DETECT_RESULT(0, 0, 0, 0, 0)

# スレッドプールを用意
executor = ThreadPoolExecutor(1)

映像の検出は、VideoTransformTrack の async recv()の中からワーカーをキックします。

class VideoTransformTrack(VideoStreamTrack):
    # ... 省略 ...
 
    async def recv(self):
        frame = await self.received.get()
        self.counter += 1

        if self.transform == 'yolov3_worker':
            with workerBusy.get_lock():
                if (workerBusy.value == 0):
                    # --- kick detect --
                    workerBusy.value = 1
                    img = frame_to_bgr(frame)
                    future = executor.submit(workerDetect, img)
                    future.add_done_callback(onDetected)
                    self.tryDetectCounter += 1
            
            # --- draw last detect ---
            img = frame_to_bgr(frame)
            #rows, cols, _ = img.shape            
            for result in lastResults:
                if result.bottom == 0:
                    break
                name = str(meta.names[result.cid])
                img = cv2.rectangle(img, (result.left, result.top), (result.right, result.bottom), (255, 128,0), 3, 4)
                img = cv2.putText(img, name, (result.left, result.top), cv2.FONT_HERSHEY_PLAIN, 2, (255, 128, 0), 2, 4)

            # --- count ---
            dcount = str(self.tryDetectCounter)
            img = cv2.putText(img, dcount, (10, 20), cv2.FONT_HERSHEY_PLAIN, 2, (255, 128, 128), 2, 4)

            newFrame = frame_from_bgr(img)
            return newFrame
                       
        else:
            # return raw frame
            return frame

workerBusy.get_lock()でロックを取得し、ワーカーの状態を調べます。
検出中ではない(ビジーでない)場合は、新たに物体検出の処理 workerDetect()を開始します。結果はFutureで返ってきますが、完了時のコールバック onDetect() を指定して映像処理が止まらないようにしています。

物体検出を開始した場合もしない場合も、前回の検出結果をフレームに描画して後続の処理につなげます。

物体検出処理

ワーカースレッドで動くとは言え、物体検出の処理は余り変わりません。検出結果の種類はラベルではなく、idの数値で返すところが異なります。


def workerDetect(data):
    img = array_to_image(data)
    res = detectObjects(net, meta, img, alphabet)
    return res

# in: im, out:result(id and rect)
def detectObjects(net, meta, im, alp, thresh=.5, hier_thresh=.5, nms=.45):
    #print('------ start detect ----')
    num = c_int(0)
    pnum = pointer(num)
    predict_image(net, im)
    dets = get_network_boxes(net, im.w, im.h, thresh, hier_thresh, None, 0, pnum)
    num = pnum[0]
    if (nms): do_nms_obj(dets, num, meta.classes, nms);

    res = []
    for j in range(num):
        for i in range(meta.classes):
            if dets[j].prob[i] > 0:
                b = dets[j].bbox
                #res.append((meta.names[i], dets[j].prob[i], (b.x, b.y, b.w, b.h)))
                res.append((i, dets[j].prob[i], (b.x, b.y, b.w, b.h)))
                #print("detect " , meta.names[i], dets[j].prob[i])
    res = sorted(res, key=lambda x: -x[1])
    #print(res)

    free_detections(dets, num)
    return res

検出結果の受け取り

物体検出の処理結果はコールバックで受け取っています。これをスレッド間で共有できる形に変換して保持します。

def onDetected(future):
    #print('--- future on end---')
    #-- get result --
    detect_results = future.result() # <--- wait at here

    # -- clear last results ---
    with workerBusy.get_lock():
        for i in range(max_result_count):
            lastResults[i] = DETECT_RESULT(0, 0, 0, 0, 0)

    # --- convert results ---
    i = 0
    for r in detect_results:
        cid = r[0]
        #name = meta.names[cid]
        b = r[2]
        x = b[0]
        y = b[1]
        w = b[2]
        h = b[3]
        left = int(x - w/2)
        top = int(y - h/2)
        right = int(x + w/2)
        bottom = int(y + h/2)

        # -- update last result ---
        with workerBusy.get_lock():
            lastResults[i] = DETECT_RESULT(left, top, right, bottom, cid)

        i = i + 1
        if i >= max_result_count:
            break

    # -- worker finish --
    with workerBusy.get_lock():
      workerBusy.value = 0

    return

もともとのaiortcのサンプルは複数のクライアントが接続するケースを考慮していますが、今回は物体検出結果は1セットしか保持していないので、クライアント1つだけの接続を想定しています。

おわりに

ワーカースレッドを使うことで、なんとかリアルタイム映像を止めずに、飛び飛びに物体検出を行うことができました。サーバー側でGPUを使えば、検出間隔を短くすることはできるはずです。ただし、実際にサーバー側で処理を行うには厳しい面も見えてきました。

  • 1台のサーバーで、多くのクライアントの映像をリアルタイムで扱うのか厳しい。リアルタイム処理が必要ならGPU付きサーバーを大量に用意する必要があり、コストがかさみそう
  • サーバー側の検出は、転送されてきた映像の画質に左右される。映像が乱れると、途端に検出精度が下がる

実際にリアルタイム映像からの物体検出を行うには、クライアント側(デバイス側)で行うのが望ましいかもしれません。

16
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?