1. はじめに
外部のPythonプログラムからテストランナーを利用できるようモジュール化します。
例)external.py
import testrunner as Testrunner
testrunner = Testrunner.Testrunner()
# UART port open
ret = testrunner.parser(["open_uart","COM7",115200,0])
print(ret)
実行例
>python external.py
##### 2022/10/17 00:08:45 1 #####
1:open_uart,COM7,115200,0,
(True, '2022/10/17 00:08:45', ['open_uart', 'COM7', 115200, 0])
2. 改修の方針
- モジュールとしてimportできるようファイル名からハイフンをなくします。
- コマンドラインから実行したときのみmain()を実行するようにします。
- 従来のmain()が抱えている以下の機能を分離します。
a. 初期化
b. テストスクリプト(CSVファイル)の読み出し
c. テストスクリプトのパース、実行
d. テスト実行ログの書き出し
e. 終了処理 - 上記b、dを実行するコマンド(runscript)を追加します。
- 上記cからexportコマンドの機能を分離します(規模が大きいため)。
i. コマンドで終了時処理を行えるよう、close機能を追加します。 - 今回の改修に併せてUARTのポートを複数持てるようにします。
i. 改修対象のコマンド:open_uart、send_uart、rcvd_uart、set_servo
ii. 追加のコマンド:list_uart - 今回の改修に併せてcrop_imgのコマンド仕様を変更します。
旧)crop_img,入力ファイル名,切り出し位置(縦),切り出し位置(横),出力ファイル名
新)crop_img,入力ファイル名,切り出し位置の左上の座標,切り出し位置の右下の座標,出力ファイル名 - また、コマンドのusageを表示する機能を追加します。
3. 改修内容
3.1 ファイル名の変更
test-runner.pyをtestrunner.pyに変更します。
3.2 コマンドラインから実行したときのみmain()を実行
"if __name__ == '__main__':"でガードをかけます。
旧
main()
新
if __name__ == '__main__':
main()
3.3 main()の機能分離
- Testrunnerクラスを作成
- __init__(self)に初期化コードを移植
- parser()メソッドにスクリプトのパース、実行機能を移植
- __runscript()メソッドにテストスクリプトの読み出し、テスト実行ログの書き出しを移植
3.4 runscriptコマンドの追加
コマンド | 引数 |
---|---|
runscript | <スクリプトのファイル名>,<実行ログのファイル名> |
- ファイル名には拡張子".csv"を含めます。
例)runscript,script.csv,script_result.csv - コマンドラインで実行する場合はrunscriptコマンドを使用します。
例)python testrunner.py runscript script.csv script_result.csv
3.5 エクスポートコマンドの分離
- Exportクラスを作成
- close機能を追加
3.6 UARTポートの複数対応
- Uartクラスを作成
- self.device配列にUARTのハンドラを格納
3.7 crop_imgの仕様変更
切り出し位置の座標の指定方法を「v0:v1,h0:h1」から「x0:y0,x1:y1」に変更します。
旧
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
新
def crop_img(filename_in, p0, p1, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
h0 = int(p0.split(':')[0])
v0 = int(p0.split(':')[1])
h1 = int(p1.split(':')[0])
v1 = int(p1.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
3.8 usage表示機能
- help()を作成
- 引数なしでtestrunner.pyが実行されたらhelp()を実行
def main():
#print(sys.argv)
if len(sys.argv) == 1:
help()
(以下略)
実行例
>python testrunner.py
usage:
python testrunner.py <command>
command:
sleep,<sleep[sec]> sleep
rusleep,<lower[sec]>,<upper[sec]> random sleep
(以下略)
4. ソースコード
testrunner.py
#!/usr/bin/python3
#
# This software includes the work that is distributed
# in the Apache License 2.0
#
from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa
#【Python】opencvでWebカメラの起動に時間がかかる問題の対処
# https://qiita.com/youichi_io/items/b894b85d790720ea2346
import os
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera
from beepsound import BEEPSOUND
UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6
class Uart:
def __init__(self):
self.device = []
def list(self):
print(self.device)
return self.device
def open(self, port, bps, timeout_val, dsrdtr_val):
h = self.gethandle(port)
if h == None:
try:
h = serial.Serial(port, int(bps), timeout=timeout_val, dsrdtr=int(dsrdtr_val))
self.device.append(h)
return True
except:
print("ERROR>>>open fail")
return False
else:
print("WARNING>>>port %s is ready." % port)
return True
def send(self, port, txdata):
h = self.gethandle(port)
if h == None:
print("ERROR>>>port %s is not ready." % port)
return False
else:
try:
txdata = txdata + '\n'
txdata = str.encode(txdata)
h.write(txdata)
return True
except:
print("ERROR>>>send fail.")
return False
def rcvd(self, port):
h = self.gethandle(port)
if h == None:
print("ERROR>>>port %s is not ready." % port)
return False
else:
try:
rxdata = h.readline().strip().decode('utf-8')
print(rxdata)
return rxdata
except:
print("ERROR>>>rcvd fail.")
return False
def gethandle(self, port):
h = None
for dev in self.device:
if dev:
if dev.port == port:
h = dev
break
return h
class Export:
def __init__(self):
#exportfiles
#概要
# exportコマンドで使用するN行3列の二次元配列
#説明
# 以下の3つの要素を1行3桁の配列で構造化し、
# 1.ファイルオブジェクト(open()の戻り値)
# 2.Writerオブジェクト(csv.writer()の戻り値)
# 3.ファイルに出力するデータのバッファ
# 出力するファイルの数だけこの配列を行方向に追加する
#構造
# exportfiles = [
# [ [FileObject],[WriterObject],[Buffer] ], #1つめの出力ファイルの配列
# [ [FileObject],[WriterObject],[Buffer] ], #2つめの出力ファイルの配列
# ...
# ]
#初期値
# 空の1行3列の2次元配列で初期化する
self.exportfiles = [\
[ [],[],[] ]\
]
def parser(self, cmd, script_line, timestamp, val):
#export,<filename.csv>,str,<string>
#export,<filename.csv>,linenumber
#export,<filename.csv>,timestamp
#export,<filename.csv>,val
#export,<filename.csv>,writerow
#export,close
if cmd[0] == "export":
#1)引数が3以上の場合に処理を行う
if len(cmd) >= 3:
#2)操作対象のファイルディスクリプタを決める
##2.1)操作対象のファイルディスクリプタを検索する
exportfile = None
i = 0 #exportfiles配列の行番号
#print(exportfiles)
for f in self.exportfiles:
if f[0]:
#print(f[0].name)
if f[0].name == cmd[1]:
exportfile = f
break
i = i + 1
#print("index " + str(i))
##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
if exportfile == None:
if i >= 1:
self.exportfiles.append([[],[],[]])
self.exportfiles[i][0] = codecs.open(cmd[1], 'w', 'utf-8')
self.exportfiles[i][1] = csv.writer(self.exportfiles[i][0], delimiter=',', lineterminator='\r\n', quotechar='"')
#print(i, exportfiles)
if cmd[2] == "str":
#print("str " + self.exportfiles[i][0].name)
self.exportfiles[i][2].append(cmd[3])
elif cmd[2] == "linenumber":
#print("line " + self.exportfiles[i][0].name)
self.exportfiles[i][2].append(str(script_line))
elif cmd[2] == "timestamp":
#print("time " + self.exportfiles[i][0].name)
self.exportfiles[i][2].append(timestamp)
elif cmd[2] == "val":
#print("val " + self.exportfiles[i][0].name)
self.exportfiles[i][2].append(str(val))
elif cmd[2] == "writerow":
#print("wrow " + self.exportfiles[i][0].name)
#print(self.exportfiles)
self.exportfiles[i][1].writerow(self.exportfiles[i][2])
self.exportfiles[i][2].clear()
else:
print("export parameter error")
return False
elif cmd[1] == "close":
for f in self.exportfiles:
if f[0]:
print("close " + f[0].name)
f[0].close()
else:
print("export parameter error")
return False
return True
class Testrunner:
def __init__(self):
self.is_passed = True
self.val = str(UNINITIALIZED)
self.fval = 0.0
self.uart = Uart()
self.dso = UNINITIALIZED
self.ocr = UNINITIALIZED
self.cmds = ""
self.cam0 = Camera(0)
self.cam1 = Camera(1)
self.cam2 = Camera(2)
self.cam = [self.cam0, self.cam1, self.cam2]
self.beepsound = BEEPSOUND()
self.export = Export()
self.script_line = 0
def parser(self, cmd):
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
self.script_line += 1
print("##### " + timestamp + " " + str(self.script_line) + " #####")
cmds = str(self.script_line) + ":"
for i in range(len(cmd)):
cmds += str(cmd[i]) + ","
for i in range(len(self.cam)):
self.cam[i].set_video_txt(cmds)
print(cmds)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "rusleep":
self.fval = random.uniform(float(cmd[1]), float(cmd[2]))
self.val = str(self.fval)
cmd.append(self.val)
cmds += self.val
for i in range(len(self.cam)):
self.cam[i].set_video_txt(cmds)
print(cmds)
sleep(self.fval)
elif cmd[0] == "prompt":
print("Enter y/n")
self.val = input()
self.val = self.val.lower()
cmd.append(self.val)
cmds += self.val
for i in range(len(self.cam)):
self.cam[i].set_video_txt(cmds)
print(cmds)
sleep(1)
if self.val != "y":
self.is_passed = False
elif cmd[0] == "list_uart":
self.val = self.uart.list()
cmd.append(self.val)
elif cmd[0] == "open_uart":
try:
self.is_passed = self.uart.open(cmd[1], cmd[2], 1.0, cmd[3])
except:
print("Open Fail")
self.is_passed = False
elif cmd[0] == "send_uart":
ret = self.uart.send(cmd[1], cmd[2])
if ret == False:
self.is_passed = False
elif cmd[0] == "rcvd_uart":
try:
self.val = self.uart.rcvd(cmd[1])
cmd.append(self.val)
except:
self.is_passed = False
elif cmd[0] == "open_dso":
try:
self.dso = open_dso()
except:
self.is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
self.val = self.dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(self.val)
else:
self.dso.write(cmd[1])
except:
self.is_passed = False
elif cmd[0] == "open_cam":
if len(cmd) == 2:
self.cam[int(cmd[1])].open_cam(640, 480)
else:
self.cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))
elif cmd[0] == "close_cam":
self.cam[int(cmd[1])].close_cam()
elif cmd[0] == "close_allcam":
for i in range(len(self.cam)):
print("CamID:%d" % i)
if self.cam[i].get_video_rec() == True:
self.cam[i].set_video_rec(False)
self.cam[i].thread.join()
else:
pass
#print("No Recording.")
self.cam[i].close_cam()
elif cmd[0] == "capture_cam":
ret = self.cam[int(cmd[1])].capture_cam(cmd[2])
if ret == False:
self.is_passed = False
elif cmd[0] == "rec_start":
if self.cam[int(cmd[1])].get_video_rec() == False:
self.cam[int(cmd[1])].set_video_rec(True)
self.cam[int(cmd[1])].thread = threading.Thread(target=self.cam[int(cmd[1])].video_record, args=(cmd[2],))
self.cam[int(cmd[1])].thread.start()
else:
print("Already Recording.")
elif cmd[0] == "rec_stop":
if self.cam[int(cmd[1])].get_video_rec() == True:
self.cam[int(cmd[1])].set_video_rec(False)
self.cam[int(cmd[1])].thread.join()
else:
print("No Recording.")
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
self.ocr = open_ocr()
if self.ocr == UNINITIALIZED:
self.is_passed = False
elif cmd[0] == "exec_ocr":
try:
self.val = exec_ocr(self.ocr, cmd[1])
except:
self.is_passed = False
else:
cmd.append(str(self.val))
elif cmd[0] == "exec_labelimg":
try:
self.val = exec_labelimg(cmd[1], cmd[2])
except:
self.is_passed = False
else:
cmd.append(str(self.val))
#set_servo, port, ID, Val
elif cmd[0] == "set_servo":
ret = set_servo(self.uart, cmd[1], int(cmd[2]), int(cmd[3]))
if ret == False:
self.is_passed = False
elif cmd[0] == "run":
ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
self.val = ret.stdout.strip()
print(ret)
if ret.returncode != 0:
self.is_passed = False
elif cmd[0] == "beepsound":
if cmd[1] == "FurElise":
self.beepsound.set_tempo(60)
self.beepsound.FurElise()
elif cmd[1] == "Menuet":
self.beepsound.set_tempo(120)
self.beepsound.Menuet()
elif cmd[1] == "ToccataUndFugeInDMoll":
self.beepsound.set_tempo(120)
self.beepsound.ToccataUndFugeInDMoll()
else:
self.is_passed = False
elif cmd[0] == "export":
self.export.parser(cmd, self.script_line, timestamp, self.val)
elif cmd[0] == "eval_str_eq":
if str(self.val) != str(cmd[1]):
self.is_passed = False
elif cmd[0] == "eval_int_eq":
if int(self.val) != int(cmd[1]):
self.is_passed = False
elif cmd[0] == "eval_int_gt":
if int(self.val) < int(cmd[1]):
self.is_passed = False
elif cmd[0] == "eval_int_lt":
if int(self.val) > int(cmd[1]):
self.is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(self.val) != float(cmd[1]):
self.is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(self.val) < float(cmd[1]):
self.is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(self.val) > float(cmd[1]):
self.is_passed = False
elif cmd[0] == "runscript":
if len(cmd) == 3:
self.__runscript(cmd[1],cmd[2])
else:
print("ERROR>>>invalid args.")
self.is_passed = False
else:
cmd.append("#")
return self.is_passed, timestamp, cmd
def __runscript(self, script_file_name, result_file_name):
with codecs.open(script_file_name, 'r', 'utf-8') as file_script:
script = csv.reader(file_script, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open(result_file_name, 'w', 'utf-8') as file_result:
result = csv.writer(file_result, delimiter=',', lineterminator='\r\n', quotechar='"')
for cmd in script:
self.is_passed, timestamp, cmd = self.parser(cmd)
if self.is_passed == True:
cmd.append("OK")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
print("FAIL")
return
if self.is_passed == True:
print("PASS")
return
def help():
helpmsg = [\
"usage:",\
" python testrunner.py <command>",\
"command:",\
" sleep,<sleep[sec]> sleep",\
" rusleep,<lower[sec]>,<upper[sec]> random sleep",\
" prompt hit y/n to continue",\
" open_uart,<port>,<bps>,<dsrdtr(0 or 1)> open uart",\
" send_uart,<port>,<txdata> send txdata to port",\
" rcvd_uart,<port> receive rxdata from port",\
" list_uart print uart list",\
" set_servo,<port>,<servo ID>,<angle> servo moving",\
" open_dso open VISA device",\
" dso,<txdata> send txdata to VISA device",\
" open_cam, <Camera ID>,<Width>,<Height> open Camera device",\
" capture_cam,<Camera ID>,<filename.png> capture Camera picture",\
" rec_start, <Camera ID>,<filename.mp4> start Video recording",\
" rec_stop, <Camera ID> stop Video recording",\
" close_cam, <Camera ID> close Camera device",\
" close_allcam close all Camera device",\
" crop_img,<filename(from)>,<pos1(x:y)>,<pos2(x:y)>,<filename(to)> crop image from file",\
" exec_ocr,<filename> ocr",\
" exec_labelimg,<filename>,<label> TensorFlow labelimg",\
" export,<filename.csv>,str,<string>",\
" export,<filename.csv>,linenumber",\
" export,<filename.csv>,timestamp",\
" export,<filename.csv>,val",\
" export,<filename.csv>,writerow",\
" export,close",\
" eval_str_eq,<value>",\
" eval_int_eq,<value>",\
" eval_int_gt,<value>",\
" eval_int_lt,<value>",\
" eval_dbl_eq,<value>",\
" eval_dbl_gt,<value>",\
" eval_dbl_lt,<value>",\
" beepsound,<FurElise/Menuet/ToccataUndFugeInDMoll> beep sound",\
" run,<command strings> execute command",\
" runscript,<scriptfile.csv>,<resultfile.csv> run command in script file"
]
for i in helpmsg:
print(i)
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def crop_img(filename_in, p0, p1, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
h0 = int(p0.split(':')[0])
v0 = int(p0.split(':')[1])
h1 = int(p1.split(':')[0])
v1 = int(p1.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Fail.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + \
" label_image.py \
--graph=c:\\tmp\\output_graph.pb \
--labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder \
--output_layer=final_result \
--image=" + filename \
+ "|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def set_servo(uart, port, servo_id, servo_pos):
if servo_id < 0 or servo_id >= NUM_OF_SERVO:
print("Invalid Servo ID")
return False
if servo_pos < 0 or servo_pos > 180:
print("Invalid Servo Position")
return False
if uart.gethandle(port) != None:
uart.send(port, "servoread")
servo_position = uart.rcvd(port)
print(servo_position)
# discard "OK"
devnull = uart.rcvd(port)
current_pos = int(servo_position.split(' ')[servo_id])
#print(servo_id, current_pos)
if current_pos < servo_pos:
start = current_pos +1
stop = servo_pos +1
step = 1
else:
start = current_pos -1
stop = servo_pos -1
step = -1
for i in range(start, stop, step):
command = "servo " + str(servo_id) + " " + str(i)
print(command)
uart.send(port, command)
# discard "OK"
devnull = uart.rcvd(port)
sleep(0.2) # sec.
return True
else:
print("ERROR>>>port %s is not ready." % port)
return False
def main():
#print(sys.argv)
if len(sys.argv) == 1:
help()
else:
sys.argv.pop(0) #引数の1番目の要素(=testrunner.py)を削除
testrunner = Testrunner()
ret = testrunner.parser(sys.argv)
#print(ret)
testrunner.parser(["close_allcam"])
testrunner.parser(["export","close"])
if ret[0] == True:
testrunner.parser(["beepsound","Menuet"])
print("PASS")
sys.exit(0)
else:
testrunner.parser(["beepsound","FurElise"])
print("FAIL")
sys.exit(1)
if __name__ == '__main__':
main()
5. おわりに
従来のテストスクリプトのCSVファイルを読み込む方法に加えて、外部のPythonプログラムからimportしてテストランナーを実行できるようになりました。
付録A. Lチカで始めるテスト自動化・記事一覧
- Lチカで始めるテスト自動化
- Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
- Lチカで始めるテスト自動化(3)オシロスコープの組込み
- Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
- Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
- Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
- Lチカで始めるテスト自動化(7)タイムスタンプの保存
- Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
- Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
- Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
- Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
- Lチカで始めるテスト自動化(12)書籍化の作業メモ
- Lチカで始めるテスト自動化(13)外部プログラムの呼出し
- Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
- Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
- Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する
- Lチカで始めるテスト自動化(17)コマンド制御のBLE Keyboard & MouseをM5Stackで製作しiOSアプリをテストスクリプトで操作する
- Lチカで始めるテスト自動化(18)秋月のIoT学習HATキットの圧電ブザーでテスト終了時にpass/failに応じてメロディを流す
- Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する
- Lチカで始めるテスト自動化(20)複数のカメラ映像の同時録画
- Lチカで始めるテスト自動化(21)キーボード入力待ちの実装
- Lチカで始めるテスト自動化(22)DACをコマンドで制御してLチカする
- Lチカで始めるテスト自動化(23)ブレッドボード互換のユニバーサル基板
- Lチカで始めるテスト自動化(24)コマンドの実行結果を任意のCSVファイルへ出力する
- Lチカで始めるテスト自動化(25)Windowsでもテスト終了時にpass/failに応じてメロディを流す