##1. はじめに
新しいはんだごてが欲しいなあと思って各メーカーのはんだごての商品紹介ページをいろいろ眺めていたところ、HAKKOの自動はんだ付けシステム/はんだ付けロボットシステムHU-200が目に留まり、今時はこういったはんだ付けも自動化なんだな、すごいなと思いました。
ところでこのようなPCやタブレットのアプリで制御するような組込みシステムのシステムテストをもし仮に自動化しようとしたら、アプリの画面とシステム全体を撮影するカメラの映像を同時に記録したくなると思いました。そこで複数の動画を同時に録画できるようにテストランナーを改修します。
##2. テストベンチ
###2.1 テストベンチの構成
試作したテストベンチの構成を以下に示します。
- PCに外付けモニタを接続し画面を複製します。PCと外付けモニタの間にパススルー機能付きのHDMIビデオキャプチャデバイスを挟み、画面をキャプチャできるようにします。
- PCにWebカメラを接続し、外付けモニタの画面の任意の箇所を映します。
- 複数のカメラ映像を同時に録画できるように改修したテストランナーをPCで実行します。
###2.2 実行例
PCのデスクトップのスクリーンショットを以下に示します。
- 右側の水色の枠は実行中のスクリプトです。
- 左上のピンクの枠はWebカメラの映像です。外付けモニタの実行中のスクリプトのエリアを映しています。
- 左下の赤枠はビデオキャプチャデバイスの映像です。スクリーンショットと同様の映像が映っています。
- 参考までに右下にタスクマネージャーを配置しています。
- Webカメラ、ビデオキャプチャの映像はどちらも640x480のサイズで、2本同時に録画している間のプロセッサ使用率は10%~20%程度でした。
- PCのスペックはCore i7-7500U(2.70GHz)、Intel HD Graphics 620、メモリ16GBです(ThinkPad X1 Carbon 5th)。
##3. 改修の方針
- テストランナーからビデオキャプチャのプログラムを分離する
- 分離したビデオキャプチャのプログラムをクラス化する
- 各UVCデバイスごとにインスタンスや録画のスレッドを作成する
以下のコマンドは引数にカメラデバイスの番号を与えるようにします。
|コマンド |引数
|-----------+--------
|close_cam |カメラデバイスの番号
|capture_cam|カメラデバイスの番号, ファイル名
|rec_start |カメラデバイスの番号, ファイル名
|rec_stop |カメラデバイスの番号
併せて終了時のデバイスクローズ処理を関数化し、pass、failともこの関数を呼ぶようにします。
##4. テストランナーの改修
「Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する」のtest-runner.pyとの差分を以下に説明します。全部入りソースは付録Aをご覧ください。
###4.1 cameraモジュールのimport
ビデオキャプチャのプログラムをcamera.py(5章)へ分離し、cameraモジュールをimportします。
from camera import Camera
###4.2 ビデオキャプチャ関係のプログラムの移動
以下のグローバル変数や関数はcamera.pyへ移動し、テストランナーから削除します。
- gVideoTxt
- set_video_txt()
- get_video_txt()
- gVideoRec
- set_video_rec()
- get_video_rec()
- open_cam()
- close_cam()
- capture_cam()
- video_rec()
###4.3 デバイスクローズ処理の関数化
以下の関数を新たに作成します。
def device_close(uart, cam):
close_uart(uart)
for i in range(len(cam)):
if cam[i].get_video_rec() == True:
cam[i].set_video_rec(False)
cam[i].thread.join()
else:
pass
#print("No Recording.")
cam[i].close_cam()
###4.4 main()の改修
####4.4.1 Cameraクラスのインスタンス作成
cam0、cam1、cam2の3つのインスタンスを作成し、配列に格納して添え字でアクセスできるようにします。gVideoTxtやgVideoTxtの初期化はコンストラクタで行います。
def main():
~略~
set_video_rec(False)
set_video_txt("null")
def main():
~略~
cam0 = Camera(0)
cam1 = Camera(1)
cam2 = Camera(2)
cam = [cam0, cam1, cam2]
###4.4.2 実行中のスクリプト行番号およびコマンド文字列の生成
cam0~cam2に同一の文字列を設定します。
cmds = str(script_line) + ":"
for i in range(len(cmd)):
cmds += cmd[i] + ","
set_video_txt(cmds)
print(cmds)
cmds = str(script_line) + ":"
for i in range(len(cmd)):
cmds += cmd[i] + ","
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
###4.4.3 ランダムスリープrusleepコマンド
4.4.2同様、set_video_txt()の部分を修正します。
set_video_txt(cmds)
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
###4.4.4 open_camコマンド
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam = open_cam(int(cmd[1]), 640, 480)
else:
cam = open_cam(int(cmd[1]), int(cmd[2]), int(cmd[3]))
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam[int(cmd[1])].open_cam(640, 480)
else:
cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))
###4.4.5 close_camコマンド
elif cmd[0] == "close_cam":
close_cam(cam)
cam = UNINITIALIZED
elif cmd[0] == "close_cam":
cam[int(cmd[1])].close_cam()
###4.4.6 capture_camコマンド
elif cmd[0] == "capture_cam":
ret = capture_cam(cam, cmd[1])
elif cmd[0] == "capture_cam":
ret = cam[int(cmd[1])].capture_cam(cmd[2])
###4.4.7 rec_startコマンド
elif cmd[0] == "rec_start":
if get_video_rec() == False:
set_video_rec(True)
thread1 = threading.Thread(target=video_rec, args=(cam,cmd[1],))
thread1.start()
elif cmd[0] == "rec_start":
if cam[int(cmd[1])].get_video_rec() == False:
cam[int(cmd[1])].set_video_rec(True)
cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
cam[int(cmd[1])].thread.start()
###4.4.8 rec_stopコマンド
elif cmd[0] == "rec_stop":
if get_video_rec() == True:
set_video_rec(False)
thread1.join()
elif cmd[0] == "rec_stop":
if cam[int(cmd[1])].get_video_rec() == True:
cam[int(cmd[1])].set_video_rec(False)
cam[int(cmd[1])].thread.join()
###4.4.9 終了処理
device_close()を呼ぶようにします。
else:
~略~
if get_video_rec() == True:
set_video_rec(False)
thread1.join()
#else:
# print("No Recording.")
close_uart(uart)
close_cam(cam)
~略~
if is_passed == True:
if get_video_rec() == True:
set_video_rec(False)
thread1.join()
#else:
# print("No Recording.")
close_uart(uart)
close_cam(cam)
~略~
else:
~略~
device_close(uart, cam)
~略~
if is_passed == True:
device_close(uart, cam)
~略~
##5. camera.py
モジュール化に併せて以下の変更を行っています。
- gVideoTxt、gVideoRecがグローバル変数からインスタンス変数になるため変数名を修正(video_txt、video_rec)
- video_rec()の関数名をvideo_record()に変更
- 画像に重畳するタイムスタンプにミリ秒を追加
# -*- coding: utf-8 -*-
# camera.py
# python 3.x
#
# This software includes the work that is distributed
# in the Apache License 2.0
# ka's@pbjpkas 2021
#
import cv2
import datetime
class Camera:
def __init__(self, camera_id):
self.camera_id = camera_id
self.w = 640
self.h = 480
self.f = 30.0
self.video_txt = "null" # Text over Video Frame
self.video_rec = False # Video Recording Thread flag
self.thread = False
self.cam = False # cv2.VideoCapture() handler
def set_video_txt(self, txt):
self.video_txt = str(txt)
def get_video_txt(self):
return self.video_txt
def set_video_rec(self, flag):
if flag:
self.video_rec = True
else:
self.video_rec = False
def get_video_rec(self):
return self.video_rec
def open_cam(self, width, height):
if self.cam == False:
self.cam = cv2.VideoCapture(self.camera_id)
self.cam.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.w = int(self.cam.get(cv2.CAP_PROP_FRAME_WIDTH))
self.h = int(self.cam.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.f = self.cam.get(cv2.CAP_PROP_FPS)
print(str(self.w) + "x" + str(self.h) + "," + str(self.f) + "fps")
else:
print("CAM " + str(self.camera_id) + " Already Opend.")
def close_cam(self):
if self.cam != False:
self.cam.release()
self.cam = False
print("CAM " + str(self.camera_id) + " Released.")
else:
print("CAM " + str(self.camera_id) + " Not Opend.")
def text_over_img(self, img):
timestamp = (datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")[:-3])
videotxt = self.get_video_txt()
cv2.rectangle(img, (5,5), (self.w-5,25), (255,255,255), thickness=-1)
cv2.putText(img, timestamp, ( 10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
cv2.putText(img, videotxt, (235,20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
def capture_cam(self, filename):
if self.cam == False:
print("CAM Not Ready.")
return False
ret, img = self.cam.read()
if ret == True:
self.text_over_img(img)
cv2.imwrite(filename, img)
return True
else:
return False
def video_record(self, filename):
if self.cam == False:
print("CAM Not Ready.")
return False
fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
out = cv2.VideoWriter(filename, fourcc, self.f, (self.w, self.h))
msg = "CAM " + str(self.camera_id) + " : q to quit"
while True:
ret, img = self.cam.read()
if ret == True:
self.text_over_img(img)
out.write(img)
cv2.imshow(msg, img)
if (cv2.waitKey(1) == ord('q')) or (self.get_video_rec() == False):
print("### rec stop ###")
break
out.release()
return True
##6. 実行例
2.2節のテストスクリプトと実行結果を以下に示します。カメラ番号1はWebカメラ、2はビデオキャプチャデバイスです。
open_cam,1,640,480
open_cam,2,640,480
rec_start,1,video_1.mp4
rec_start,2,video_2.mp4
sleep,120
rec_stop,1
rec_stop,2
2021/09/12 15:10:47,open_cam,1,640,480,OK
2021/09/12 15:10:47,open_cam,2,640,480,OK
2021/09/12 15:10:58,rec_start,1,video_1.mp4,OK
2021/09/12 15:10:58,rec_start,2,video_2.mp4,OK
2021/09/12 15:10:58,sleep,120,OK
2021/09/12 15:12:58,rec_stop,1,OK
2021/09/12 15:12:58,rec_stop,2,OK
##7. おわりに
- ビデオキャプチャ関係のプログラムをクラス化したことで、UVCデバイスが増えてもインスタンスを増やすことで対応できるようになりました。
- 各UVCデバイスごとにインスタンスや録画のスレッドを作成することで、デスクトップのビデオキャプチャとWebカメラの映像の2本の動画を同時に録画できるようになりました。
##付録A. test-runner.pyのソース
#!/usr/bin/python3
#
# This software includes the work that is distributed
# in the Apache License 2.0
#
from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa
import cv2
from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera
UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6
def serial_write(h, string):
if h != UNINITIALIZED:
string = string + '\n'
string = str.encode(string)
h.write(string)
return True
else:
print("UART Not Initialized.")
return False
def close_uart(h):
if h != UNINITIALIZED:
h.close()
else:
#print("UART Not Initialized.")
pass
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Fail.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + \
" label_image.py \
--graph=c:\\tmp\\output_graph.pb \
--labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder \
--output_layer=final_result \
--image=" + filename \
+ "|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def set_servo(uart, servo_id, servo_pos):
if servo_id < 0 or servo_id >= NUM_OF_SERVO:
print("Invalid Servo ID")
return False
if servo_pos < 0 or servo_pos > 180:
print("Invalid Servo Position")
return False
if uart != UNINITIALIZED:
serial_write(uart, "servoread")
servo_position = uart.readline().strip().decode('utf-8')
print(servo_position)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
current_pos = int(servo_position.split(' ')[servo_id])
#print(servo_id, current_pos)
if current_pos < servo_pos:
start = current_pos +1
stop = servo_pos +1
step = 1
else:
start = current_pos -1
stop = servo_pos -1
step = -1
for i in range(start, stop, step):
command = "servo " + str(servo_id) + " " + str(i)
print(command)
serial_write(uart, command)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
sleep(0.2) # sec.
return True
else:
print("UART Not Initialized.")
return False
def device_close(uart, cam):
close_uart(uart)
for i in range(len(cam)):
if cam[i].get_video_rec() == True:
cam[i].set_video_rec(False)
cam[i].thread.join()
else:
pass
#print("No Recording.")
cam[i].close_cam()
def main():
is_passed = True
val = str(UNINITIALIZED)
fval = 0.0
uart = UNINITIALIZED
dso = UNINITIALIZED
cam = UNINITIALIZED
ocr = UNINITIALIZED
cmds = ""
cam0 = Camera(0)
cam1 = Camera(1)
cam2 = Camera(2)
cam = [cam0, cam1, cam2]
if len(sys.argv) == 2:
script_file_name = sys.argv[1] + ".csv"
result_file_name = sys.argv[1] + "_result.csv"
else:
script_file_name = "script.csv"
result_file_name = "result.csv"
with codecs.open(script_file_name, 'r', 'utf-8') as file:
script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open(result_file_name, 'w', 'utf-8') as file:
result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')
script_line = 0
for cmd in script:
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
script_line += 1
print("##### " + timestamp + " " + str(script_line) + " #####")
cmds = str(script_line) + ":"
for i in range(len(cmd)):
cmds += cmd[i] + ","
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
cmd.append(str(fval))
cmds += str(fval)
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(fval)
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
elif cmd[0] == "send":
ret = serial_write(uart, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
elif cmd[0] == "open_dso":
try:
dso = open_dso()
except:
is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
val = dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(val)
else:
dso.write(cmd[1])
except:
is_passed = False
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam[int(cmd[1])].open_cam(640, 480)
else:
cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))
elif cmd[0] == "close_cam":
cam[int(cmd[1])].close_cam()
elif cmd[0] == "capture_cam":
ret = cam[int(cmd[1])].capture_cam(cmd[2])
if ret == False:
is_passed = False
elif cmd[0] == "rec_start":
if cam[int(cmd[1])].get_video_rec() == False:
cam[int(cmd[1])].set_video_rec(True)
cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
cam[int(cmd[1])].thread.start()
else:
print("Already Recording.")
elif cmd[0] == "rec_stop":
if cam[int(cmd[1])].get_video_rec() == True:
cam[int(cmd[1])].set_video_rec(False)
cam[int(cmd[1])].thread.join()
else:
print("No Recording.")
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
ocr = open_ocr()
if ocr == UNINITIALIZED:
is_passed = False
elif cmd[0] == "exec_ocr":
try:
val = exec_ocr(ocr, cmd[1])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "set_servo":
ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
if ret == False:
is_passed = False
elif cmd[0] == "run":
ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
val = ret.stdout.strip()
print(ret)
if ret.returncode != 0:
is_passed = False
elif cmd[0] == "eval_str_eq":
if str(val) != str(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_eq":
if int(val) != int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_gt":
if int(val) < int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_lt":
if int(val) > int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(val) != float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(val) < float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(val) > float(cmd[1]):
is_passed = False
else:
cmd.append("#")
if is_passed == True:
cmd.append("OK")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
device_close(uart, cam)
print("FAIL")
sys.exit(1)
if is_passed == True:
device_close(uart, cam)
print("PASS")
sys.exit(0)
main()
##付録B. Lチカで始めるテスト自動化・記事一覧
- Lチカで始めるテスト自動化
- Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
- Lチカで始めるテスト自動化(3)オシロスコープの組込み
- Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
- Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
- Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
- Lチカで始めるテスト自動化(7)タイムスタンプの保存
- Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
- Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
- Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
- Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
- Lチカで始めるテスト自動化(12)書籍化の作業メモ
- Lチカで始めるテスト自動化(13)外部プログラムの呼出し
- Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
- Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
- Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する
- Lチカで始めるテスト自動化(17)コマンド制御のBLE Keyboard & MouseをM5Stackで製作しiOSアプリをテストスクリプトで操作する
- Lチカで始めるテスト自動化(18)秋月のIoT学習HATキットの圧電ブザーでテスト終了時にpass/failに応じてメロディを流す
- Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する
電子書籍化したものを技術書典で頒布しています。