##1. はじめに
操作は自動で行うがPass/Fail判定は手動で行いたい場合を想定しキーボードからの入力を待てるようにします。入力された文字列がWebカメラの映像に重畳されるようにします。
##2. コマンド
以下のコマンドを作成します。
|コマンド|引数|機能
|--------+----+----
|prompt |なし|キーボードからの入力を待つ。
"y"が入力されたら次のコマンドへ進む。
"y"以外が入力されたらFAILで終了する。
##3. 実装
- キーボードからの入力をinput()で受け取る
- 受け取った文字列をlower()で小文字にする
- Webカメラの映像に文字列を重畳するため1秒sleepする
- "y"以外が入力されたらFail判定へ進むようにする
「Lチカで始めるテスト自動化(20)Webカメラの映像を録画しながらテストスクリプトを実行する」のtest-runner.pyとの差分を以下に説明します。全部入りソースは付録Aをご覧ください。
$ diff test-runner_20.py test-runner_21.py
224a225,237
> elif cmd[0] == "prompt":
> print("Enter y/n")
> val = input()
> val = val.lower()
> cmd.append(val)
> cmds += val
> for i in range(len(cam)):
> cam[i].set_video_txt(cmds)
> print(cmds)
> sleep(1)
> if val != "y":
> is_passed = False
>
##4. 実行例
y、n、HOGEを入力した例を以下に示します。
###4.1 テストスクリプト
prompt.csv
open_cam,0,640,480
rec_start,0,video_0.mp4
prompt
###4.2 yを入力して終了
OKで終了しています。
prompt_result.csv(y)
2021/09/14 23:43:37,open_cam,0,640,480,OK
2021/09/14 23:43:41,rec_start,0,video_0.mp4,OK
2021/09/14 23:43:41,prompt,y,OK
###4.3 nを入力して終了
NGで終了しています。
prompt_result.csv(n)
2021/09/14 23:45:43,open_cam,0,640,480,OK
2021/09/14 23:45:47,rec_start,0,video_0.mp4,OK
2021/09/14 23:45:47,prompt,n,NG
###4.4 HOGEを入力して終了
NGで終了しています。
prompt_result.csv(HOGE)
2021/09/14 23:48:20,open_cam,0,640,480,OK
2021/09/14 23:48:24,rec_start,0,video_0.mp4,OK
2021/09/14 23:48:24,prompt,hoge,NG
##5. おわりに
- キーボード入力待ちをinput()とlower()で実装できました。
##付録A. test-runner.pyのソース
test-runner.py
#!/usr/bin/python3
#
# This software includes the work that is distributed
# in the Apache License 2.0
#
from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa
import cv2
from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera
UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6
def serial_write(h, string):
if h != UNINITIALIZED:
string = string + '\n'
string = str.encode(string)
h.write(string)
return True
else:
print("UART Not Initialized.")
return False
def close_uart(h):
if h != UNINITIALIZED:
h.close()
else:
#print("UART Not Initialized.")
pass
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Fail.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + \
" label_image.py \
--graph=c:\\tmp\\output_graph.pb \
--labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder \
--output_layer=final_result \
--image=" + filename \
+ "|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def set_servo(uart, servo_id, servo_pos):
if servo_id < 0 or servo_id >= NUM_OF_SERVO:
print("Invalid Servo ID")
return False
if servo_pos < 0 or servo_pos > 180:
print("Invalid Servo Position")
return False
if uart != UNINITIALIZED:
serial_write(uart, "servoread")
servo_position = uart.readline().strip().decode('utf-8')
print(servo_position)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
current_pos = int(servo_position.split(' ')[servo_id])
#print(servo_id, current_pos)
if current_pos < servo_pos:
start = current_pos +1
stop = servo_pos +1
step = 1
else:
start = current_pos -1
stop = servo_pos -1
step = -1
for i in range(start, stop, step):
command = "servo " + str(servo_id) + " " + str(i)
print(command)
serial_write(uart, command)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
sleep(0.2) # sec.
return True
else:
print("UART Not Initialized.")
return False
def device_close(uart, cam):
close_uart(uart)
for i in range(len(cam)):
if cam[i].get_video_rec() == True:
cam[i].set_video_rec(False)
cam[i].thread.join()
else:
pass
#print("No Recording.")
cam[i].close_cam()
def main():
is_passed = True
val = str(UNINITIALIZED)
fval = 0.0
uart = UNINITIALIZED
dso = UNINITIALIZED
cam = UNINITIALIZED
ocr = UNINITIALIZED
cmds = ""
cam0 = Camera(0)
cam1 = Camera(1)
cam2 = Camera(2)
cam = [cam0, cam1, cam2]
if len(sys.argv) == 2:
script_file_name = sys.argv[1] + ".csv"
result_file_name = sys.argv[1] + "_result.csv"
else:
script_file_name = "script.csv"
result_file_name = "result.csv"
with codecs.open(script_file_name, 'r', 'utf-8') as file:
script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open(result_file_name, 'w', 'utf-8') as file:
result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')
script_line = 0
for cmd in script:
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
script_line += 1
print("##### " + timestamp + " " + str(script_line) + " #####")
cmds = str(script_line) + ":"
for i in range(len(cmd)):
cmds += cmd[i] + ","
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
cmd.append(str(fval))
cmds += str(fval)
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(fval)
elif cmd[0] == "prompt":
print("Enter y/n")
val = input()
val = val.lower()
cmd.append(val)
cmds += val
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(1)
if val != "y":
is_passed = False
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
elif cmd[0] == "send":
ret = serial_write(uart, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
elif cmd[0] == "open_dso":
try:
dso = open_dso()
except:
is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
val = dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(val)
else:
dso.write(cmd[1])
except:
is_passed = False
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam[int(cmd[1])].open_cam(640, 480)
else:
cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))
elif cmd[0] == "close_cam":
cam[int(cmd[1])].close_cam()
elif cmd[0] == "capture_cam":
ret = cam[int(cmd[1])].capture_cam(cmd[2])
if ret == False:
is_passed = False
elif cmd[0] == "rec_start":
if cam[int(cmd[1])].get_video_rec() == False:
cam[int(cmd[1])].set_video_rec(True)
cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
cam[int(cmd[1])].thread.start()
else:
print("Already Recording.")
elif cmd[0] == "rec_stop":
if cam[int(cmd[1])].get_video_rec() == True:
cam[int(cmd[1])].set_video_rec(False)
cam[int(cmd[1])].thread.join()
else:
print("No Recording.")
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
ocr = open_ocr()
if ocr == UNINITIALIZED:
is_passed = False
elif cmd[0] == "exec_ocr":
try:
val = exec_ocr(ocr, cmd[1])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "set_servo":
ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
if ret == False:
is_passed = False
elif cmd[0] == "run":
ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
val = ret.stdout.strip()
print(ret)
if ret.returncode != 0:
is_passed = False
elif cmd[0] == "eval_str_eq":
if str(val) != str(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_eq":
if int(val) != int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_gt":
if int(val) < int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_lt":
if int(val) > int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(val) != float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(val) < float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(val) > float(cmd[1]):
is_passed = False
else:
cmd.append("#")
if is_passed == True:
cmd.append("OK")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
device_close(uart, cam)
print("FAIL")
sys.exit(1)
if is_passed == True:
device_close(uart, cam)
print("PASS")
sys.exit(0)
main()
##付録B. Lチカで始めるテスト自動化・記事一覧
- Lチカで始めるテスト自動化
- Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
- Lチカで始めるテスト自動化(3)オシロスコープの組込み
- Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
- Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
- Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
- Lチカで始めるテスト自動化(7)タイムスタンプの保存
- Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
- Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
- Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
- Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
- Lチカで始めるテスト自動化(12)書籍化の作業メモ
- Lチカで始めるテスト自動化(13)外部プログラムの呼出し
- Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
- Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
- Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する
- Lチカで始めるテスト自動化(17)コマンド制御のBLE Keyboard & MouseをM5Stackで製作しiOSアプリをテストスクリプトで操作する
- Lチカで始めるテスト自動化(18)秋月のIoT学習HATキットの圧電ブザーでテスト終了時にpass/failに応じてメロディを流す
- Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する
- Lチカで始めるテスト自動化(20)複数のカメラ映像の同時録画
電子書籍化したものを技術書典で頒布しています。