2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Lチカで始めるテスト自動化(24)コマンドの実行結果を任意のCSVファイルへ出力する

Last updated at Posted at 2022-09-25

1. はじめに

コマンドの実行結果を任意のCSVファイルに出力して結果をサクッと確認できるようにします。出力先のファイルは複数指定できます。

2. テストランナーの改修箇所

2.1 exportコマンドの追加

  • exportコマンドは、1)出力したいデータをカンマ区切りでバッファに並べ、2)バッファのデータをファイルに出力するコマンド群です。
  • 出力可能なデータは、1)任意の文字列、2)タイムスタンプ、3)コマンドの実行結果です。
  • ファイル名は".csv"込みで与えてください。
  • ファイルのopen、closeは自動で実行します。
コマンド 説明
export,<filename.csv>,str,<string> filename.csvのバッファに文字列stringを追加する
export,<filename.csv>,linenumber filename.csvのバッファにスクリプトの行番号を追加する
export,<filename.csv>,timestamp filename.csvのバッファにタイムスタンプを追加する
export,<filename.csv>,val filename.csvのバッファにコマンドの実行結果valを追加する
export,<filename.csv>,writerow バッファのデータをfilename.csvに出力しバッファをクリアする

2.1.1 timestamp

テストランナーがスクリプトファイルを1行ずつ読み込むたびに変数timestampに時刻を格納します。

参考
for cmd in script:
    timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")

2.1.2 val

テストランナーがコマンドを実行するとコマンドによっては実行結果を変数valに格納します。

参考
elif cmd[0] == "rcvd":
    try:
        val = uart.readline().strip().decode('utf-8')
        cmd.append(val)
    except:
        is_passed = False

2.2 rusleepコマンドの修正

sleepの時間をランダムに設定する"rusleep"コマンドを改修しsleep時間をvalに格納するようにします。

2.3 Windows環境でUSB接続のWebカメラの起動時間を短縮

【Python】opencvでWebカメラの起動に時間がかかる問題の対処」を参考にさせていただきました。ありがとうございます。

3. 使い方

出力先ファイル(test00.csv)の1行目にカラムの説明を、2行目以降にrusleepコマンドのsleep時間を出力する例を示します。

3.1 実行方法

実行コマンド
python test-runner.py exporttest

3.2 スクリプトファイル

exporttest.csv
#
export,test00.csv,str,linenumber
export,test00.csv,str,timestamp
export,test00.csv,str,commandNumber
export,test00.csv,str,val
export,test00.csv,writerow
#
rusleep,1,3
export,test00.csv,linenumber
export,test00.csv,timestamp
export,test00.csv,str,1
export,test00.csv,val
export,test00.csv,writerow
#
rusleep,1,3
export,test00.csv,linenumber
export,test00.csv,timestamp
export,test00.csv,str,2
export,test00.csv,val
export,test00.csv,writerow
#
rusleep,1,3
export,test00.csv,linenumber
export,test00.csv,timestamp
export,test00.csv,str,3
export,test00.csv,val
export,test00.csv,writerow

3.3 出力先ファイル

test00.csv
linenumber,timestamp,commandNumber,val
9,2022/09/25 14:45:32,1,2.898651484480436
16,2022/09/25 14:45:34,2,2.725171927338968
23,2022/09/25 14:45:37,3,2.3582335804867225

4. 実装

ソースコード全文は付録をご参照ください。

4.1 exportコマンド

def main():
#略
    #exportfiles
    #概要
    #  exportコマンドで使用するN行3列の二次元配列
    #説明
    #  以下の3つの要素を1行3桁の配列で構造化し、
    #    1.ファイルオブジェクト(open()の戻り値)
    #    2.Writerオブジェクト(csv.writer()の戻り値)
    #    3.ファイルに出力するデータのバッファ
    #  出力するファイルの数だけこの配列を行方向に追加する
    #構造
    #  exportfiles = [ 
    #                 [ [FileObject],[WriterObject],[Buffer] ], #1つめの出力ファイルの配列
    #                 [ [FileObject],[WriterObject],[Buffer] ], #2つめの出力ファイルの配列
    #                 ...
    #                ]
    #初期値
    #  空の1行3列の2次元配列で初期化する
    exportfiles = [\
                   [ [],[],[] ]\
                  ]
#略
                #export,<filename.csv>,str,<string>
                #export,<filename.csv>,linenumber
                #export,<filename.csv>,timestamp
                #export,<filename.csv>,val
                #export,<filename.csv>,writerow
                elif cmd[0] == "export":
                    #1)引数が3以上の場合に処理を行う
                    if len(cmd) >= 3:
                        #2)操作対象のファイルディスクリプタを決める
                        ##2.1)操作対象のファイルディスクリプタを検索する
                        exportfile = None
                        i = 0 #exportfiles配列の行番号
#                        print(exportfiles)
                        for f in exportfiles:
                            if f[0]:
#                                print(f[0].name)
                                if f[0].name == cmd[1]:
                                    exportfile = f
                                    break
                                i = i + 1

#                        print("index " + str(i))

                        ##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
                        if exportfile == None:
                            if i >= 1:
                                exportfiles.append([[],[],[]])
                            exportfiles[i][0] = codecs.open(cmd[1], 'w', 'utf-8')
                            exportfiles[i][1] = csv.writer(exportfiles[i][0], delimiter=',', lineterminator='\r\n', quotechar='"')
#                            print(i, exportfiles)

                        if cmd[2] == "str":
#                            print("str  " + exportfiles[i][0].name)
                            exportfiles[i][2].append(cmd[3])

                        elif cmd[2] == "linenumber":
#                            print("line " + exportfiles[i][0].name)
                            exportfiles[i][2].append(str(script_line))

                        elif cmd[2] == "timestamp":
#                            print("time " + exportfiles[i][0].name)
                            exportfiles[i][2].append(timestamp)

                        elif cmd[2] == "val":
#                            print("val  " + exportfiles[i][0].name)
                            exportfiles[i][2].append(str(val))

                        elif cmd[2] == "writerow":
#                            print("wrow " + exportfiles[i][0].name)
#                            print(exportfiles)
                            exportfiles[i][1].writerow(exportfiles[i][2])
                            exportfiles[i][2].clear()

                        else:
                            print("export parameter error")
                            is_passed = False

                    else:
                        print("export parameter error")
                        is_passed = False
#略
                else: #FAILで終了
#略
                    for f in exportfiles:
                        if f[0]:
                            print("close " + f[0].name)
                            f[0].close()
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        device_close(uart, cam)
        for f in exportfiles:
            if f[0]:
                print("close " + f[0].name)
                f[0].close()
        print("PASS")
        sys.exit(0)

4.1.1 exportコマンド(旧バージョン)

ファイルオブジェクト(open()の戻り値)、Writerオブジェクト(csv.writer()の戻り値)、ファイルに出力するデータのバッファのそれぞれに個別の配列を使用していた旧バージョンです。

旧バージョン
def main():
#略
    exportfiles= []   #file descripter for export command
    exportfile = None #file descriptor for export command
    exportcsvw = [[]] #csv.write return value array
    exportdata = [[]] #export data array
#略
                #export,<filename.csv>,str,<string>
                #export,<filename.csv>,linenumber
                #export,<filename.csv>,timestamp
                #export,<filename.csv>,val
                #export,<filename.csv>,writerow
                elif cmd[0] == "export":
                    #1)引数が3以上の場合に処理を行う
                    if len(cmd) >= 3:
                        #2)操作対象のファイルディスクリプタを決める
                        ##2.1)操作対象のファイルディスクリプタを検索する
                        exportfile = None
                        i = 0 #exportcsvw, exportdataのインデックス
                        for f in exportfiles:
#                            print("find " + f.name)
                            if f.name == cmd[1]:
                                exportfile = f
                                break
                            i = i + 1

#                        print(i)

                        ##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
                        if exportfile == None:
                            exportfile = codecs.open(cmd[1], 'w', 'utf-8')
#                            print("open " + exportfile.name)
                            exportfiles.append(exportfile)
                            if i>=1:
                                exportcsvw.append([])
                                exportdata.append([])
                            exportcsvw[i] = csv.writer(exportfile, delimiter=',', lineterminator='\r\n', quotechar='"')
#                            print(exportfiles)
#                            print(exportcsvw)
#                            print(exportdata)

                        if cmd[2] == "str":
#                            print("str  " + exportfile.name)
                            exportdata[i].append(cmd[3])

                        elif cmd[2] == "linenumber":
#                            print("line " + exportfile.name)
                            exportdata[i].append(str(script_line))

                        elif cmd[2] == "timestamp":
#                            print("time " + exportfile.name)
                            exportdata[i].append(timestamp)

                        elif cmd[2] == "val":
#                            print("val  " + exportfile.name)
                            exportdata[i].append(str(val))

                        elif cmd[2] == "writerow":
#                            print("wrow " + exportfile.name)
#                            print(exportfiles)
#                            print(exportcsvw)
#                            print(exportdata)
                            exportcsvw[i].writerow(exportdata[i])
                            exportdata[i].clear()

                        else:
                            print("export parameter error")
                            is_passed = False

                    else:
                        print("export parameter error")
                        is_passed = False
#略
                else: #FAILで終了
#略
                    for f in exportfiles:
                        print("close " + f.name)
                        f.close()
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        device_close(uart, cam)
        for f in exportfiles:
            print("close " + f.name)
            f.close()
        print("PASS")
        sys.exit(0)

4.2 rusleepコマンド

改修前
                elif cmd[0] == "rusleep":
                    fval = random.uniform(float(cmd[1]), float(cmd[2]))
                    cmd.append(str(fval))
                    cmds += str(fval)
                    for i in range(len(cam)):
                        cam[i].set_video_txt(cmds)
                    print(cmds)
                    sleep(fval)
改修後
                elif cmd[0] == "rusleep":
                    fval = random.uniform(float(cmd[1]), float(cmd[2]))
                    val = str(fval) #modified.
                    cmd.append(val) #modified.
                    cmds += val     #modified.
                    for i in range(len(cam)):
                        cam[i].set_video_txt(cmds)
                    print(cmds)
                    sleep(fval)

5. おわりに

  • もともと備えている実行結果ファイルresult.csvをパースすることなく実行結果をサクッと把握できるようになりました。
  • 出力先のファイル名やファイル数を決め打ちにしたくなかったためちょっと凝った作りになっています。この分量だとモジュール化して別ファイルに分けた方がよいかも。
  • Pythonはprint文で配列の中身が分かるのは便利ですね。デバッグがはかどりました。

付録A. ソースコード全文

importしているcamera.pyはこちらです。

test-runner.py
#!/usr/bin/python3

#
# This software includes the work that is distributed
# in the Apache License 2.0
#

from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa

#【Python】opencvでWebカメラの起動に時間がかかる問題の対処
# https://qiita.com/youichi_io/items/b894b85d790720ea2346
import os
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2

from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera

UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6

def serial_write(h, string):
    if h != UNINITIALIZED:
        string = string + '\n'
        string = str.encode(string)
        h.write(string)
        return True
    else:
        print("UART Not Initialized.")
        return False

def close_uart(h):
    if h != UNINITIALIZED:
        h.close()
    else:
        #print("UART Not Initialized.")
        pass

def open_dso():
    rm = visa.ResourceManager()
    resources = rm.list_resources()
    #print(resources)
    for resource in resources:
        #print(resource)
        try:
            dso = rm.open_resource(resource)
        except:
            print(resource, "Not Found.")
        else:
            print(resource, "Detected.")
            return dso

    #Throw an error to caller if none succeed.
    return dso

def crop_img(filename_in, v, h, filename_out):
    img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
    v0 = int(v.split(':')[0])
    v1 = int(v.split(':')[1])
    h0 = int(h.split(':')[0])
    h1 = int(h.split(':')[1])
    img2 = img[v0:v1, h0:h1]
    cv2.imwrite(filename_out, img2)
    return True

def open_ocr():
    ocr = pyocr.get_available_tools()
    if len(ocr) != 0:
        ocr = ocr[0]
    else:
        ocr = UNINITIALIZED
        print("OCR Not Ready.")
    return ocr

def exec_ocr(ocr, filename):
    try:
        txt = ocr.image_to_string(
            Image.open(filename),
            lang = "eng",
            builder = pyocr.builders.TextBuilder()
        )
    except:
        print("OCR Fail.")
    else:
        return txt

def exec_labelimg(filename, label_string):
    if platform.system() == "Windows" :
        python = "python"
        grep = "findstr"
    else:
        python = "python3"
        grep = "grep"

    cmd = python + \
          " label_image.py \
          --graph=c:\\tmp\\output_graph.pb \
          --labels=c:\\tmp\\output_labels.txt \
          --input_layer=Placeholder \
          --output_layer=final_result \
          --image=" + filename \
          + "|" + grep + " " + label_string
    print(cmd)
    log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
    ret = log.stdout.strip().decode("utf-8").split(" ")[1]
    return ret

def set_servo(uart, servo_id, servo_pos):
    if servo_id < 0 or servo_id >= NUM_OF_SERVO:
        print("Invalid Servo ID")
        return False

    if servo_pos < 0 or servo_pos > 180:
        print("Invalid Servo Position")
        return False

    if uart != UNINITIALIZED:
        serial_write(uart, "servoread")

        servo_position = uart.readline().strip().decode('utf-8')
        print(servo_position)

        # discard "OK"
        devnull = uart.readline().strip().decode('utf-8')

        current_pos = int(servo_position.split(' ')[servo_id])
        #print(servo_id, current_pos)

        if current_pos < servo_pos:
            start = current_pos +1
            stop  = servo_pos +1
            step  = 1
        else:
            start = current_pos -1
            stop  = servo_pos -1
            step  = -1

        for i in range(start, stop, step):
            command = "servo " + str(servo_id) + " " + str(i)
            print(command)
            serial_write(uart, command)
            # discard "OK"
            devnull = uart.readline().strip().decode('utf-8')
            sleep(0.2) # sec.

        return True
    else:
        print("UART Not Initialized.")
        return False

def device_close(uart, cam):
    close_uart(uart)

    for i in range(len(cam)):
        if cam[i].get_video_rec() == True:
            cam[i].set_video_rec(False)
            cam[i].thread.join()
        else:
            pass
            #print("No Recording.")
        cam[i].close_cam()

def main():
    is_passed = True
    val = str(UNINITIALIZED)
    fval = 0.0
    uart = UNINITIALIZED
    dso = UNINITIALIZED
    cam = UNINITIALIZED
    ocr = UNINITIALIZED
    cmds = ""
    cam0 = Camera(0)
    cam1 = Camera(1)
    cam2 = Camera(2)
    cam  = [cam0, cam1, cam2]

    #exportfiles
    #概要
    #  exportコマンドで使用するN行3列の二次元配列
    #説明
    #  以下の3つの要素を1行3桁の配列で構造化し、
    #    1.ファイルオブジェクト(open()の戻り値)
    #    2.Writerオブジェクト(csv.writer()の戻り値)
    #    3.ファイルに出力するデータのバッファ
    #  出力するファイルの数だけこの配列を行方向に追加する
    #構造
    #  exportfiles = [ 
    #                 [ [FileObject],[WriterObject],[Buffer] ], #1つめの出力ファイルの配列
    #                 [ [FileObject],[WriterObject],[Buffer] ], #2つめの出力ファイルの配列
    #                 ...
    #                ]
    #初期値
    #  空の1行3列の2次元配列で初期化する
    exportfiles = [\
                   [ [],[],[] ]\
                  ]

    if len(sys.argv) == 2:
        script_file_name = sys.argv[1] + ".csv"
        result_file_name = sys.argv[1] + "_result.csv"
    else:
        script_file_name = "script.csv"
        result_file_name = "result.csv"

    with codecs.open(script_file_name, 'r', 'utf-8') as file_script:
        script = csv.reader(file_script, delimiter=',', lineterminator='\r\n', quotechar='"')

        with codecs.open(result_file_name, 'w', 'utf-8') as file_result:
            result = csv.writer(file_result, delimiter=',', lineterminator='\r\n', quotechar='"')

            script_line = 0
            for cmd in script:
                timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
                script_line += 1
                print("##### " + timestamp + " " + str(script_line) + " #####")

                cmds = str(script_line) + ":"
                for i in range(len(cmd)):
                    cmds += cmd[i] + ","
                for i in range(len(cam)):
                    cam[i].set_video_txt(cmds)
                print(cmds)

                if "#" in cmd[0]:
                    pass

                elif cmd[0] == "sleep":
                    sleep(float(cmd[1]))

                elif cmd[0] == "rusleep":
                    fval = random.uniform(float(cmd[1]), float(cmd[2]))
                    val = str(fval)
                    cmd.append(val)
                    cmds += val
                    for i in range(len(cam)):
                        cam[i].set_video_txt(cmds)
                    print(cmds)
                    sleep(fval)

                elif cmd[0] == "prompt":
                    print("Enter y/n")
                    val = input()
                    val = val.lower()
                    cmd.append(val)
                    cmds += val
                    for i in range(len(cam)):
                        cam[i].set_video_txt(cmds)
                    print(cmds)
                    sleep(1)
                    if val != "y":
                        is_passed = False

                elif cmd[0] == "open_uart":
                    if len(cmd) == 2:
                        dsrdtr_val = 1
                    else:
                        dsrdtr_val = int(cmd[2])
                    try:
                        uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
                    except:
                        is_passed = False

                elif cmd[0] == "send":
                    ret = serial_write(uart, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rcvd":
                    try:
                        val = uart.readline().strip().decode('utf-8')
                        cmd.append(val)
                    except:
                        is_passed = False

                elif cmd[0] == "open_dso":
                    try:
                        dso = open_dso()
                    except:
                        is_passed = False

                elif cmd[0] == "dso":
                    try:
                        if "?" in cmd[1]:
                            val = dso.query(cmd[1]).rstrip().replace(",", "-")
                            cmd.append(val)
                        else:
                            dso.write(cmd[1])
                    except:
                        is_passed = False

                elif cmd[0] == "open_cam":
                    if len(cmd) == 2:
                        cam[int(cmd[1])].open_cam(640, 480)
                    else:
                        cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))

                elif cmd[0] == "close_cam":
                    cam[int(cmd[1])].close_cam()

                elif cmd[0] == "capture_cam":
                    ret = cam[int(cmd[1])].capture_cam(cmd[2])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rec_start":
                    if cam[int(cmd[1])].get_video_rec() == False:
                        cam[int(cmd[1])].set_video_rec(True)
                        cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
                        cam[int(cmd[1])].thread.start()
                    else:
                        print("Already Recording.")

                elif cmd[0] == "rec_stop":
                    if cam[int(cmd[1])].get_video_rec() == True:
                        cam[int(cmd[1])].set_video_rec(False)
                        cam[int(cmd[1])].thread.join()
                    else:
                        print("No Recording.")

                elif cmd[0] == "crop_img":
                    crop_img(cmd[1], cmd[2], cmd[3], cmd[4])

                elif cmd[0] == "open_ocr":
                    ocr = open_ocr()
                    if ocr == UNINITIALIZED:
                        is_passed = False

                elif cmd[0] == "exec_ocr":
                    try:
                        val = exec_ocr(ocr, cmd[1])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "exec_labelimg":
                    try:
                        val = exec_labelimg(cmd[1], cmd[2])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "set_servo":
                    ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "run":
                    ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                    val = ret.stdout.strip()
                    print(ret)
                    if ret.returncode != 0:
                        is_passed = False

                elif cmd[0] == "eval_str_eq":
                    if str(val) != str(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_eq":
                    if int(val) != int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_gt":
                    if int(val) < int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_lt":
                    if int(val) > int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_eq":
                    if float(val) != float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_gt":
                    if float(val) < float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_lt":
                    if float(val) > float(cmd[1]):
                        is_passed = False

                #export,<filename.csv>,str,<string>
                #export,<filename.csv>,linenumber
                #export,<filename.csv>,timestamp
                #export,<filename.csv>,val
                #export,<filename.csv>,writerow
                elif cmd[0] == "export":
                    #1)引数が3以上の場合に処理を行う
                    if len(cmd) >= 3:
                        #2)操作対象のファイルディスクリプタを決める
                        ##2.1)操作対象のファイルディスクリプタを検索する
                        exportfile = None
                        i = 0 #exportfiles配列の行番号
#                        print(exportfiles)
                        for f in exportfiles:
                            if f[0]:
#                                print(f[0].name)
                                if f[0].name == cmd[1]:
                                    exportfile = f
                                    break
                                i = i + 1

#                        print("index " + str(i))

                        ##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
                        if exportfile == None:
                            if i >= 1:
                                exportfiles.append([[],[],[]])
                            exportfiles[i][0] = codecs.open(cmd[1], 'w', 'utf-8')
                            exportfiles[i][1] = csv.writer(exportfiles[i][0], delimiter=',', lineterminator='\r\n', quotechar='"')
#                            print(i, exportfiles)

                        if cmd[2] == "str":
#                            print("str  " + exportfiles[i][0].name)
                            exportfiles[i][2].append(cmd[3])

                        elif cmd[2] == "linenumber":
#                            print("line " + exportfiles[i][0].name)
                            exportfiles[i][2].append(str(script_line))

                        elif cmd[2] == "timestamp":
#                            print("time " + exportfiles[i][0].name)
                            exportfiles[i][2].append(timestamp)

                        elif cmd[2] == "val":
#                            print("val  " + exportfiles[i][0].name)
                            exportfiles[i][2].append(str(val))

                        elif cmd[2] == "writerow":
#                            print("wrow " + exportfiles[i][0].name)
#                            print(exportfiles)
                            exportfiles[i][1].writerow(exportfiles[i][2])
                            exportfiles[i][2].clear()

                        else:
                            print("export parameter error")
                            is_passed = False

                    else:
                        print("export parameter error")
                        is_passed = False

                else:
                    cmd.append("#")

                if is_passed == True:
                    cmd.append("OK")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                else:
                    cmd.append("NG")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                    device_close(uart, cam)
                    for f in exportfiles:
                        if f[0]:
                            print("close " + f[0].name)
                            f[0].close()
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        device_close(uart, cam)
        for f in exportfiles:
            if f[0]:
                print("close " + f[0].name)
                f[0].close()
        print("PASS")
        sys.exit(0)

main()
旧バージョン
test-runner.py(旧バージョン)
#!/usr/bin/python3

#
# This software includes the work that is distributed
# in the Apache License 2.0
#

from time import sleep
import random
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa

#【Python】opencvでWebカメラの起動に時間がかかる問題の対処
# https://qiita.com/youichi_io/items/b894b85d790720ea2346
import os
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2

from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE
import threading
from camera import Camera

UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6

def serial_write(h, string):
    if h != UNINITIALIZED:
        string = string + '\n'
        string = str.encode(string)
        h.write(string)
        return True
    else:
        print("UART Not Initialized.")
        return False

def close_uart(h):
    if h != UNINITIALIZED:
        h.close()
    else:
        #print("UART Not Initialized.")
        pass

def open_dso():
    rm = visa.ResourceManager()
    resources = rm.list_resources()
    #print(resources)
    for resource in resources:
        #print(resource)
        try:
            dso = rm.open_resource(resource)
        except:
            print(resource, "Not Found.")
        else:
            print(resource, "Detected.")
            return dso

    #Throw an error to caller if none succeed.
    return dso

def crop_img(filename_in, v, h, filename_out):
    img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
    v0 = int(v.split(':')[0])
    v1 = int(v.split(':')[1])
    h0 = int(h.split(':')[0])
    h1 = int(h.split(':')[1])
    img2 = img[v0:v1, h0:h1]
    cv2.imwrite(filename_out, img2)
    return True

def open_ocr():
    ocr = pyocr.get_available_tools()
    if len(ocr) != 0:
        ocr = ocr[0]
    else:
        ocr = UNINITIALIZED
        print("OCR Not Ready.")
    return ocr

def exec_ocr(ocr, filename):
    try:
        txt = ocr.image_to_string(
            Image.open(filename),
            lang = "eng",
            builder = pyocr.builders.TextBuilder()
        )
    except:
        print("OCR Fail.")
    else:
        return txt

def exec_labelimg(filename, label_string):
    if platform.system() == "Windows" :
        python = "python"
        grep = "findstr"
    else:
        python = "python3"
        grep = "grep"

    cmd = python + \
          " label_image.py \
          --graph=c:\\tmp\\output_graph.pb \
          --labels=c:\\tmp\\output_labels.txt \
          --input_layer=Placeholder \
          --output_layer=final_result \
          --image=" + filename \
          + "|" + grep + " " + label_string
    print(cmd)
    log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
    ret = log.stdout.strip().decode("utf-8").split(" ")[1]
    return ret

def set_servo(uart, servo_id, servo_pos):
    if servo_id < 0 or servo_id >= NUM_OF_SERVO:
        print("Invalid Servo ID")
        return False

    if servo_pos < 0 or servo_pos > 180:
        print("Invalid Servo Position")
        return False

    if uart != UNINITIALIZED:
        serial_write(uart, "servoread")

        servo_position = uart.readline().strip().decode('utf-8')
        print(servo_position)

        # discard "OK"
        devnull = uart.readline().strip().decode('utf-8')

        current_pos = int(servo_position.split(' ')[servo_id])
        #print(servo_id, current_pos)

        if current_pos < servo_pos:
            start = current_pos +1
            stop  = servo_pos +1
            step  = 1
        else:
            start = current_pos -1
            stop  = servo_pos -1
            step  = -1

        for i in range(start, stop, step):
            command = "servo " + str(servo_id) + " " + str(i)
            print(command)
            serial_write(uart, command)
            # discard "OK"
            devnull = uart.readline().strip().decode('utf-8')
            sleep(0.2) # sec.

        return True
    else:
        print("UART Not Initialized.")
        return False

def device_close(uart, cam):
    close_uart(uart)

    for i in range(len(cam)):
        if cam[i].get_video_rec() == True:
            cam[i].set_video_rec(False)
            cam[i].thread.join()
        else:
            pass
            #print("No Recording.")
        cam[i].close_cam()

def main():
    is_passed = True
    val = str(UNINITIALIZED)
    fval = 0.0
    uart = UNINITIALIZED
    dso = UNINITIALIZED
    cam = UNINITIALIZED
    ocr = UNINITIALIZED
    cmds = ""
    cam0 = Camera(0)
    cam1 = Camera(1)
    cam2 = Camera(2)
    cam  = [cam0, cam1, cam2]
    exportfiles= []   #file descripter for export command
    exportfile = None #file descriptor for export command
    exportcsvw = [[]] #csv.write return value array
    exportdata = [[]] #export data array

    if len(sys.argv) == 2:
        script_file_name = sys.argv[1] + ".csv"
        result_file_name = sys.argv[1] + "_result.csv"
    else:
        script_file_name = "script.csv"
        result_file_name = "result.csv"

    with codecs.open(script_file_name, 'r', 'utf-8') as file_script:
        script = csv.reader(file_script, delimiter=',', lineterminator='\r\n', quotechar='"')

        with codecs.open(result_file_name, 'w', 'utf-8') as file_result:
            result = csv.writer(file_result, delimiter=',', lineterminator='\r\n', quotechar='"')

            script_line = 0
            for cmd in script:
                timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
                script_line += 1
                print("##### " + timestamp + " " + str(script_line) + " #####")

                cmds = str(script_line) + ":"
                for i in range(len(cmd)):
                    cmds += cmd[i] + ","
                for i in range(len(cam)):
                    cam[i].set_video_txt(cmds)
                print(cmds)

                if "#" in cmd[0]:
                    pass

                elif cmd[0] == "sleep":
                    sleep(float(cmd[1]))

                elif cmd[0] == "rusleep":
                    fval = random.uniform(float(cmd[1]), float(cmd[2]))
                    val = str(fval)
                    cmd.append(val)
                    cmds += val
                    for i in range(len(cam)):
                        cam[i].set_video_txt(cmds)
                    print(cmds)
                    sleep(fval)

                elif cmd[0] == "prompt":
                    print("Enter y/n")
                    val = input()
                    val = val.lower()
                    cmd.append(val)
                    cmds += val
                    for i in range(len(cam)):
                        cam[i].set_video_txt(cmds)
                    print(cmds)
                    sleep(1)
                    if val != "y":
                        is_passed = False

                elif cmd[0] == "open_uart":
                    if len(cmd) == 2:
                        dsrdtr_val = 1
                    else:
                        dsrdtr_val = int(cmd[2])
                    try:
                        uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
                    except:
                        is_passed = False

                elif cmd[0] == "send":
                    ret = serial_write(uart, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rcvd":
                    try:
                        val = uart.readline().strip().decode('utf-8')
                        cmd.append(val)
                    except:
                        is_passed = False

                elif cmd[0] == "open_dso":
                    try:
                        dso = open_dso()
                    except:
                        is_passed = False

                elif cmd[0] == "dso":
                    try:
                        if "?" in cmd[1]:
                            val = dso.query(cmd[1]).rstrip().replace(",", "-")
                            cmd.append(val)
                        else:
                            dso.write(cmd[1])
                    except:
                        is_passed = False

                elif cmd[0] == "open_cam":
                    if len(cmd) == 2:
                        cam[int(cmd[1])].open_cam(640, 480)
                    else:
                        cam[int(cmd[1])].open_cam(int(cmd[2]), int(cmd[3]))

                elif cmd[0] == "close_cam":
                    cam[int(cmd[1])].close_cam()

                elif cmd[0] == "capture_cam":
                    ret = cam[int(cmd[1])].capture_cam(cmd[2])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rec_start":
                    if cam[int(cmd[1])].get_video_rec() == False:
                        cam[int(cmd[1])].set_video_rec(True)
                        cam[int(cmd[1])].thread = threading.Thread(target=cam[int(cmd[1])].video_record, args=(cmd[2],))
                        cam[int(cmd[1])].thread.start()
                    else:
                        print("Already Recording.")

                elif cmd[0] == "rec_stop":
                    if cam[int(cmd[1])].get_video_rec() == True:
                        cam[int(cmd[1])].set_video_rec(False)
                        cam[int(cmd[1])].thread.join()
                    else:
                        print("No Recording.")

                elif cmd[0] == "crop_img":
                    crop_img(cmd[1], cmd[2], cmd[3], cmd[4])

                elif cmd[0] == "open_ocr":
                    ocr = open_ocr()
                    if ocr == UNINITIALIZED:
                        is_passed = False

                elif cmd[0] == "exec_ocr":
                    try:
                        val = exec_ocr(ocr, cmd[1])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "exec_labelimg":
                    try:
                        val = exec_labelimg(cmd[1], cmd[2])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "set_servo":
                    ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "run":
                    ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                    val = ret.stdout.strip()
                    print(ret)
                    if ret.returncode != 0:
                        is_passed = False

                elif cmd[0] == "eval_str_eq":
                    if str(val) != str(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_eq":
                    if int(val) != int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_gt":
                    if int(val) < int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_lt":
                    if int(val) > int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_eq":
                    if float(val) != float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_gt":
                    if float(val) < float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_lt":
                    if float(val) > float(cmd[1]):
                        is_passed = False

                #export,<filename.csv>,str,<string>
                #export,<filename.csv>,linenumber
                #export,<filename.csv>,timestamp
                #export,<filename.csv>,val
                #export,<filename.csv>,writerow
                elif cmd[0] == "export":
                    #1)引数が3以上の場合に処理を行う
                    if len(cmd) >= 3:
                        #2)操作対象のファイルディスクリプタを決める
                        ##2.1)操作対象のファイルディスクリプタを検索する
                        exportfile = None
                        i = 0 #exportcsvw, exportdataのインデックス
                        for f in exportfiles:
#                            print("find " + f.name)
                            if f.name == cmd[1]:
                                exportfile = f
                                break
                            i = i + 1

#                        print(i)

                        ##2.2)操作対象のファイルディスクリプタがない場合は自動でopenする
                        if exportfile == None:
                            exportfile = codecs.open(cmd[1], 'w', 'utf-8')
#                            print("open " + exportfile.name)
                            exportfiles.append(exportfile)
                            if i>=1:
                                exportcsvw.append([])
                                exportdata.append([])
                            exportcsvw[i] = csv.writer(exportfile, delimiter=',', lineterminator='\r\n', quotechar='"')
#                            print(exportfiles)
#                            print(exportcsvw)
#                            print(exportdata)

                        if cmd[2] == "str":
#                            print("str  " + exportfile.name)
                            exportdata[i].append(cmd[3])

                        elif cmd[2] == "linenumber":
#                            print("line " + exportfile.name)
                            exportdata[i].append(str(script_line))

                        elif cmd[2] == "timestamp":
#                            print("time " + exportfile.name)
                            exportdata[i].append(timestamp)

                        elif cmd[2] == "val":
#                            print("val  " + exportfile.name)
                            exportdata[i].append(str(val))

                        elif cmd[2] == "writerow":
#                            print("wrow " + exportfile.name)
#                            print(exportfiles)
#                            print(exportcsvw)
#                            print(exportdata)
                            exportcsvw[i].writerow(exportdata[i])
                            exportdata[i].clear()

                        else:
                            print("export parameter error")
                            is_passed = False

                    else:
                        print("export parameter error")
                        is_passed = False

                else:
                    cmd.append("#")

                if is_passed == True:
                    cmd.append("OK")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                else:
                    cmd.append("NG")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                    device_close(uart, cam)
                    for f in exportfiles:
                        print("close " + f.name)
                        f.close()
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        device_close(uart, cam)
        for f in exportfiles:
            print("close " + f.name)
            f.close()
        print("PASS")
        sys.exit(0)

main()

付録B. Lチカで始めるテスト自動化・記事一覧

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
  3. Lチカで始めるテスト自動化(3)オシロスコープの組込み
  4. Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
  5. Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
  6. Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
  7. Lチカで始めるテスト自動化(7)タイムスタンプの保存
  8. Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
  9. Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
  10. Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
  11. Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
  12. Lチカで始めるテスト自動化(12)書籍化の作業メモ
  13. Lチカで始めるテスト自動化(13)外部プログラムの呼出し
  14. Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
  15. Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
  16. Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する
  17. Lチカで始めるテスト自動化(17)コマンド制御のBLE Keyboard & MouseをM5Stackで製作しiOSアプリをテストスクリプトで操作する
  18. Lチカで始めるテスト自動化(18)秋月のIoT学習HATキットの圧電ブザーでテスト終了時にpass/failに応じてメロディを流す
  19. Lチカで始めるテスト自動化(19)Webカメラの映像を録画しながらテストスクリプトを実行する
  20. Lチカで始めるテスト自動化(20)複数のカメラ映像の同時録画
  21. Lチカで始めるテスト自動化(21)キーボード入力待ちの実装
  22. Lチカで始めるテスト自動化(22)DACをコマンドで制御してLチカする
  23. Lチカで始めるテスト自動化(23)ブレッドボード互換のユニバーサル基板

電子書籍化したものを技術書典ka'sらぼで頒布しています。

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化 -2- リレー駆動回路の設計 (※書き下ろし)
  3. Lチカで始めるテスト自動化 -3- Raspberry Pi Zero WHとIoT学習HATキットで作るテストランナー
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?