Help us understand the problem. What is going on with this article?

Colab上でwebカメラをリアルタイムに処理

はじめに

グーグルコラボではコードスニペットの「Camera Capture」を使うとwebカメラから画像をキャプチャできます。
動画としてリアルタイムに処理させるのコードを書いたのでメモします。
いろいろ試した結果,結構無理やりなので他にもっといい方法があるかもしれません。

このようにUGATITをコラボ上だけでリアルタイムに動かしたり出来ます。

notebookはgithubにあります。
a2kiti/webCamGoogleColab

websocketを使う方法

1つのプロセスでjavascriptを動かしウェブカメラをキャプチャーし,もう一つのプロセスでwebsocketサーバーを立ち上げて,通信させて処理します。
ソケット通信をコラボ上でつなげることがうまく出来なかったので,ngrokを経由させています。
2つプロセスを動かす必要があるのでmultiprocessingを使う必要があります。
フレームレートは通信がネックになるようで,できるだけ軽くするために一枚一枚画像をリサイズしてからjpegに圧縮しています。

まず必要なライブラリを入れます。

!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install bottle
!pip install bottle_websocket
!pip install gevent

つぎにwebカメラを使うためのjavascriptを書きます。

from IPython.display import display, Javascript
from google.colab.output import eval_js

def use_cam(url, quality=0.8):
  print("start camera")
  js = Javascript('''
    async function useCam(url, quality) {
      const div = document.createElement('div');
      document.body.appendChild(div);
      //video element
      const video = document.createElement('video');
      video.style.display = 'None';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      //canvas for display. frame rate is depending on display size and jpeg quality.
      display_size = 500 
      const src_canvas = document.createElement('canvas');
      src_canvas.width  = display_size;
      src_canvas.height = display_size * video.videoHeight / video.videoWidth;
      const src_canvasCtx = src_canvas.getContext('2d');
      src_canvasCtx.translate(src_canvas.width, 0);
      src_canvasCtx.scale(-1, 1);
      div.appendChild(src_canvas);

      const dst_canvas = document.createElement('canvas');
      dst_canvas.width  = src_canvas.width;
      dst_canvas.height = src_canvas.height;
      const dst_canvasCtx = dst_canvas.getContext('2d');
      div.appendChild(dst_canvas);

      //exit button
      const btn_div = document.createElement('div');
      document.body.appendChild(btn_div);
      const exit_btn = document.createElement('button');
      exit_btn.textContent = 'Exit';
      var exit_flg = true
      exit_btn.onclick = function() {exit_flg = false};
      btn_div.appendChild(exit_btn);

      //log
      let jsLog = function(abc) {
        document.querySelector("#output-area").appendChild(document.createTextNode(`${abc} `));
      }
      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      //for websocket connection.
      var connection = 0
      var flag_count = 0
      var send_flg = false

      // loop
      _canvasUpdate();
      async function _canvasUpdate() {
            flag_count += 1

            //wait until websocket launch
            if (flag_count == 200){
                connection = new WebSocket(url); 
                jsLog("Connect_start")
            }
            if (flag_count == 300){
                connection.onmessage = function(e) {
                    var image = new Image()
                    image.src = e.data;
                    image.onload = function(){dst_canvasCtx.drawImage(image, 0, 0)}
                    send_flg=false
                };
                jsLog("Set_recieve")
            }
            if(flag_count > 400){
                //resize to reduce file size
                src_canvasCtx.drawImage(video, 0, 0, video.videoWidth, video.videoHeight, 0, 0, src_canvas.width, src_canvas.height);
                const img = src_canvas.toDataURL('image/jpeg', quality);
                if (send_flg==false){
                    connection.send(img);
                    send_flg = true
                }
            }
          if (exit_flg){
              requestAnimationFrame(_canvasUpdate);   
          }else{
              stream.getVideoTracks()[0].stop();
              connection.close();
          }
      };
    }
    ''')
  display(js)
  data = eval_js('useCam("{}", {})'.format(url, quality))

ngrokを使いurlを発行します。
次のセルで使うのでテキストファイルに吐いています。

get_ipython().system_raw('./ngrok http 6006 &')
! curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json;url = json.load(sys.stdin)['tunnels'][0]['public_url'];print(url);f = open('url.txt', 'w');f.write(url);f.close()"

処理を行うwebsocketサーバーを立ち上げます。
out_img = cv2.Canny(decimg,100,200)
の部分を変えれば好きな処理を行うことが出来ます。

import numpy as np
import cv2
import json
import bottle
import gevent
from bottle.ext.websocket import GeventWebSocketServer
from bottle.ext.websocket import websocket
from multiprocessing import Pool
from PIL import Image
from io import BytesIO
import base64

socket = bottle.Bottle()
@socket.route('/', apply=[websocket])
def wsbin(ws):
    while True:
        try:
            #decode to image
            img_str = ws.receive()
            decimg = base64.b64decode(img_str.split(',')[1], validate=True)
            decimg = Image.open(BytesIO(decimg))
            decimg = np.array(decimg, dtype=np.uint8); 
            decimg = cv2.cvtColor(decimg, cv2.COLOR_BGR2RGB)

            #############your process###############

            out_img = cv2.Canny(decimg,100,200)
            #out_img = decimg

            #############your process###############

            #encode to string
            _, encimg = cv2.imencode(".jpg", out_img, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
            img_str = encimg.tostring()
            img_str = "data:image/jpeg;base64," + base64.b64encode(img_str).decode('utf-8')
            ws.send(img_str)
        except:
            pass
            #print("error")

if __name__ == '__main__':
    # get ngrok url
    f = open("url.txt", "r")
    url = f.read()
    f.close()
    url = "wss" + url[5:]
    # prepare multiprocess
    _pool = Pool(processes=2)
    _pool.apply_async(use_cam, (url, 0.8))
    socket.run(host='0.0.0.0', port=6006, server=GeventWebSocketServer)

すると以下のようにキャプチャされたカメラ映像と処理された映像がリアルタイムに表示されます。

image.png

コールバックを使う方法

もっとシンプルな方法として,コラボで用意されてるコールバックを使う方法もあります。
こちらの方法だとより簡単にかけましたが,コールバックが帰ってくるのがあまり早くないのか,websocketを使う方法よりもフレームレートが遅くなってしまいました。
とはいえwebsocketの方もそんなに早くないのでこっちで十分かもしれません。

処理を行うコールバックを作成します。

import IPython
from google.colab import output
import cv2
import numpy as np
from PIL import Image
from io import BytesIO
import base64

def run(img_str):
    #decode to image
    decimg = base64.b64decode(img_str.split(',')[1], validate=True)
    decimg = Image.open(BytesIO(decimg))
    decimg = np.array(decimg, dtype=np.uint8); 
    decimg = cv2.cvtColor(decimg, cv2.COLOR_BGR2RGB)

    #############your process###############

    out_img = cv2.Canny(decimg,100,200)
    #out_img = decimg

    #############your process###############

    #encode to string
    _, encimg = cv2.imencode(".jpg", out_img, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
    img_str = encimg.tostring()
    img_str = "data:image/jpeg;base64," + base64.b64encode(img_str).decode('utf-8')
    return IPython.display.JSON({'img_str': img_str})

output.register_callback('notebook.run', run)

webカメラを使うjavascriptを書きます。

from IPython.display import display, Javascript
from google.colab.output import eval_js

def use_cam(quality=0.8):
  js = Javascript('''
    async function useCam(quality) {
      const div = document.createElement('div');
      document.body.appendChild(div);
      //video element
      const video = document.createElement('video');
      video.style.display = 'None';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      //canvas for display. frame rate is depending on display size and jpeg quality.
      display_size = 500 
      const src_canvas = document.createElement('canvas');
      src_canvas.width  = display_size;
      src_canvas.height = display_size * video.videoHeight / video.videoWidth;
      const src_canvasCtx = src_canvas.getContext('2d');
      src_canvasCtx.translate(src_canvas.width, 0);
      src_canvasCtx.scale(-1, 1);
      div.appendChild(src_canvas);

      const dst_canvas = document.createElement('canvas');
      dst_canvas.width  = src_canvas.width;
      dst_canvas.height = src_canvas.height;
      const dst_canvasCtx = dst_canvas.getContext('2d');
      div.appendChild(dst_canvas);

      //exit button
      const btn_div = document.createElement('div');
      document.body.appendChild(btn_div);
      const exit_btn = document.createElement('button');
      exit_btn.textContent = 'Exit';
      var exit_flg = true
      exit_btn.onclick = function() {exit_flg = false};
      btn_div.appendChild(exit_btn);

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      var send_num = 0
      // loop
      _canvasUpdate();
      async function _canvasUpdate() {
            src_canvasCtx.drawImage(video, 0, 0, video.videoWidth, video.videoHeight, 0, 0, src_canvas.width, src_canvas.height);     
            if (send_num<1){
                send_num += 1
                const img = src_canvas.toDataURL('image/jpeg', quality);
                const result = google.colab.kernel.invokeFunction('notebook.run', [img], {});
                result.then(function(value) {
                    parse = JSON.parse(JSON.stringify(value))["data"]
                    parse = JSON.parse(JSON.stringify(parse))["application/json"]
                    parse = JSON.parse(JSON.stringify(parse))["img_str"]
                    var image = new Image()
                    image.src = parse;
                    image.onload = function(){dst_canvasCtx.drawImage(image, 0, 0)}
                    send_num -= 1
                })
            }
            if (exit_flg){
                requestAnimationFrame(_canvasUpdate);   
            }else{
                stream.getVideoTracks()[0].stop();
            }
      };
    }
    ''')
  display(js)
  data = eval_js('useCam({})'.format(quality))

動かします。止めるときはexitボタンで。

use_cam()
a2kiti
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away