0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Lチカで始めるテスト自動化(13)外部プログラムの呼出し

Last updated at Posted at 2020-11-14

##1. はじめに
Lチカで始めるテスト自動化シリーズ第十三弾です。

前処理や後処理などで外部プログラムを呼び出せるようテストランナーを改修し、Pythonで実装した"Hello, World."プログラムを動作サンプルとして実行します。

##2. 作るもの
下記3点を作成します。

  1. 外部プログラム(helloworld.py)
  2. テストスクリプト(helloworld_pass.csv、helloworld_fail.csv)
  3. テストランナー(test-runner.py)

###2.1 外部プログラム
テストランナーが呼び出す外部プログラムが任意の個数の引数を持てることを確認できるよう、わざと2個の引数を持つようにしています。

  • 引数1:終了ステータス(0または1)
  • 引数2:任意の文字列

このhelloworld.pyは"Hello, World."と引数2で与えられた文字列をprintし、引数1で与えられた終了ステータスを返して終了します。

helloworld.py
import sys

print("Hello, World.")
print(sys.argv[2])

if int(sys.argv[1])==0:
    status = 0
else:
    status = 1

sys.exit(status)

###2.2 テストスクリプト
外部プログラムの呼出しコマンドを"run"としました。
CSVファイルの2カラム目に、テストランナーが呼び出す外部プログラムと引数を記述します。
テストランナーの動作確認のためfailするものも作成します。

helloworld_pass.csv
run,python helloworld.py 0 expected=pass
helloworld_fail.csv
run,python helloworld.py 1 expected=fail

###2.3 テストランナーの改修
テストスクリプトの1カラム目が"run"の場合に、2カラム目をsubprocess.run()へ渡します。また、returncodeを見てPass/Fail判定も行います。

test-runner.py
import subprocess
from subprocess import PIPE

def main():

                elif cmd[0] == "run":
                    ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                    val = ret.stdout.strip()
                    print(ret)
                    if ret.returncode != 0:
                        is_passed = False

##3. 実行結果
実行環境はWindows10 64bit(1909)+Anaconda Python 3.6.9です。

runコマンドの引数で与えたプログラムが実行され、終了ステータスに応じてPass/Fail判定できました。

>python test-runner.py helloworld_pass
['run', 'python helloworld.py 0 expected=pass']
CompletedProcess(args='python helloworld.py 0 expected=pass', returncode=0, stdout='Hello, World.\nexpected=pass\n', stderr='')
['2020/11/29 01:28:49', 'run', 'python helloworld.py 0 expected=pass', 'OK']
PASS
>python test-runner.py helloworld_fail
['run', 'python helloworld.py 1 expected=fail']
CompletedProcess(args='python helloworld.py 1 expected=fail', returncode=1, stdout='Hello, World.\nexpected=fail\n', stderr='')
['2020/11/29 01:29:52', 'run', 'python helloworld.py 1 expected=fail', 'NG']
FAIL

##付録A. test-runner.pyのソース

test-runner.py
#!/usr/bin/python3

#
# This software includes the work that is distributed
# in the Apache License 2.0
#

from time import sleep
import sys
import codecs
import csv
import datetime
import serial
import pyvisa as visa
import cv2
from PIL import Image
import pyocr
import pyocr.builders
import platform
import subprocess
from subprocess import PIPE

UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6

def serial_write(h, string):
    if h != UNINITIALIZED:
        string = string + '\n'
        string = str.encode(string)
        h.write(string)
        return True
    else:
        print("UART Not Initialized.")
        return False

def close_uart(h):
    if h != UNINITIALIZED:
        h.close()
    else:
        #print("UART Not Initialized.")
        pass

def open_dso():
    rm = visa.ResourceManager()
    resources = rm.list_resources()
    #print(resources)
    for resource in resources:
        #print(resource)
        try:
            dso = rm.open_resource(resource)
        except:
            print(resource, "Not Found.")
        else:
            print(resource, "Detected.")
            return dso

    #Throw an error to caller if none succeed.
    return dso

def open_cam(camera_number, width, height):
    h = cv2.VideoCapture(camera_number)
    h.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    h.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    return h

def close_cam(cam):
    if cam != UNINITIALIZED:
        cam.release()

def capture_cam(cam, filename):
    if cam != UNINITIALIZED:
        _, img = cam.read()
        cv2.imwrite(filename, img)
        return True
    else:
        print("CAM Not Ready.")
        return False

def crop_img(filename_in, v, h, filename_out):
    img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
    v0 = int(v.split(':')[0])
    v1 = int(v.split(':')[1])
    h0 = int(h.split(':')[0])
    h1 = int(h.split(':')[1])
    img2 = img[v0:v1, h0:h1]
    cv2.imwrite(filename_out, img2)
    return True

def open_ocr():
    ocr = pyocr.get_available_tools()
    if len(ocr) != 0:
        ocr = ocr[0]
    else:
        ocr = UNINITIALIZED
        print("OCR Not Ready.")
    return ocr

def exec_ocr(ocr, filename):
    try:
        txt = ocr.image_to_string(
            Image.open(filename),
            lang = "eng",
            builder = pyocr.builders.TextBuilder()
        )
    except:
        print("OCR Fail.")
    else:
        return txt

def exec_labelimg(filename, label_string):
    if platform.system() == "Windows" :
        python = "python"
        grep = "findstr"
    else:
        python = "python3"
        grep = "grep"

    cmd = python + \
          " label_image.py \
          --graph=c:\\tmp\\output_graph.pb \
          --labels=c:\\tmp\\output_labels.txt \
          --input_layer=Placeholder \
          --output_layer=final_result \
          --image=" + filename \
          + "|" + grep + " " + label_string
    print(cmd)
    log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
    ret = log.stdout.strip().decode("utf-8").split(" ")[1]
    return ret

def set_servo(uart, servo_id, servo_pos):
    if servo_id < 0 or servo_id >= NUM_OF_SERVO:
        print("Invalid Servo ID")
        return False
    
    if servo_pos < 0 or servo_pos > 180:
        print("Invalid Servo Position")
        return False
    
    if uart != UNINITIALIZED:
        serial_write(uart, "servoread")
        
        servo_position = uart.readline().strip().decode('utf-8')
        print(servo_position)
        
        # discard "OK"
        devnull = uart.readline().strip().decode('utf-8')
        
        current_pos = int(servo_position.split(' ')[servo_id])
        #print(servo_id, current_pos)
        
        if current_pos < servo_pos:
            start = current_pos +1
            stop  = servo_pos +1
            step  = 1
        else:
            start = current_pos -1
            stop  = servo_pos -1
            step  = -1
            
        for i in range(start, stop, step):
            command = "servo " + str(servo_id) + " " + str(i)
            print(command)
            serial_write(uart, command)
            # discard "OK"
            devnull = uart.readline().strip().decode('utf-8')
            sleep(0.2) # sec.
            
        return True
    else:
        print("UART Not Initialized.")
        return False

def main():
    is_passed = True
    val = str(UNINITIALIZED)
    uart = UNINITIALIZED
    dso = UNINITIALIZED
    cam = UNINITIALIZED
    ocr = UNINITIALIZED

    if len(sys.argv) == 2:
        script_file_name = sys.argv[1] + ".csv"
        result_file_name = sys.argv[1] + "_result.csv"
    else:
        script_file_name = "script.csv"
        result_file_name = "result.csv"

    with codecs.open(script_file_name, 'r', 'utf-8') as file:
        script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')

        with codecs.open(result_file_name, 'w', 'utf-8') as file:
            result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')

            for cmd in script:
                timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
                print(cmd)

                if "#" in cmd[0]:
                    pass

                elif cmd[0] == "sleep":
                    sleep(float(cmd[1]))

                elif cmd[0] == "open_uart":
                    if len(cmd) == 2:
                        dsrdtr_val = 1
                    else:
                        dsrdtr_val = int(cmd[2])
                    try:
                        uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
                    except:
                        is_passed = False

                elif cmd[0] == "send":
                    ret = serial_write(uart, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rcvd":
                    try:
                        val = uart.readline().strip().decode('utf-8')
                        cmd.append(val)
                    except:
                        is_passed = False

                elif cmd[0] == "open_dso":
                    try:
                        dso = open_dso()
                    except:
                        is_passed = False

                elif cmd[0] == "dso":
                    try:
                        if "?" in cmd[1]:
                            val = dso.query(cmd[1]).rstrip().replace(",", "-")
                            cmd.append(val)
                        else:
                            dso.write(cmd[1])
                    except:
                        is_passed = False

                elif cmd[0] == "open_cam":
                    if len(cmd) == 2:
                        cam = open_cam(int(cmd[1]), 640, 480)
                    else:
                        cam = open_cam(int(cmd[1]), int(cmd[2]), int(cmd[3]))

                elif cmd[0] == "close_cam":
                    close_cam(cam)
                    cam = UNINITIALIZED

                elif cmd[0] == "capture_cam":
                    ret = capture_cam(cam, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "crop_img":
                    crop_img(cmd[1], cmd[2], cmd[3], cmd[4])

                elif cmd[0] == "open_ocr":
                    ocr = open_ocr()
                    if ocr == UNINITIALIZED:
                        is_passed = False

                elif cmd[0] == "exec_ocr":
                    try:
                        val = exec_ocr(ocr, cmd[1])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "exec_labelimg":
                    try:
                        val = exec_labelimg(cmd[1], cmd[2])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "set_servo":
                    ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "run":
                    ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                    val = ret.stdout.strip()
                    print(ret)
                    if ret.returncode != 0:
                        is_passed = False

                elif cmd[0] == "eval_str_eq":
                    if str(val) != str(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_eq":
                    if int(val) != int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_gt":
                    if int(val) < int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_lt":
                    if int(val) > int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_eq":
                    if float(val) != float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_gt":
                    if float(val) < float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_lt":
                    if float(val) > float(cmd[1]):
                        is_passed = False

                else:
                    cmd.append("#")

                if is_passed == True:
                    cmd.append("OK")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                else:
                    cmd.append("NG")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                    close_uart(uart)
                    close_cam(cam)
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        close_uart(uart)
        close_cam(cam)
        print("PASS")
        sys.exit(0)

main()

##付録B. Lチカで始めるテスト自動化・記事一覧

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
  3. Lチカで始めるテスト自動化(3)オシロスコープの組込み
  4. Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
  5. Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
  6. Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
  7. Lチカで始めるテスト自動化(7)タイムスタンプの保存
  8. Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
  9. Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
  10. Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
  11. Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
  12. Lチカで始めるテスト自動化(12)書籍化の作業メモ

Lチカで始めるテスト自動化1~11を電子書籍化し技術書典で頒布しています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?