0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Lチカで始めるテスト自動化(27)矩形の画像を動画のフレームから切り出してファイルに保存する

Posted at

1. はじめに

矩形の画像を動画のフレームから切り出してファイルに保存する機能を追加し、例としてrec_startコマンドで録画した動画に重畳している時刻情報をexec_ocrコマンドでテキスト化します。
metadata superimpose

2. テストランナーの改修

2.1 コマンド仕様

コマンド 引数 機能
crop_mov 入力ファイル名
切り出し位置(左上)
切り出し位置(右下)
出力フォルダ名
矩形の画像を動画のフレームから切り出して白黒二値化し出力フォルダに保存する

出力ファイル名は、<入力ファイル名のベースネーム>_<連番(8桁)>.<画像の形式>。

2.2 実装

Python, OpenCVで動画ファイルからフレームを切り出して保存で紹介されているopencv_video_to_still.py (MIT License、Copyright 2019 nkmk.me) をベースにさせていただきました。

crop_mov()
def crop_mov(filename_in, p0, p1, dirname_out, ext="jpg"):
    """
    This function is based on
    https://github.com/nkmk/python-snippets/blob/c4d70389733a34250595f1c17d737ef004f2dcbd/notebook/opencv_video_to_still.py#L1-L27
    MIT License, Copyright 2019 nkmk.me
    """
    cap = cv2.VideoCapture(filename_in)
    if not cap.isOpened():
        return

    os.makedirs(dirname_out, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(filename_in))[0]
    base_path = os.path.join(dirname_out, base_name)

    #digit = len(str(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))))
    digit = 8

    h0 = int(p0.split(':')[0])
    v0 = int(p0.split(':')[1])
    h1 = int(p1.split(':')[0])
    v1 = int(p1.split(':')[1])

    n = 0
    while True:
        ret, frame = cap.read()
        if ret:
            frame2 = frame[v0:v1, h0:h1]
            frame3 = cv2.resize(frame2, dsize=None, fx=2.0, fy=2.0)
            frame4 = cv2.cvtColor(frame3, cv2.COLOR_BGR2GRAY)
            th, frame5 = cv2.threshold(frame4, 16, 255, cv2.THRESH_OTSU)
            #print(th)
            cv2.imwrite('{}_{}.{}'.format(base_path, str(n).zfill(digit), ext), frame5)
            n += 1
        else:
            return

呼び出し側です。
OpenCVが対応している画像フォーマットを5番目の引数に指定するとその形式で出力します(隠しコマンド)。

Testrunner.parser()
        elif cmd[0] == "crop_mov":
            print(len(cmd))
            if len(cmd) == 6:
                crop_mov(cmd[1], cmd[2], cmd[3], cmd[4], cmd[5])
            else:
                crop_mov(cmd[1], cmd[2], cmd[3], cmd[4], "jpg")

2.3 画像切り出しスクリプト

Webカメラの画像をmov10s.mp4というファイル名で10秒録画し時刻部分の画像を切り出して1_dateフォルダへ保存する例です。

mov10s.csv
open_cam,1,1280,720
rec_start,1,mov10s.mp4
sleep,10
rec_stop,1
crop_mov,mov10s.mp4,115:5,230:25,1_date
python testrunner.py runscript mov10s.csv mov10s_res.csv

2.4 実行例

筆者の環境で実行したところ何フレームか取りこぼしが発生していることがわかりました。

  • 環境
    • Webカメラ ロジクールC270n(720p/30fps)
    • ThinkPad X1 Carbon 5th(Core i7-7500U、メモリ16GB)
    • Windows 10 22H2
    • Python 3.10.0
    • opencv-python 4.5.4.60
  • 動画の先頭フレームの時刻:15:45:28.798
    mov10s_00000000.jpg
  • 動画の末尾フレームの時刻:15:45:36.875
    mov10s_00000231.jpg
  • 切り出した画像は232枚(本来は30fps、8.077秒で242枚)
    mov10s.mov frames

3. OCRしてみた

切り出した画像を以下に示すテストスクリプトでOCRし、動画に重畳している時刻の間隔を調べます。きっちり30fps出ていれば間隔は0.033[sec]になります。

mov10socr.csv
open_ocr
exec_ocr,1_date\mov10s_00000000.jpg
export,mov10s_ocr.txt,val
export,mov10s_ocr.txt,writerow
exec_ocr,1_date\mov10s_00000001.jpg
export,mov10s_ocr.txt,val
export,mov10s_ocr.txt,writerow
・・・
・・・
exec_ocr,1_date\mov10s_00000231.jpg
export,mov10s_ocr.txt,val
export,mov10s_ocr.txt,writerow
set path=%PATH%;C:\Program Files (x86)\Tesseract-OCR
set TESSDATA_PREFIX=C:\Program Files (x86)\Tesseract-OCR
python testrunner.py runscript mov10socr.csv mov10socr_res.csv

OCRの結果を確認して誤検出を適宜修正します。
mov10s-ocr-check.png

動画に重畳している時刻の間隔は概ね0.033[sec]を中心に振れていますが2倍の0.066[sec]を超えるケースも発生していました。
mov10s.mp4 frame interval

mov10s.mp4 frame interval number of occurrences

rec_startコマンドで録画を行っているプログラムを以下に抜粋します1。whileループでループ1回ごとにファイルへの書き込みやGUIの更新、キー入力の監視を行っていて、間隔が伸び縮みしたり画像データの取りこぼしが発生するのはこのためと思います。

  1. Webカメラの画像をcv2.VideoCapture.read()で取得する
  2. 取得した画像にtext_over_img()で時刻情報を重畳する
  3. 画像をcv2.VideoWriter.write()でファイルへ書き込む
  4. 画像をcv2.imshow()でGUIに表示する
  5. GUIのキー入力をcv2.waitKey()で待つ
camera.py
    def text_over_img(self, img):
        timestamp = (datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")[:-3])
        videotxt  = self.get_video_txt()
        cv2.rectangle(img, (5,5), (self.w-5,25), (255,255,255), thickness=-1)
        cv2.putText(img, timestamp, ( 10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
        cv2.putText(img, videotxt,  (235,20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)

    def video_record(self, filename):
        if self.cam == False:
            print("CAM Not Ready.")
            return False

        fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
        out = cv2.VideoWriter(filename, fourcc, self.f, (self.w, self.h))
        msg = "CAM " + str(self.camera_id) + " : q to quit"

        while True:
            ret, img = self.cam.read()
            if ret == True:
                self.text_over_img(img)
                out.write(img)
                cv2.imshow(msg, img)

            if (cv2.waitKey(1) == ord('q')) or (self.get_video_rec() == False):
                print("### rec stop ###")
                break

        out.release()
        return True

4. おわりに

  • 時刻を表示している箇所の画像を動画のフレームから切り出してファイルに保存しOCRでテキスト化できました。
  • OCRしたところ間隔がばらついたり時々取りこぼしていることがわかりました。動画に重畳している時刻情報を利用する場合は時間分解能を0.1秒程度2と考えると良さそうです。

付録A. testrunner.pyのソース

testrunner.py
#!/usr/bin/python3

#
# This software includes the work that is distributed
# in the Apache License 2.0
#

from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa

#【Python】opencvでWebカメラの起動に時間がかかる問題の対処
# https://qiita.com/youichi_io/items/b894b85d790720ea2346
import os
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2

from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera
from beepsound import BEEPSOUND

UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6

class Uart:
    def __init__(self):
        self.device = []

    def list(self):
        print(self.device)
        return self.device

    def open(self, port, bps, timeout_val, dsrdtr_val):
        h = self.gethandle(port)
        if h == None:
            try:
                h = serial.Serial(port, int(bps), timeout=timeout_val, dsrdtr=int(dsrdtr_val))
                self.device.append(h)
                return True
            except:
                print("ERROR>>>open fail")
                return False
        else:
            print("WARNING>>>port %s is ready." % port)
            return True

    def send(self, port, txdata):
        h = self.gethandle(port)
        if h == None:
            print("ERROR>>>port %s is not ready." % port)
            return False
        else:
            try:
                txdata = txdata + '\n'
                txdata = str.encode(txdata)
                h.write(txdata)
                return True
            except:
                print("ERROR>>>send fail.")
                return False

    def rcvd(self, port):
        h = self.gethandle(port)
        if h == None:
            print("ERROR>>>port %s is not ready." % port)
            return False
        else:
            try:
                rxdata = h.readline().strip().decode('utf-8')
                print(rxdata)
                return rxdata
            except:
                print("ERROR>>>rcvd fail.")
                return False

    def gethandle(self, port):
        h = None
        for dev in self.device:
            if dev:
                if dev.port == port:
                    h = dev
                    break
        return h

class Export:
    def __init__(self):
        #exportfiles
        #概要
        #  exportコマンドで使用するN行3列の二次元配列
        #説明
        #  以下の3つの要素を1行3桁の配列で構造化し、
        #    1.ファイルオブジェクト(open()の戻り値)
        #    2.Writerオブジェクト(csv.writer()の戻り値)
        #    3.ファイルに出力するデータのバッファ
        #  出力するファイルの数だけこの配列を行方向に追加する
        #構造
        #  exportfiles = [ 
        #                 [ [FileObject],[WriterObject],[Buffer] ], #1つめの出力ファイルの配列
        #                 [ [FileObject],[WriterObject],[Buffer] ], #2つめの出力ファイルの配列
        #                 ...
        #                ]
        #初期値
        #  空の1行3列の2次元配列で初期化する
        self.exportfiles = [\
                            [ [],[],[] ]\
                           ]

    def parser(self, cmd, script_line, timestamp, val):
        #export,<filename.csv>,str,<string>
        #export,<filename.csv>,linenumber
        #export,<filename.csv>,timestamp
        #export,<filename.csv>,val
        #export,<filename.csv>,writerow
        #export,close
        if cmd[0] == "export":
            #1)引数が3以上の場合に処理を行う
            if len(cmd) >= 3:
                #2)操作対象のファイルディスクリプタを決める
                ##2.1)操作対象のファイルディスクリプタを検索する
                exportfile = None
                i = 0 #exportfiles配列の行番号
                #print(exportfiles)
                for f in self.exportfiles:
                    if f[0]:
                        #print(f[0].name)
                        if f[0].name == cmd[1]:
                            exportfile = f
                            break
                        i = i + 1

                #print("index " + str(i))

                ##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
                if exportfile == None:
                    if i >= 1:
                        self.exportfiles.append([[],[],[]])
                    self.exportfiles[i][0] = codecs.open(cmd[1], 'w', 'utf-8')
                    self.exportfiles[i][1] = csv.writer(self.exportfiles[i][0], delimiter=',', lineterminator='\r\n', quotechar='"')
                    #print(i, exportfiles)

                if cmd[2] == "str":
                    #print("str  " + self.exportfiles[i][0].name)
                    self.exportfiles[i][2].append(cmd[3])

                elif cmd[2] == "linenumber":
                    #print("line " + self.exportfiles[i][0].name)
                    self.exportfiles[i][2].append(str(script_line))

                elif cmd[2] == "timestamp":
                    #print("time " + self.exportfiles[i][0].name)
                    self.exportfiles[i][2].append(timestamp)

                elif cmd[2] == "val":
                    #print("val  " + self.exportfiles[i][0].name)
                    self.exportfiles[i][2].append(str(val))

                elif cmd[2] == "writerow":
                    #print("wrow " + self.exportfiles[i][0].name)
                    #print(self.exportfiles)
                    self.exportfiles[i][1].writerow(self.exportfiles[i][2])
                    self.exportfiles[i][2].clear()

                else:
                    print("export parameter error")
                    return False

            elif cmd[1] == "close":
                for f in self.exportfiles:
                    if f[0]:
                        print("close " + f[0].name)
                        f[0].close()

            else:
                print("export parameter error")
                return False

        return True

class Testrunner:
    def __init__(self):
        self.is_passed = True
        self.val  = str(UNINITIALIZED)
        self.fval = 0.0
        self.uart = Uart()
        self.dso  = UNINITIALIZED
        self.ocr  = UNINITIALIZED
        self.cmds = ""
        self.cam0 = Camera(0)
        self.cam1 = Camera(1)
        self.cam2 = Camera(2)
        self.cam  = [self.cam0, self.cam1, self.cam2]
        self.beepsound = BEEPSOUND()
        self.export = Export()
        self.script_line = 0

    def parser(self, cmd):
        timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        self.script_line += 1
        print("##### " + timestamp + " " + str(self.script_line) + " #####")

        cmds = str(self.script_line) + ":"
        for i in range(len(cmd)):
            cmds += str(cmd[i]) + ","
        for i in range(len(self.cam)):
            self.cam[i].set_video_txt(cmds)
        print(cmds)

        if "#" in cmd[0]:
            pass

        elif cmd[0] == "sleep":
            sleep(float(cmd[1]))

        elif cmd[0] == "rusleep":
            self.fval = random.uniform(float(cmd[1]), float(cmd[2]))
            self.val = str(self.fval)
            cmd.append(self.val)
            cmds += self.val
            for i in range(len(self.cam)):
                self.cam[i].set_video_txt(cmds)
            print(cmds)
            sleep(self.fval)

        elif cmd[0] == "prompt":
            print("Enter y/n")
            self.val = input()
            self.val = self.val.lower()
            cmd.append(self.val)
            cmds += self.val
            for i in range(len(self.cam)):
                self.cam[i].set_video_txt(cmds)
            print(cmds)
            sleep(1)
            if self.val != "y":
                self.is_passed = False

        elif cmd[0] == "list_uart":
             self.val = self.uart.list()
             cmd.append(self.val)

        elif cmd[0] == "open_uart":
            try:
                self.is_passed = self.uart.open(cmd[1], cmd[2], 1.0, cmd[3])
            except:
                print("Open Fail")
                self.is_passed = False

        elif cmd[0] == "send_uart":
            ret = self.uart.send(cmd[1], cmd[2])
            if ret == False:
                self.is_passed = False

        elif cmd[0] == "rcvd_uart":
            try:
                self.val = self.uart.rcvd(cmd[1])
                cmd.append(self.val)
            except:
                self.is_passed = False

        elif cmd[0] == "open_dso":
            try:
                self.dso = open_dso()
                print(self.dso)
                if self.dso == UNINITIALIZED:
                    self.is_passed = False
            except:
                self.is_passed = False

        elif cmd[0] == "dso":
            try:
                if "?" in cmd[1]:
                    self.val = self.dso.query(cmd[1]).rstrip().replace(",", "-")
                    cmd.append(self.val)
                else:
                    self.dso.write(cmd[1])
            except:
                self.is_passed = False

        elif cmd[0] == "open_cam":
            if len(cmd) == 2:
                self.cam[int(cmd[1])].open_cam(640, 480)
            else:
                self.cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))

        elif cmd[0] == "close_cam":
            self.cam[int(cmd[1])].close_cam()

        elif cmd[0] == "close_allcam":
            for i in range(len(self.cam)):
                print("CamID:%d" % i)
                if self.cam[i].get_video_rec() == True:
                    self.cam[i].set_video_rec(False)
                    self.cam[i].thread.join()
                else:
                    pass
                    #print("No Recording.")
                self.cam[i].close_cam()

        elif cmd[0] == "capture_cam":
            ret = self.cam[int(cmd[1])].capture_cam(cmd[2])
            if ret == False:
                self.is_passed = False

        elif cmd[0] == "rec_start":
            if self.cam[int(cmd[1])].get_video_rec() == False:
                self.cam[int(cmd[1])].set_video_rec(True)
                self.cam[int(cmd[1])].thread = threading.Thread(target=self.cam[int(cmd[1])].video_record, args=(cmd[2],))
                self.cam[int(cmd[1])].thread.start()
            else:
                print("Already Recording.")

        elif cmd[0] == "rec_stop":
            if self.cam[int(cmd[1])].get_video_rec() == True:
                self.cam[int(cmd[1])].set_video_rec(False)
                self.cam[int(cmd[1])].thread.join()
            else:
                print("No Recording.")

        elif cmd[0] == "crop_img":
            crop_img(cmd[1], cmd[2], cmd[3], cmd[4])

        elif cmd[0] == "crop_mov":
            print(len(cmd))
            if len(cmd) == 6:
                crop_mov(cmd[1], cmd[2], cmd[3], cmd[4], cmd[5])
            else:
                crop_mov(cmd[1], cmd[2], cmd[3], cmd[4], "jpg")

        elif cmd[0] == "open_ocr":
            self.ocr = open_ocr()
            if self.ocr == UNINITIALIZED:
                self.is_passed = False

        elif cmd[0] == "exec_ocr":
            try:
                self.val = exec_ocr(self.ocr, cmd[1])
            except:
                self.is_passed = False
            else:
                cmd.append(str(self.val))

        elif cmd[0] == "exec_labelimg":
            try:
                self.val = exec_labelimg(cmd[1], cmd[2])
            except:
                self.is_passed = False
            else:
                cmd.append(str(self.val))

        #set_servo, port, ID, Val
        elif cmd[0] == "set_servo":
            ret = set_servo(self.uart, cmd[1], int(cmd[2]), int(cmd[3]))
            if ret == False:
                self.is_passed = False

        elif cmd[0] == "run":
            ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
            self.val = ret.stdout.strip()
            print(ret)
            if ret.returncode != 0:
                self.is_passed = False

        elif cmd[0] == "beepsound":
            if cmd[1] == "FurElise":
                self.beepsound.set_tempo(60)
                self.beepsound.FurElise()
            elif cmd[1] == "Menuet":
                self.beepsound.set_tempo(120)
                self.beepsound.Menuet()
            elif cmd[1] == "ToccataUndFugeInDMoll":
                self.beepsound.set_tempo(120)
                self.beepsound.ToccataUndFugeInDMoll()
            else:
                self.is_passed = False

        elif cmd[0] == "export":
            self.export.parser(cmd, self.script_line, timestamp, self.val)

        elif cmd[0] == "eval_str_eq":
            if str(self.val) != str(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "eval_int_eq":
            if int(self.val) != int(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "eval_int_gt":
            if int(self.val) < int(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "eval_int_lt":
            if int(self.val) > int(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "eval_dbl_eq":
            if float(self.val) != float(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "eval_dbl_gt":
            if float(self.val) < float(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "eval_dbl_lt":
            if float(self.val) > float(cmd[1]):
                self.is_passed = False

        elif cmd[0] == "runscript":
            if len(cmd) == 3:
                self.__runscript(cmd[1],cmd[2])
            else:
                print("ERROR>>>invalid args.")
                self.is_passed = False

        else:
            cmd.append("#")

        return self.is_passed, timestamp, cmd

    def __runscript(self, script_file_name, result_file_name):
        with codecs.open(script_file_name, 'r', 'utf-8') as file_script:
            script = csv.reader(file_script, delimiter=',', lineterminator='\r\n', quotechar='"')

            with codecs.open(result_file_name, 'w', 'utf-8') as file_result:
                result = csv.writer(file_result, delimiter=',', lineterminator='\r\n', quotechar='"')

                for cmd in script:
                    self.is_passed, timestamp, cmd = self.parser(cmd)
                    if self.is_passed == True:
                        cmd.append("OK")
                        cmd.insert(0,timestamp)
                        print(cmd)
                        result.writerow(cmd)
                    else:
                        cmd.append("NG")
                        cmd.insert(0,timestamp)
                        print(cmd)
                        result.writerow(cmd)
                        print("FAIL")
                        return

        if self.is_passed == True:
            print("PASS")
            return

def help():
    helpmsg = [\
        "usage:",\
        "  python testrunner.py <command>",\
        "command:",\
        "  sleep,<sleep[sec]>                                               sleep",\
        "  rusleep,<lower[sec]>,<upper[sec]>                                random sleep",\
        "  prompt                                                           hit y/n to continue",\
        "  open_uart,<port>,<bps>,<dsrdtr(0 or 1)>                          open uart",\
        "  send_uart,<port>,<txdata>                                        send txdata to port",\
        "  rcvd_uart,<port>                                                 receive rxdata from port",\
        "  list_uart                                                        print uart list",\
        "  set_servo,<port>,<servo ID>,<angle>                              servo moving",\
        "  open_dso                                                         open VISA device",\
        "  dso,<txdata>                                                     send txdata to VISA device",\
        "  open_cam,   <Camera ID>,<Width>,<Height>                         open Camera device",\
        "  capture_cam,<Camera ID>,<filename.png>                           capture Camera picture",\
        "  rec_start,  <Camera ID>,<filename.mp4>                           start Video recording",\
        "  rec_stop,   <Camera ID>                                          stop Video recording",\
        "  close_cam,  <Camera ID>                                          close Camera device",\
        "  close_allcam                                                     close all Camera device",\
        "  crop_img,<filename(from)>,<pos1(x:y)>,<pos2(x:y)>,<filename(to)> crop image from file",\
        "  crop_mov,<filename(from)>,<pos1(x:y)>,<pos2(x:y)>,<dirname(to)>  crop image from movie file",\
        "  exec_ocr,<filename>                                              ocr",\
        "  exec_labelimg,<filename>,<label>                                 TensorFlow labelimg",\
        "  export,<filename.csv>,str,<string>",\
        "  export,<filename.csv>,linenumber",\
        "  export,<filename.csv>,timestamp",\
        "  export,<filename.csv>,val",\
        "  export,<filename.csv>,writerow",\
        "  export,close",\
        "  eval_str_eq,<value>",\
        "  eval_int_eq,<value>",\
        "  eval_int_gt,<value>",\
        "  eval_int_lt,<value>",\
        "  eval_dbl_eq,<value>",\
        "  eval_dbl_gt,<value>",\
        "  eval_dbl_lt,<value>",\
        "  beepsound,<FurElise/Menuet/ToccataUndFugeInDMoll>                beep sound",\
        "  run,<command strings>                                            execute command",\
        "  runscript,<scriptfile.csv>,<resultfile.csv>                      run command in script file"
    ]
    for i in helpmsg:
        print(i)

def open_dso():
    dso = UNINITIALIZED
    rm = visa.ResourceManager()
    resources = rm.list_resources()
    #print(resources)
    for resource in resources:
        #print(resource)
        if resource.find("USB0",0,4) == 0: #文字列がUSB0で始まれば0、そうでない場合は-1
            try:
                dso = rm.open_resource(resource)
            except:
                print(resource, "Not Found.")
            else:
                print(resource, "Detected.")
                return dso
        else:
            print("%s is not DSO" % resource)

    #Throw an error to caller if none succeed.
    return dso

def crop_img(filename_in, p0, p1, filename_out):
    img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
    h0 = int(p0.split(':')[0])
    v0 = int(p0.split(':')[1])
    h1 = int(p1.split(':')[0])
    v1 = int(p1.split(':')[1])
    img2 = img[v0:v1, h0:h1]
    cv2.imwrite(filename_out, img2)
    return True

def crop_mov(filename_in, p0, p1, dirname_out, ext="jpg"):
    """
    This function is based on
    https://github.com/nkmk/python-snippets/blob/c4d70389733a34250595f1c17d737ef004f2dcbd/notebook/opencv_video_to_still.py#L1-L27
    MIT License, Copyright 2019 nkmk.me
    """
    cap = cv2.VideoCapture(filename_in)
    if not cap.isOpened():
        return

    os.makedirs(dirname_out, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(filename_in))[0]
    base_path = os.path.join(dirname_out, base_name)

    #digit = len(str(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))))
    digit = 8

    h0 = int(p0.split(':')[0])
    v0 = int(p0.split(':')[1])
    h1 = int(p1.split(':')[0])
    v1 = int(p1.split(':')[1])

    n = 0
    while True:
        ret, frame = cap.read()
        if ret:
            frame2 = frame[v0:v1, h0:h1]
            frame3 = cv2.resize(frame2, dsize=None, fx=2.0, fy=2.0)
            frame4 = cv2.cvtColor(frame3, cv2.COLOR_BGR2GRAY)
            th, frame5 = cv2.threshold(frame4, 16, 255, cv2.THRESH_OTSU)
            #print(th)
            cv2.imwrite('{}_{}.{}'.format(base_path, str(n).zfill(digit), ext), frame5)
            n += 1
        else:
            return

def open_ocr():
    ocr = pyocr.get_available_tools()
    if len(ocr) != 0:
        ocr = ocr[0]
    else:
        ocr = UNINITIALIZED
        print("OCR Not Ready.")
    return ocr

def exec_ocr(ocr, filename):
    try:
        txt = ocr.image_to_string(
            Image.open(filename),
            lang = "eng",
            builder = pyocr.builders.TextBuilder()
        )
    except:
        print("OCR Fail.")
    else:
        return txt

def exec_labelimg(filename, label_string):
    if platform.system() == "Windows" :
        python = "python"
        grep = "findstr"
    else:
        python = "python3"
        grep = "grep"

    cmd = python + \
          " label_image.py \
          --graph=c:\\tmp\\output_graph.pb \
          --labels=c:\\tmp\\output_labels.txt \
          --input_layer=Placeholder \
          --output_layer=final_result \
          --image=" + filename \
          + "|" + grep + " " + label_string
    print(cmd)
    log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
    ret = log.stdout.strip().decode("utf-8").split(" ")[1]
    return ret

def set_servo(uart, port, servo_id, servo_pos):
    if servo_id < 0 or servo_id >= NUM_OF_SERVO:
        print("Invalid Servo ID")
        return False

    if servo_pos < 0 or servo_pos > 180:
        print("Invalid Servo Position")
        return False

    if uart.gethandle(port) != None:
        uart.send(port, "servoread")

        servo_position = uart.rcvd(port)
        print(servo_position)

        # discard "OK"
        devnull = uart.rcvd(port)

        current_pos = int(servo_position.split(' ')[servo_id])
        #print(servo_id, current_pos)

        if current_pos < servo_pos:
            start = current_pos +1
            stop  = servo_pos +1
            step  = 1
        else:
            start = current_pos -1
            stop  = servo_pos -1
            step  = -1

        for i in range(start, stop, step):
            command = "servo " + str(servo_id) + " " + str(i)
            print(command)
            uart.send(port, command)
            # discard "OK"
            devnull = uart.rcvd(port)
            sleep(0.2) # sec.

        return True
    else:
        print("ERROR>>>port %s is not ready." % port)
        return False

def main():
    #print(sys.argv)
    if len(sys.argv) == 1:
        help()
    else:
        sys.argv.pop(0)  #引数の1番目の要素(=testrunner.py)を削除

        testrunner = Testrunner()
        ret = testrunner.parser(sys.argv)
        #print(ret)

        testrunner.parser(["close_allcam"])
        testrunner.parser(["export","close"])

        if ret[0] == True:
            testrunner.parser(["beepsound","Menuet"])
            print("PASS")
            sys.exit(0)
        else:
            testrunner.parser(["beepsound","FurElise"])
            print("FAIL")
            sys.exit(1)


if __name__ == '__main__':
    main()

付録B. Lチカで始めるテスト自動化・記事一覧

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
  3. Lチカで始めるテスト自動化(3)オシロスコープの組込み
  4. Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
  5. Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
  6. Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
  7. Lチカで始めるテスト自動化(7)タイムスタンプの保存
  8. Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
  9. Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
  10. Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
  11. Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
  12. Lチカで始めるテスト自動化(12)書籍化の作業メモ
  13. Lチカで始めるテスト自動化(13)外部プログラムの呼出し
  14. Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
  15. Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
  16. Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する
  17. Lチカで始めるテスト自動化(17)コマンド制御のBLE Keyboard & MouseをM5Stackで製作しiOSアプリをテストスクリプトで操作する
  18. Lチカで始めるテスト自動化(18)秋月のIoT学習HATキットの圧電ブザーでテスト終了時にpass/failに応じてメロディを流す
  19. Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する
  20. Lチカで始めるテスト自動化(20)複数のカメラ映像の同時録画
  21. Lチカで始めるテスト自動化(21)キーボード入力待ちの実装
  22. Lチカで始めるテスト自動化(22)DACをコマンドで制御してLチカする
  23. Lチカで始めるテスト自動化(23)ブレッドボード互換のユニバーサル基板
  24. Lチカで始めるテスト自動化(24)コマンドの実行結果を任意のCSVファイルへ出力する
  25. Lチカで始めるテスト自動化(25)Windowsでもテスト終了時にpass/failに応じてメロディを流す
  26. Lチカで始めるテスト自動化(26)テストランナーのモジュール化

電子書籍化したものを技術書典ka'sらぼで頒布しています。

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化 -2- リレー駆動回路の設計 (※書き下ろし)
  3. Lチカで始めるテスト自動化 -3- Raspberry Pi Zero WHとIoT学習HATキットで作るテストランナー
  1. camera.py全文はLチカで始めるテスト自動化(20)複数のカメラ映像の同時録画 5.camera.pyをご覧ください。

  2. あくまでも筆者の環境で10秒弱録画した動画にもとづく値です。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?