0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する

Posted at

##1. はじめに
秋月Raspberry Pi Zero WH用IoT学習HATキットに8文字x2行のLCDや緑・黄・赤のLEDが載っているのを見て、Raspberry Pi Zero WHでテストランナーを動かしてLCDやアンドンで進捗状況がわかると便利かもと思い作ってみました。
overview.jpg

  • 4行のテストスクリプトを流し、成功して終了する例
    pass.gif

  • 4行のテストスクリプトを流し、2行目で失敗して終了する例
    fail.gif

##2. AQM0802A LCDドライバ
Yuta KitagamiさんがMITライセンスで公開されているRaspberry Pi用のAKI_I2C_AQM0802A.pyを利用させていただき1、python3.7.32で動くように下記2点修正します。

  • print文のシンタックス
  • IOError→OSError
    • python3.3でIOErrorはOSErrorに統合されました。
AKI_I2C_AQM0802A.py
############################################################
#The MIT License (MIT)
#Copyright (c) 2015 Yuta Kitagami
#Project:    https://github.com/nonNoise/akilib
#
#Modified by pbjpkas for python3 support, 2021
#
############################################################

import smbus
import time

I2C_ADDR = 0x3e
class AKI_I2C_AQM0802A:
    def __init__(self):
        print ("AKI_I2C_AQM0802A")
        self.i2c = smbus.SMBus(1)
    def i2cReg(self,wr,addr,data):
        try:
            if(wr == "w"):
                return self.i2c.write_byte_data(I2C_ADDR,addr,data)
            elif(wr == "r"):
                return self.i2c.read_byte(I2C_ADDR,addr)
            else :
                return -1
        except OSError(errno):
            print ("No ACK!")
            exit()
    def Init_LCD(self):
        # "Function set"
        self.i2cReg("w",0x00,0x38)
        time.sleep(0.10)
        # "Function set"
        self.i2cReg("w",0x00,0x39)
        time.sleep(0.10)
        # "Internal OSC frequency"
        self.i2cReg("w",0x00,0x14)
        time.sleep(0.10)
        # "Contrast Set"
        self.i2cReg("w",0x00,0x70)
        time.sleep(0.10)
        # "Power/ICON Set"
        self.i2cReg("w",0x00,0x56)
        time.sleep(0.10)
        # "Follower control"
        self.i2cReg("w",0x00,0x6c)
        time.sleep(0.10)
        # "Function set"
        self.i2cReg("w",0x00,0x38)
        time.sleep(0.10)
        # "Display ON/OFF"
        self.i2cReg("w",0x00,0x0c)
        time.sleep(0.10)


    def ClearDisplay(self):
        # "Clear Display"
        self.i2cReg("w",0x00,0x01)
        time.sleep(0.1)

    def WritePos(self,a,b):        
        if a ==0:
            self.i2cReg("w",0x00,0x80|b)
        elif a ==1:
            self.i2cReg("w",0x00,0x80|0x40|b)
        #time.sleep(0.3)

    def WriteChar(self,c):
        self.i2cReg("w",0x40,c )
        time.sleep(0.3)

    def WriteStr(self,s,t=0):
        #print s
        for c in s :        
            #print ord(c)
            self.i2cReg("w",0x40,ord(c))
            time.sleep(t)

    def NewClearDisplay(self,pos,time):
        self.WritePos(pos,0)
        self.WriteStr("                ",time)
        # "Clear Display"

##3. テストランナーの改修
一つ前の記事 "Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する" の付録A. test-runner.pyのソースに下記4点の改修を行います。差分を示します。全部入りソースは付録Aをご覧ください。

  1. LCDドライバモジュールおよびGPIOモジュールのインポート
  2. LED点灯関数やLCD表示関数の追加
  3. テストスクリプトの総行数や実行中の行番号を取得しLCDに描画
  4. テスト結果のpass/failに応じてLEDを点灯

###3.1 LCDドライバモジュールおよびGPIOモジュールのインポート
LCDドライバモジュールおよびGPIOモジュールをインポートします。併せてRaspberry Pi Zero WHにインストールしていないモジュールを適宜コメントアウトします。

14,20c14,20
< import serial
< import pyvisa as visa
< import cv2
< from PIL import Image
< import pyocr
< import pyocr.builders
< import platform
---
> #import serial
> #import pyvisa as visa
> #import cv2
> #from PIL import Image
> #import pyocr
> #import pyocr.builders
> #import platform
22a23,24
> from AKI_I2C_AQM0802A import AKI_I2C_AQM0802A
> import RPi.GPIO as GPIO

###3.2 LED点灯関数やLCD表示関数の追加
LEDを接続しているGPIOの番号を定義します。緑:LED_GR、黄:LED_Y、赤:LED_R、液晶のバックライトLED:LED_LCD_BACKLIGHTです。

26a29,34
> # Akizuki IoT Learning HAT GPIO Number
> LED_GR = 22
> LED_Y = 27
> LED_R = 17
> LED_LCD_BACKLIGHT = 26
> 

LED点灯関数やLCD表示関数を追加します。液晶のバックライトはled_init()で点灯し、以後消灯しません。緑、黄、赤のLEDは以下の表に従います。

|進捗状況|実行中|成功して終了|失敗して終了|
|--------+------+------------+------------|
|緑 |off |on |off |
|黄 |on |off |off |
|赤 |off |off |on |

174a183,219
> def led_init():
>     GPIO.setmode(GPIO.BCM)
> 
>     GPIO.setwarnings(False)
>     GPIO.setup(LED_GR, GPIO.OUT)
>     GPIO.setup(LED_Y, GPIO.OUT)
>     GPIO.setup(LED_R, GPIO.OUT)
>     GPIO.setup(LED_LCD_BACKLIGHT, GPIO.OUT)
> 
>     GPIO.output(LED_GR, False)
>     GPIO.output(LED_Y, True)
>     GPIO.output(LED_R, False)
>     GPIO.output(LED_LCD_BACKLIGHT, True)
> 
> def led_pass():
>     GPIO.output(LED_GR, True)
>     GPIO.output(LED_Y, False)
>     GPIO.output(LED_R, False)
> 
> def led_fail():
>     GPIO.output(LED_GR, False)
>     GPIO.output(LED_Y, False)
>     GPIO.output(LED_R, True)
> 
> def lcd_init():
>     lcd = AKI_I2C_AQM0802A()
>     lcd.Init_LCD()
>     lcd.ClearDisplay()
>     lcd.NewClearDisplay(0, 0)
>     lcd.NewClearDisplay(1, 0)
>     return lcd
> 
> def lcd_str(lcd, row, col, str):
>     lcd.NewClearDisplay(row, 0)
>     lcd.WritePos(row, col)
>     lcd.WriteStr(str)
> 

###3.3 テストスクリプトの総行数や実行中の行番号を取得しLCDに表示
main()の冒頭でled_init()とlcd_init()を実行します。

182a228,229
>     led_init()
>     lcd = lcd_init()

wc -lコマンドでテストスクリプトの総行数をカウントしlcd_str()でLCDの1行目に表示します。

190a238,240
>     script_total_line = subprocess.check_output(['wc', '-l', script_file_name]).decode().split(' ')[0]
>     lcd_str(lcd, 0, 0, script_total_line)
> 

実行中の行番号を変数script_lineへ代入しlcd_str()でLCDの2行目に表示します。

196a247
>             script_line = 0
198a250,252
>                 script_line += 1
>                 lcd_str(lcd, 1, 0, str(script_line))
>                 print("########## " + str(script_line) + " ##########")

###3.4 テスト結果のpass/failに応じてLEDを点灯
failで終了する箇所にled_fail()を、passで終了する箇所にled_pass()を追加します。

343a398
>                     led_fail()
349a405
>         led_pass()

##4. おわりに

  • 意図的にfailさせているとはいえアンドンが赤く光るとドキッとしますね。
  • IoT学習HATキットのBME280接続ソケットにオプションのBME280使用 温湿度・気圧センサモジュールキットを接続してテスト実行環境をモニタリングするのも面白そうと思いました。

##付録A. test-runner.pyのソース

test-runner.py
#!/usr/bin/python3

#
# This software includes the work that is distributed
# in the Apache License 2.0
#

from time import sleep
import random
import sys
import codecs
import csv
import datetime
#import serial
#import pyvisa as visa
#import cv2
#from PIL import Image
#import pyocr
#import pyocr.builders
#import platform
import subprocess
from subprocess import PIPE
from AKI_I2C_AQM0802A import AKI_I2C_AQM0802A
import RPi.GPIO as GPIO

UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6

# Akizuki IoT Learning HAT GPIO Number
LED_R = 17
LED_Y = 27
LED_GR = 22
LED_LCD_BACKLIGHT = 26

def serial_write(h, string):
    if h != UNINITIALIZED:
        string = string + '\n'
        string = str.encode(string)
        h.write(string)
        return True
    else:
        print("UART Not Initialized.")
        return False

def close_uart(h):
    if h != UNINITIALIZED:
        h.close()
    else:
        #print("UART Not Initialized.")
        pass

def open_dso():
    rm = visa.ResourceManager()
    resources = rm.list_resources()
    #print(resources)
    for resource in resources:
        #print(resource)
        try:
            dso = rm.open_resource(resource)
        except:
            print(resource, "Not Found.")
        else:
            print(resource, "Detected.")
            return dso

    #Throw an error to caller if none succeed.
    return dso

def open_cam(camera_number, width, height):
    h = cv2.VideoCapture(camera_number)
    h.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    h.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    return h

def close_cam(cam):
    if cam != UNINITIALIZED:
        cam.release()

def capture_cam(cam, filename):
    if cam != UNINITIALIZED:
        _, img = cam.read()
        cv2.imwrite(filename, img)
        return True
    else:
        print("CAM Not Ready.")
        return False

def crop_img(filename_in, v, h, filename_out):
    img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
    v0 = int(v.split(':')[0])
    v1 = int(v.split(':')[1])
    h0 = int(h.split(':')[0])
    h1 = int(h.split(':')[1])
    img2 = img[v0:v1, h0:h1]
    cv2.imwrite(filename_out, img2)
    return True

def open_ocr():
    ocr = pyocr.get_available_tools()
    if len(ocr) != 0:
        ocr = ocr[0]
    else:
        ocr = UNINITIALIZED
        print("OCR Not Ready.")
    return ocr

def exec_ocr(ocr, filename):
    try:
        txt = ocr.image_to_string(
            Image.open(filename),
            lang = "eng",
            builder = pyocr.builders.TextBuilder()
        )
    except:
        print("OCR Fail.")
    else:
        return txt

def exec_labelimg(filename, label_string):
    if platform.system() == "Windows" :
        python = "python"
        grep = "findstr"
    else:
        python = "python3"
        grep = "grep"

    cmd = python + \
          " label_image.py \
          --graph=c:\\tmp\\output_graph.pb \
          --labels=c:\\tmp\\output_labels.txt \
          --input_layer=Placeholder \
          --output_layer=final_result \
          --image=" + filename \
          + "|" + grep + " " + label_string
    print(cmd)
    log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
    ret = log.stdout.strip().decode("utf-8").split(" ")[1]
    return ret

def set_servo(uart, servo_id, servo_pos):
    if servo_id < 0 or servo_id >= NUM_OF_SERVO:
        print("Invalid Servo ID")
        return False

    if servo_pos < 0 or servo_pos > 180:
        print("Invalid Servo Position")
        return False

    if uart != UNINITIALIZED:
        serial_write(uart, "servoread")

        servo_position = uart.readline().strip().decode('utf-8')
        print(servo_position)

        # discard "OK"
        devnull = uart.readline().strip().decode('utf-8')

        current_pos = int(servo_position.split(' ')[servo_id])
        #print(servo_id, current_pos)

        if current_pos < servo_pos:
            start = current_pos +1
            stop  = servo_pos +1
            step  = 1
        else:
            start = current_pos -1
            stop  = servo_pos -1
            step  = -1

        for i in range(start, stop, step):
            command = "servo " + str(servo_id) + " " + str(i)
            print(command)
            serial_write(uart, command)
            # discard "OK"
            devnull = uart.readline().strip().decode('utf-8')
            sleep(0.2) # sec.

        return True
    else:
        print("UART Not Initialized.")
        return False

def led_init():
    GPIO.setmode(GPIO.BCM)

    GPIO.setwarnings(False)
    GPIO.setup(LED_R, GPIO.OUT)
    GPIO.setup(LED_Y, GPIO.OUT)
    GPIO.setup(LED_GR, GPIO.OUT)
    GPIO.setup(LED_LCD_BACKLIGHT, GPIO.OUT)

    GPIO.output(LED_R, False)
    GPIO.output(LED_Y, True)
    GPIO.output(LED_GR, False)
    GPIO.output(LED_LCD_BACKLIGHT, True)

def led_pass():
    GPIO.output(LED_R, False)
    GPIO.output(LED_Y, False)
    GPIO.output(LED_GR, True)

def led_fail():
    GPIO.output(LED_R, True)
    GPIO.output(LED_Y, False)
    GPIO.output(LED_GR, False)

def lcd_init():
    lcd = AKI_I2C_AQM0802A()
    lcd.Init_LCD()
    lcd.ClearDisplay()
    lcd.NewClearDisplay(0, 0)
    lcd.NewClearDisplay(1, 0)
    return lcd

def lcd_str(lcd, row, col, str):
    lcd.NewClearDisplay(row, 0)
    lcd.WritePos(row, col)
    lcd.WriteStr(str)

def main():
    is_passed = True
    val = str(UNINITIALIZED)
    fval = 0.0
    uart = UNINITIALIZED
    dso = UNINITIALIZED
    cam = UNINITIALIZED
    ocr = UNINITIALIZED
    led_init()
    lcd = lcd_init()

    if len(sys.argv) == 2:
        script_file_name = sys.argv[1] + ".csv"
        result_file_name = sys.argv[1] + "_result.csv"
    else:
        script_file_name = "script.csv"
        result_file_name = "result.csv"

    script_total_line = subprocess.check_output(['wc', '-l', script_file_name]).decode().split(' ')[0]
    lcd_str(lcd, 0, 0, script_total_line)

    with codecs.open(script_file_name, 'r', 'utf-8') as file:
        script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')

        with codecs.open(result_file_name, 'w', 'utf-8') as file:
            result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')

            script_line = 0
            for cmd in script:
                timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
                script_line += 1
                lcd_str(lcd, 1, 0, str(script_line))
                print("########## " + str(script_line) + " ##########")
                print(cmd)

                if "#" in cmd[0]:
                    pass

                elif cmd[0] == "sleep":
                    sleep(float(cmd[1]))

                elif cmd[0] == "rusleep":
                    fval = random.uniform(float(cmd[1]), float(cmd[2]))
                    cmd.append(str(fval))
                    sleep(fval)

                elif cmd[0] == "open_uart":
                    if len(cmd) == 2:
                        dsrdtr_val = 1
                    else:
                        dsrdtr_val = int(cmd[2])
                    try:
                        uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
                    except:
                        is_passed = False

                elif cmd[0] == "send":
                    ret = serial_write(uart, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rcvd":
                    try:
                        val = uart.readline().strip().decode('utf-8')
                        cmd.append(val)
                    except:
                        is_passed = False

                elif cmd[0] == "open_dso":
                    try:
                        dso = open_dso()
                    except:
                        is_passed = False

                elif cmd[0] == "dso":
                    try:
                        if "?" in cmd[1]:
                            val = dso.query(cmd[1]).rstrip().replace(",", "-")
                            cmd.append(val)
                        else:
                            dso.write(cmd[1])
                    except:
                        is_passed = False

                elif cmd[0] == "open_cam":
                    if len(cmd) == 2:
                        cam = open_cam(int(cmd[1]), 640, 480)
                    else:
                        cam = open_cam(int(cmd[1]), int(cmd[2]), int(cmd[3]))

                elif cmd[0] == "close_cam":
                    close_cam(cam)
                    cam = UNINITIALIZED

                elif cmd[0] == "capture_cam":
                    ret = capture_cam(cam, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "crop_img":
                    crop_img(cmd[1], cmd[2], cmd[3], cmd[4])

                elif cmd[0] == "open_ocr":
                    ocr = open_ocr()
                    if ocr == UNINITIALIZED:
                        is_passed = False

                elif cmd[0] == "exec_ocr":
                    try:
                        val = exec_ocr(ocr, cmd[1])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "exec_labelimg":
                    try:
                        val = exec_labelimg(cmd[1], cmd[2])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "set_servo":
                    ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "run":
                    ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                    val = ret.stdout.strip()
                    print(ret)
                    if ret.returncode != 0:
                        is_passed = False

                elif cmd[0] == "eval_str_eq":
                    if str(val) != str(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_eq":
                    if int(val) != int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_gt":
                    if int(val) < int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_lt":
                    if int(val) > int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_eq":
                    if float(val) != float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_gt":
                    if float(val) < float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_lt":
                    if float(val) > float(cmd[1]):
                        is_passed = False

                else:
                    cmd.append("#")

                if is_passed == True:
                    cmd.append("OK")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                else:
                    cmd.append("NG")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                    close_uart(uart)
                    close_cam(cam)
                    led_fail()
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        close_uart(uart)
        close_cam(cam)
        led_pass()
        print("PASS")
        sys.exit(0)

main()

##付録B. Raspberry Pi Zero WをJenkinsのエージェントとして利用する
Raspbian GNU/Linux 10 (buster)のJavaをJava 8に入れ替えてRaspberry Pi Zero WをJenkinsのエージェントとして利用するをご覧ください。

##付録C. Lチカで始めるテスト自動化・記事一覧
電子書籍化したものを技術書典で頒布しています。

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
  3. Lチカで始めるテスト自動化(3)オシロスコープの組込み
  4. Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
  5. Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
  6. Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
  7. Lチカで始めるテスト自動化(7)タイムスタンプの保存
  8. Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
  9. Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
  10. Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
  11. Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
  12. Lチカで始めるテスト自動化(12)書籍化の作業メモ
  13. Lチカで始めるテスト自動化(13)外部プログラムの呼出し
  14. Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
  1. 秋月のサンプルプログラムをpython3に対応させる方法もあるのですがライセンスが不明なため、MITライセンスで公開されているプログラムを利用させていただきました。

  2. Raspbian GNU/Linux 10 (buster)に組込まれているpython3のバージョンです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?