1. はじめに
コマンドの実行結果を任意のCSVファイルに出力して結果をサクッと確認できるようにします。出力先のファイルは複数指定できます。
2. テストランナーの改修箇所
2.1 exportコマンドの追加
- exportコマンドは、1)出力したいデータをカンマ区切りでバッファに並べ、2)バッファのデータをファイルに出力するコマンド群です。
- 出力可能なデータは、1)任意の文字列、2)タイムスタンプ、3)コマンドの実行結果です。
- ファイル名は".csv"込みで与えてください。
- ファイルのopen、closeは自動で実行します。
コマンド | 説明 |
---|---|
export,<filename.csv>,str,<string> | filename.csvのバッファに文字列stringを追加する |
export,<filename.csv>,linenumber | filename.csvのバッファにスクリプトの行番号を追加する |
export,<filename.csv>,timestamp | filename.csvのバッファにタイムスタンプを追加する |
export,<filename.csv>,val | filename.csvのバッファにコマンドの実行結果valを追加する |
export,<filename.csv>,writerow | バッファのデータをfilename.csvに出力しバッファをクリアする |
2.1.1 timestamp
テストランナーがスクリプトファイルを1行ずつ読み込むたびに変数timestampに時刻を格納します。
参考
for cmd in script:
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
2.1.2 val
テストランナーがコマンドを実行するとコマンドによっては実行結果を変数valに格納します。
参考
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
2.2 rusleepコマンドの修正
sleepの時間をランダムに設定する"rusleep"コマンドを改修しsleep時間をvalに格納するようにします。
2.3 Windows環境でUSB接続のWebカメラの起動時間を短縮
「【Python】opencvでWebカメラの起動に時間がかかる問題の対処」を参考にさせていただきました。ありがとうございます。
3. 使い方
出力先ファイル(test00.csv)の1行目にカラムの説明を、2行目以降にrusleepコマンドのsleep時間を出力する例を示します。
3.1 実行方法
実行コマンド
python test-runner.py exporttest
3.2 スクリプトファイル
exporttest.csv
#
export,test00.csv,str,linenumber
export,test00.csv,str,timestamp
export,test00.csv,str,commandNumber
export,test00.csv,str,val
export,test00.csv,writerow
#
rusleep,1,3
export,test00.csv,linenumber
export,test00.csv,timestamp
export,test00.csv,str,1
export,test00.csv,val
export,test00.csv,writerow
#
rusleep,1,3
export,test00.csv,linenumber
export,test00.csv,timestamp
export,test00.csv,str,2
export,test00.csv,val
export,test00.csv,writerow
#
rusleep,1,3
export,test00.csv,linenumber
export,test00.csv,timestamp
export,test00.csv,str,3
export,test00.csv,val
export,test00.csv,writerow
3.3 出力先ファイル
test00.csv
linenumber,timestamp,commandNumber,val
9,2022/09/25 14:45:32,1,2.898651484480436
16,2022/09/25 14:45:34,2,2.725171927338968
23,2022/09/25 14:45:37,3,2.3582335804867225
4. 実装
ソースコード全文は付録をご参照ください。
4.1 exportコマンド
def main():
#略
#exportfiles
#概要
# exportコマンドで使用するN行3列の二次元配列
#説明
# 以下の3つの要素を1行3桁の配列で構造化し、
# 1.ファイルオブジェクト(open()の戻り値)
# 2.Writerオブジェクト(csv.writer()の戻り値)
# 3.ファイルに出力するデータのバッファ
# 出力するファイルの数だけこの配列を行方向に追加する
#構造
# exportfiles = [
# [ [FileObject],[WriterObject],[Buffer] ], #1つめの出力ファイルの配列
# [ [FileObject],[WriterObject],[Buffer] ], #2つめの出力ファイルの配列
# ...
# ]
#初期値
# 空の1行3列の2次元配列で初期化する
exportfiles = [\
[ [],[],[] ]\
]
#略
#export,<filename.csv>,str,<string>
#export,<filename.csv>,linenumber
#export,<filename.csv>,timestamp
#export,<filename.csv>,val
#export,<filename.csv>,writerow
elif cmd[0] == "export":
#1)引数が3以上の場合に処理を行う
if len(cmd) >= 3:
#2)操作対象のファイルディスクリプタを決める
##2.1)操作対象のファイルディスクリプタを検索する
exportfile = None
i = 0 #exportfiles配列の行番号
# print(exportfiles)
for f in exportfiles:
if f[0]:
# print(f[0].name)
if f[0].name == cmd[1]:
exportfile = f
break
i = i + 1
# print("index " + str(i))
##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
if exportfile == None:
if i >= 1:
exportfiles.append([[],[],[]])
exportfiles[i][0] = codecs.open(cmd[1], 'w', 'utf-8')
exportfiles[i][1] = csv.writer(exportfiles[i][0], delimiter=',', lineterminator='\r\n', quotechar='"')
# print(i, exportfiles)
if cmd[2] == "str":
# print("str " + exportfiles[i][0].name)
exportfiles[i][2].append(cmd[3])
elif cmd[2] == "linenumber":
# print("line " + exportfiles[i][0].name)
exportfiles[i][2].append(str(script_line))
elif cmd[2] == "timestamp":
# print("time " + exportfiles[i][0].name)
exportfiles[i][2].append(timestamp)
elif cmd[2] == "val":
# print("val " + exportfiles[i][0].name)
exportfiles[i][2].append(str(val))
elif cmd[2] == "writerow":
# print("wrow " + exportfiles[i][0].name)
# print(exportfiles)
exportfiles[i][1].writerow(exportfiles[i][2])
exportfiles[i][2].clear()
else:
print("export parameter error")
is_passed = False
else:
print("export parameter error")
is_passed = False
#略
else: #FAILで終了
#略
for f in exportfiles:
if f[0]:
print("close " + f[0].name)
f[0].close()
print("FAIL")
sys.exit(1)
if is_passed == True:
device_close(uart, cam)
for f in exportfiles:
if f[0]:
print("close " + f[0].name)
f[0].close()
print("PASS")
sys.exit(0)
4.1.1 exportコマンド(旧バージョン)
ファイルオブジェクト(open()の戻り値)、Writerオブジェクト(csv.writer()の戻り値)、ファイルに出力するデータのバッファのそれぞれに個別の配列を使用していた旧バージョンです。
旧バージョン
def main():
#略
exportfiles= [] #file descripter for export command
exportfile = None #file descriptor for export command
exportcsvw = [[]] #csv.write return value array
exportdata = [[]] #export data array
#略
#export,<filename.csv>,str,<string>
#export,<filename.csv>,linenumber
#export,<filename.csv>,timestamp
#export,<filename.csv>,val
#export,<filename.csv>,writerow
elif cmd[0] == "export":
#1)引数が3以上の場合に処理を行う
if len(cmd) >= 3:
#2)操作対象のファイルディスクリプタを決める
##2.1)操作対象のファイルディスクリプタを検索する
exportfile = None
i = 0 #exportcsvw, exportdataのインデックス
for f in exportfiles:
# print("find " + f.name)
if f.name == cmd[1]:
exportfile = f
break
i = i + 1
# print(i)
##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
if exportfile == None:
exportfile = codecs.open(cmd[1], 'w', 'utf-8')
# print("open " + exportfile.name)
exportfiles.append(exportfile)
if i>=1:
exportcsvw.append([])
exportdata.append([])
exportcsvw[i] = csv.writer(exportfile, delimiter=',', lineterminator='\r\n', quotechar='"')
# print(exportfiles)
# print(exportcsvw)
# print(exportdata)
if cmd[2] == "str":
# print("str " + exportfile.name)
exportdata[i].append(cmd[3])
elif cmd[2] == "linenumber":
# print("line " + exportfile.name)
exportdata[i].append(str(script_line))
elif cmd[2] == "timestamp":
# print("time " + exportfile.name)
exportdata[i].append(timestamp)
elif cmd[2] == "val":
# print("val " + exportfile.name)
exportdata[i].append(str(val))
elif cmd[2] == "writerow":
# print("wrow " + exportfile.name)
# print(exportfiles)
# print(exportcsvw)
# print(exportdata)
exportcsvw[i].writerow(exportdata[i])
exportdata[i].clear()
else:
print("export parameter error")
is_passed = False
else:
print("export parameter error")
is_passed = False
#略
else: #FAILで終了
#略
for f in exportfiles:
print("close " + f.name)
f.close()
print("FAIL")
sys.exit(1)
if is_passed == True:
device_close(uart, cam)
for f in exportfiles:
print("close " + f.name)
f.close()
print("PASS")
sys.exit(0)
4.2 rusleepコマンド
改修前
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
cmd.append(str(fval))
cmds += str(fval)
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(fval)
改修後
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
val = str(fval) #modified.
cmd.append(val) #modified.
cmds += val #modified.
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(fval)
5. おわりに
- もともと備えている実行結果ファイルresult.csvをパースすることなく実行結果をサクッと把握できるようになりました。
- 出力先のファイル名やファイル数を決め打ちにしたくなかったためちょっと凝った作りになっています。この分量だとモジュール化して別ファイルに分けた方がよいかも。
- Pythonはprint文で配列の中身が分かるのは便利ですね。デバッグがはかどりました。
付録A. ソースコード全文
importしているcamera.pyはこちらです。
test-runner.py
#!/usr/bin/python3
#
# This software includes the work that is distributed
# in the Apache License 2.0
#
from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa
#【Python】opencvでWebカメラの起動に時間がかかる問題の対処
# https://qiita.com/youichi_io/items/b894b85d790720ea2346
import os
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera
UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6
def serial_write(h, string):
if h != UNINITIALIZED:
string = string + '\n'
string = str.encode(string)
h.write(string)
return True
else:
print("UART Not Initialized.")
return False
def close_uart(h):
if h != UNINITIALIZED:
h.close()
else:
#print("UART Not Initialized.")
pass
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Fail.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + \
" label_image.py \
--graph=c:\\tmp\\output_graph.pb \
--labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder \
--output_layer=final_result \
--image=" + filename \
+ "|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def set_servo(uart, servo_id, servo_pos):
if servo_id < 0 or servo_id >= NUM_OF_SERVO:
print("Invalid Servo ID")
return False
if servo_pos < 0 or servo_pos > 180:
print("Invalid Servo Position")
return False
if uart != UNINITIALIZED:
serial_write(uart, "servoread")
servo_position = uart.readline().strip().decode('utf-8')
print(servo_position)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
current_pos = int(servo_position.split(' ')[servo_id])
#print(servo_id, current_pos)
if current_pos < servo_pos:
start = current_pos +1
stop = servo_pos +1
step = 1
else:
start = current_pos -1
stop = servo_pos -1
step = -1
for i in range(start, stop, step):
command = "servo " + str(servo_id) + " " + str(i)
print(command)
serial_write(uart, command)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
sleep(0.2) # sec.
return True
else:
print("UART Not Initialized.")
return False
def device_close(uart, cam):
close_uart(uart)
for i in range(len(cam)):
if cam[i].get_video_rec() == True:
cam[i].set_video_rec(False)
cam[i].thread.join()
else:
pass
#print("No Recording.")
cam[i].close_cam()
def main():
is_passed = True
val = str(UNINITIALIZED)
fval = 0.0
uart = UNINITIALIZED
dso = UNINITIALIZED
cam = UNINITIALIZED
ocr = UNINITIALIZED
cmds = ""
cam0 = Camera(0)
cam1 = Camera(1)
cam2 = Camera(2)
cam = [cam0, cam1, cam2]
#exportfiles
#概要
# exportコマンドで使用するN行3列の二次元配列
#説明
# 以下の3つの要素を1行3桁の配列で構造化し、
# 1.ファイルオブジェクト(open()の戻り値)
# 2.Writerオブジェクト(csv.writer()の戻り値)
# 3.ファイルに出力するデータのバッファ
# 出力するファイルの数だけこの配列を行方向に追加する
#構造
# exportfiles = [
# [ [FileObject],[WriterObject],[Buffer] ], #1つめの出力ファイルの配列
# [ [FileObject],[WriterObject],[Buffer] ], #2つめの出力ファイルの配列
# ...
# ]
#初期値
# 空の1行3列の2次元配列で初期化する
exportfiles = [\
[ [],[],[] ]\
]
if len(sys.argv) == 2:
script_file_name = sys.argv[1] + ".csv"
result_file_name = sys.argv[1] + "_result.csv"
else:
script_file_name = "script.csv"
result_file_name = "result.csv"
with codecs.open(script_file_name, 'r', 'utf-8') as file_script:
script = csv.reader(file_script, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open(result_file_name, 'w', 'utf-8') as file_result:
result = csv.writer(file_result, delimiter=',', lineterminator='\r\n', quotechar='"')
script_line = 0
for cmd in script:
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
script_line += 1
print("##### " + timestamp + " " + str(script_line) + " #####")
cmds = str(script_line) + ":"
for i in range(len(cmd)):
cmds += cmd[i] + ","
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
val = str(fval)
cmd.append(val)
cmds += val
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(fval)
elif cmd[0] == "prompt":
print("Enter y/n")
val = input()
val = val.lower()
cmd.append(val)
cmds += val
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(1)
if val != "y":
is_passed = False
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
elif cmd[0] == "send":
ret = serial_write(uart, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
elif cmd[0] == "open_dso":
try:
dso = open_dso()
except:
is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
val = dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(val)
else:
dso.write(cmd[1])
except:
is_passed = False
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam[int(cmd[1])].open_cam(640, 480)
else:
cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))
elif cmd[0] == "close_cam":
cam[int(cmd[1])].close_cam()
elif cmd[0] == "capture_cam":
ret = cam[int(cmd[1])].capture_cam(cmd[2])
if ret == False:
is_passed = False
elif cmd[0] == "rec_start":
if cam[int(cmd[1])].get_video_rec() == False:
cam[int(cmd[1])].set_video_rec(True)
cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
cam[int(cmd[1])].thread.start()
else:
print("Already Recording.")
elif cmd[0] == "rec_stop":
if cam[int(cmd[1])].get_video_rec() == True:
cam[int(cmd[1])].set_video_rec(False)
cam[int(cmd[1])].thread.join()
else:
print("No Recording.")
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
ocr = open_ocr()
if ocr == UNINITIALIZED:
is_passed = False
elif cmd[0] == "exec_ocr":
try:
val = exec_ocr(ocr, cmd[1])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "set_servo":
ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
if ret == False:
is_passed = False
elif cmd[0] == "run":
ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
val = ret.stdout.strip()
print(ret)
if ret.returncode != 0:
is_passed = False
elif cmd[0] == "eval_str_eq":
if str(val) != str(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_eq":
if int(val) != int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_gt":
if int(val) < int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_lt":
if int(val) > int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(val) != float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(val) < float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(val) > float(cmd[1]):
is_passed = False
#export,<filename.csv>,str,<string>
#export,<filename.csv>,linenumber
#export,<filename.csv>,timestamp
#export,<filename.csv>,val
#export,<filename.csv>,writerow
elif cmd[0] == "export":
#1)引数が3以上の場合に処理を行う
if len(cmd) >= 3:
#2)操作対象のファイルディスクリプタを決める
##2.1)操作対象のファイルディスクリプタを検索する
exportfile = None
i = 0 #exportfiles配列の行番号
# print(exportfiles)
for f in exportfiles:
if f[0]:
# print(f[0].name)
if f[0].name == cmd[1]:
exportfile = f
break
i = i + 1
# print("index " + str(i))
##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
if exportfile == None:
if i >= 1:
exportfiles.append([[],[],[]])
exportfiles[i][0] = codecs.open(cmd[1], 'w', 'utf-8')
exportfiles[i][1] = csv.writer(exportfiles[i][0], delimiter=',', lineterminator='\r\n', quotechar='"')
# print(i, exportfiles)
if cmd[2] == "str":
# print("str " + exportfiles[i][0].name)
exportfiles[i][2].append(cmd[3])
elif cmd[2] == "linenumber":
# print("line " + exportfiles[i][0].name)
exportfiles[i][2].append(str(script_line))
elif cmd[2] == "timestamp":
# print("time " + exportfiles[i][0].name)
exportfiles[i][2].append(timestamp)
elif cmd[2] == "val":
# print("val " + exportfiles[i][0].name)
exportfiles[i][2].append(str(val))
elif cmd[2] == "writerow":
# print("wrow " + exportfiles[i][0].name)
# print(exportfiles)
exportfiles[i][1].writerow(exportfiles[i][2])
exportfiles[i][2].clear()
else:
print("export parameter error")
is_passed = False
else:
print("export parameter error")
is_passed = False
else:
cmd.append("#")
if is_passed == True:
cmd.append("OK")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
device_close(uart, cam)
for f in exportfiles:
if f[0]:
print("close " + f[0].name)
f[0].close()
print("FAIL")
sys.exit(1)
if is_passed == True:
device_close(uart, cam)
for f in exportfiles:
if f[0]:
print("close " + f[0].name)
f[0].close()
print("PASS")
sys.exit(0)
main()
旧バージョン
test-runner.py(旧バージョン)
#!/usr/bin/python3
#
# This software includes the work that is distributed
# in the Apache License 2.0
#
from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa
#【Python】opencvでWebカメラの起動に時間がかかる問題の対処
# https://qiita.com/youichi_io/items/b894b85d790720ea2346
import os
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera
UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6
def serial_write(h, string):
if h != UNINITIALIZED:
string = string + '\n'
string = str.encode(string)
h.write(string)
return True
else:
print("UART Not Initialized.")
return False
def close_uart(h):
if h != UNINITIALIZED:
h.close()
else:
#print("UART Not Initialized.")
pass
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Fail.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + \
" label_image.py \
--graph=c:\\tmp\\output_graph.pb \
--labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder \
--output_layer=final_result \
--image=" + filename \
+ "|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def set_servo(uart, servo_id, servo_pos):
if servo_id < 0 or servo_id >= NUM_OF_SERVO:
print("Invalid Servo ID")
return False
if servo_pos < 0 or servo_pos > 180:
print("Invalid Servo Position")
return False
if uart != UNINITIALIZED:
serial_write(uart, "servoread")
servo_position = uart.readline().strip().decode('utf-8')
print(servo_position)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
current_pos = int(servo_position.split(' ')[servo_id])
#print(servo_id, current_pos)
if current_pos < servo_pos:
start = current_pos +1
stop = servo_pos +1
step = 1
else:
start = current_pos -1
stop = servo_pos -1
step = -1
for i in range(start, stop, step):
command = "servo " + str(servo_id) + " " + str(i)
print(command)
serial_write(uart, command)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
sleep(0.2) # sec.
return True
else:
print("UART Not Initialized.")
return False
def device_close(uart, cam):
close_uart(uart)
for i in range(len(cam)):
if cam[i].get_video_rec() == True:
cam[i].set_video_rec(False)
cam[i].thread.join()
else:
pass
#print("No Recording.")
cam[i].close_cam()
def main():
is_passed = True
val = str(UNINITIALIZED)
fval = 0.0
uart = UNINITIALIZED
dso = UNINITIALIZED
cam = UNINITIALIZED
ocr = UNINITIALIZED
cmds = ""
cam0 = Camera(0)
cam1 = Camera(1)
cam2 = Camera(2)
cam = [cam0, cam1, cam2]
exportfiles= [] #file descripter for export command
exportfile = None #file descriptor for export command
exportcsvw = [[]] #csv.write return value array
exportdata = [[]] #export data array
if len(sys.argv) == 2:
script_file_name = sys.argv[1] + ".csv"
result_file_name = sys.argv[1] + "_result.csv"
else:
script_file_name = "script.csv"
result_file_name = "result.csv"
with codecs.open(script_file_name, 'r', 'utf-8') as file_script:
script = csv.reader(file_script, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open(result_file_name, 'w', 'utf-8') as file_result:
result = csv.writer(file_result, delimiter=',', lineterminator='\r\n', quotechar='"')
script_line = 0
for cmd in script:
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
script_line += 1
print("##### " + timestamp + " " + str(script_line) + " #####")
cmds = str(script_line) + ":"
for i in range(len(cmd)):
cmds += cmd[i] + ","
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
val = str(fval)
cmd.append(val)
cmds += val
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(fval)
elif cmd[0] == "prompt":
print("Enter y/n")
val = input()
val = val.lower()
cmd.append(val)
cmds += val
for i in range(len(cam)):
cam[i].set_video_txt(cmds)
print(cmds)
sleep(1)
if val != "y":
is_passed = False
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
elif cmd[0] == "send":
ret = serial_write(uart, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
elif cmd[0] == "open_dso":
try:
dso = open_dso()
except:
is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
val = dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(val)
else:
dso.write(cmd[1])
except:
is_passed = False
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam[int(cmd[1])].open_cam(640, 480)
else:
cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))
elif cmd[0] == "close_cam":
cam[int(cmd[1])].close_cam()
elif cmd[0] == "capture_cam":
ret = cam[int(cmd[1])].capture_cam(cmd[2])
if ret == False:
is_passed = False
elif cmd[0] == "rec_start":
if cam[int(cmd[1])].get_video_rec() == False:
cam[int(cmd[1])].set_video_rec(True)
cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
cam[int(cmd[1])].thread.start()
else:
print("Already Recording.")
elif cmd[0] == "rec_stop":
if cam[int(cmd[1])].get_video_rec() == True:
cam[int(cmd[1])].set_video_rec(False)
cam[int(cmd[1])].thread.join()
else:
print("No Recording.")
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
ocr = open_ocr()
if ocr == UNINITIALIZED:
is_passed = False
elif cmd[0] == "exec_ocr":
try:
val = exec_ocr(ocr, cmd[1])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "set_servo":
ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
if ret == False:
is_passed = False
elif cmd[0] == "run":
ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
val = ret.stdout.strip()
print(ret)
if ret.returncode != 0:
is_passed = False
elif cmd[0] == "eval_str_eq":
if str(val) != str(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_eq":
if int(val) != int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_gt":
if int(val) < int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_lt":
if int(val) > int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(val) != float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(val) < float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(val) > float(cmd[1]):
is_passed = False
#export,<filename.csv>,str,<string>
#export,<filename.csv>,linenumber
#export,<filename.csv>,timestamp
#export,<filename.csv>,val
#export,<filename.csv>,writerow
elif cmd[0] == "export":
#1)引数が3以上の場合に処理を行う
if len(cmd) >= 3:
#2)操作対象のファイルディスクリプタを決める
##2.1)操作対象のファイルディスクリプタを検索する
exportfile = None
i = 0 #exportcsvw, exportdataのインデックス
for f in exportfiles:
# print("find " + f.name)
if f.name == cmd[1]:
exportfile = f
break
i = i + 1
# print(i)
##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
if exportfile == None:
exportfile = codecs.open(cmd[1], 'w', 'utf-8')
# print("open " + exportfile.name)
exportfiles.append(exportfile)
if i>=1:
exportcsvw.append([])
exportdata.append([])
exportcsvw[i] = csv.writer(exportfile, delimiter=',', lineterminator='\r\n', quotechar='"')
# print(exportfiles)
# print(exportcsvw)
# print(exportdata)
if cmd[2] == "str":
# print("str " + exportfile.name)
exportdata[i].append(cmd[3])
elif cmd[2] == "linenumber":
# print("line " + exportfile.name)
exportdata[i].append(str(script_line))
elif cmd[2] == "timestamp":
# print("time " + exportfile.name)
exportdata[i].append(timestamp)
elif cmd[2] == "val":
# print("val " + exportfile.name)
exportdata[i].append(str(val))
elif cmd[2] == "writerow":
# print("wrow " + exportfile.name)
# print(exportfiles)
# print(exportcsvw)
# print(exportdata)
exportcsvw[i].writerow(exportdata[i])
exportdata[i].clear()
else:
print("export parameter error")
is_passed = False
else:
print("export parameter error")
is_passed = False
else:
cmd.append("#")
if is_passed == True:
cmd.append("OK")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
device_close(uart, cam)
for f in exportfiles:
print("close " + f.name)
f.close()
print("FAIL")
sys.exit(1)
if is_passed == True:
device_close(uart, cam)
for f in exportfiles:
print("close " + f.name)
f.close()
print("PASS")
sys.exit(0)
main()
付録B. Lチカで始めるテスト自動化・記事一覧
- Lチカで始めるテスト自動化
- Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
- Lチカで始めるテスト自動化(3)オシロスコープの組込み
- Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
- Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
- Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
- Lチカで始めるテスト自動化(7)タイムスタンプの保存
- Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
- Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
- Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
- Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
- Lチカで始めるテスト自動化(12)書籍化の作業メモ
- Lチカで始めるテスト自動化(13)外部プログラムの呼出し
- Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
- Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
- Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する
- Lチカで始めるテスト自動化(17)コマンド制御のBLE Keyboard & MouseをM5Stackで製作しiOSアプリをテストスクリプトで操作する
- Lチカで始めるテスト自動化(18)秋月のIoT学習HATキットの圧電ブザーでテスト終了時にpass/failに応じてメロディを流す
- Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する
- Lチカで始めるテスト自動化(20)複数のカメラ映像の同時録画
- Lチカで始めるテスト自動化(21)キーボード入力待ちの実装
- Lチカで始めるテスト自動化(22)DACをコマンドで制御してLチカする
- Lチカで始めるテスト自動化(23)ブレッドボード互換のユニバーサル基板