##1. はじめに
Lチカで始めるテスト自動化シリーズ第六弾です。
第五弾でWebカメラの画像キャプチャができるようになりました。そこで今回は組込み機器(IoTデバイス)の液晶画面に所定の文字列(1234およびabcd)を表示し、Webカメラで画像をキャプチャしてTensorFlowの識別器にかけ、識別器のスコアが期待値を超えているか否かでPass/Fail判定をします。
- テストベンチの製作
- 識別器の製作
- テストランナーの改修
AI部分は「Tensorflowの画像分類でハムケツを判定する」を参考にさせていただき、識別器の製作はretrain.pyで、画像の識別はlabel_image.pyで行っています。
これまでの記事はこちらをご覧ください。
- Lチカで始めるテスト自動化
- Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
- Lチカで始めるテスト自動化(3)オシロスコープの組込み
- Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
- Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
##2. テストベンチの製作
###2.1 テストベンチ全景
テストベンチを以下に示します。タミヤのユニバーサルプレートL(210x160mm)にM5StackとロジクールのWebカメラC270を固定しています。
###2.2 テスト対象の組込み機器(IoTデバイス)
テスト対象の組込み機器(IoTデバイス)としてM5Stackを使用します。M5Stackの背面に厚手の両面テープを貼って固定しています。M5Stackのソースは付録A.1をご覧ください。
- 起動すると液晶画面を白で塗りつぶします
- 通信速度は115200bpsで、筆者の環境ではCOM6で認識されています
- UART通信で任意の文字データを受信するとコマンド制御モードに遷移します
|おもなコマンド |機能 |
|--------------------+--------------------------------------|
|m5lcd clear |画面をクリア(白で全画面描画)
|m5lcd str <strings> |stringsで与えられた文字列を画面に描画
###2.3 Webカメラ
Webカメラは数センチの距離でピントが合うように分解してピントを調整しています。また、丸いシールを油性ペンで黒く塗ったものをインジケータのLEDに貼り付けています(レンズ左側)。
Webカメラの画像の例を以下に示します。
###2.4 テスト実行環境
Windows10(1909)で実行しています。PythonやTensorFlowのバージョンは以下の通りです。
>conda list|findstr python
ipython 7.15.0 py36_0
ipython_genutils 0.2.0 py36_0
mecab-python-windows 0.996.1 pypi_0 pypi
python 3.6.9 h5500b2f_0
python-dateutil 2.8.1 py_0
>conda list|findstr tensorflow
tensorflow 1.14.0 mkl_py36hb88db5b_0
tensorflow-base 1.14.0 mkl_py36ha978198_0
tensorflow-estimator 1.14.0 py_0
tensorflow-hub 0.8.0 pyhe6710b0_0
##3. 識別器の製作
識別器はTensorFlowのretrain.pyコマンドで学習させて作ります。
- 教師データ作成用テストスクリプトを実行する
- その様子をWebカメラで撮影し動画ファイルに保存する
- 動画ファイルから1フレームずつ文字列の描画されているエリアを切り出す
- 画像ごとにフォルダに分けて整理する
- retrain.pyで学習する
###3.1 教師データ作成用テストスクリプト
30fpsで動画を撮影すると1秒当たり30枚の画像が手に入ります。液晶画面に文字列を表示したあと4秒のsleepを入れて120フレームぶんの画像を確保します。
#
# the 3rd parameter is used for serial.Serial() dsrdtr
open_uart,COM6,0
#
# M5Stack cmd mode
send,x
#
# string 1234 on TFT display
send,m5lcd clear
send,m5lcd str 1234
sleep,4
#
# string abcd on TFT display
send,m5lcd clear
send,m5lcd str abcd
sleep,4
###3.2 Webカメラで撮影し動画ファイルに保存する
Python/OpenCVでWebカメラ!撮影した動画を保存するで紹介されているスクリプトを使わせていただきました。
###3.3 動画ファイルから1フレームずつ文字列の描画されているエリアを切り出す
Python, OpenCVで動画ファイルからフレームを切り出して保存で紹介されているopencv_video_to_still.py (MIT License、Copyright 2019 nkmk.me) を使わせていただきました。
変更箇所は下記2点です。
- 文字列が描画されているエリア(210:280, 240:400)を切り出すように変更
- 動画のファイル名や切り出した画像の出力先フォルダ名を筆者の環境に合わせる
import cv2
import os
def save_all_frames(video_path, dir_path, basename, ext='jpg'):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return
os.makedirs(dir_path, exist_ok=True)
base_path = os.path.join(dir_path, basename)
digit = len(str(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))))
n = 0
while True:
ret, frame = cap.read()
if ret:
frame2 = frame[210:280,240:400]
cv2.imwrite('{}_{}.{}'.format(base_path, str(n).zfill(digit), ext), frame2)
n += 1
else:
return
save_all_frames('video.mp4', 'work', 'video_img')
###3.4 画像ごとにフォルダに分けて整理する
人力で整理します。
####3.4.1 photos_160x70フォルダを作成する
- retrain.pyで指定するフォルダ名です
- フォルダ名は任意の名前でOKです
####3.4.2 photos_160x70フォルダにstr1234、strabcd、white、restartの4つのフォルダを作成する
- このフォルダ名はlabel_image.pyのラベルとして利用されます
- フォルダ名に"_"を使うと"_"を半角スペースに置き換えたものがラベルになるようで、それだと4章のテストランナーの改修に困るため"_"は使用を避けます
####3.4.3 切り出した画像ファイルを各フォルダへ移動する
|フォルダ|str1234|strabcd|white|restart|
|-------+-------+-------+-----+-------|
|画像の例||||
###3.5 retrain.pyで学習する
python retrain.py --image_dir photos_160x70
教師データの作り直しや追加などで2回以上行うときは、C:\tmpフォルダをリネームまたは削除してから実行します。
##4. テストランナーの改修
- label_image.pyをtest-runner.pyと同じフォルダに配置します
- 識別器に画像を与えてスコアを取得する関数(exec_labelimg())を追加します
- subprocessでlabel_image.pyに画像ファイルの名前を与えて実行する
- 標準出力のlogからfindstr(またはgrep)で所望のラベルの行を取得する
- 取得した行からスコアを切り出してreturnする
- exec_labelimg()の呼び出し側はreturnされた値を変数valに格納する
- M5StackとArduinoのどちらともUARTで通信できるようdsrdtrパラメータをテストスクリプトで指定できるようにします
- dsrdtrの値は、Arduinoは1、M5Stackは0を指定する
|コマンド |引数 |機能 |
|-------------+------------------------------------------------------------------+------------------------------------|
|exec_labelimg|・識別したい画像ファイルのファイル名
・label_image.pyのラベル名|識別器に画像を与えてスコアを取得する|
|open_uart |・テスト治具のポート番号
・dsrdtrのパラメータ(0または1) |指定されたポート番号のUARTをopenする|
import platform
import subprocess
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + " label_image.py --graph=c:\\tmp\\output_graph.pb --labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder --output_layer=final_result --image=" + filename + \
"|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def main():
(略)
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
(略)
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
##5. テスト実行結果
###5.1 テストスクリプト
-
open_camコマンド実行後1回目のcapture_camコマンド実行でNortonのダイアログがポップアップすることと、期待した画像をキャプチャできないことが度々発生するため、冒頭でわざとcapture_camコマンドを発行しています。
-
label_image.pyのスコアが0.9を超えたらPassと判定します。
#
# cam(0): external web camera
open_cam,0
capture_cam,start.jpg
#
# COM6 : M5Stack
open_uart,COM6,0
#
# M5Stack cmd mode
send,x
#
# string 1234 on TFT display
send,m5lcd clear
send,m5lcd str 1234
#
# capture and crop
sleep,1
capture_cam,str_1234.jpg
crop_img,str_1234.jpg,210:280,240:400,str_1234_crop.jpg
#
# tensorflow label image and evaluation
exec_labelimg,str_1234_crop.jpg,str1234
eval_dbl_gt,0.90
sleep,3
#
# string abcd on TFT display
send,m5lcd clear
send,m5lcd str abcd
#
# capture and crop
sleep,1
capture_cam,str_abcd.jpg
crop_img,str_abcd.jpg,210:280,240:400,str_abcd_crop.jpg
#
# tensorflow label image and evaluation
exec_labelimg,str_abcd_crop.jpg,strabcd
eval_dbl_gt,0.90
sleep,3
###5.2 テスト結果
文字列1234の画像のスコアは0.99075866、文字列abcdの画像のスコアは0.94599265で共に0.9を超え、自動テストをPassしました。
#,OK
# cam(0): external web camera,OK
open_cam,0,OK
capture_cam,start.jpg,OK
#,OK
# COM6 : M5Stack,OK
open_uart,COM6,0,OK
#,OK
# M5Stack cmd mode,OK
send,x,OK
#,OK
# string 1234 on TFT display,OK
send,m5lcd clear,OK
send,m5lcd str 1234,OK
#,OK
# capture and crop,OK
sleep,1,OK
capture_cam,str_1234.jpg,OK
crop_img,str_1234.jpg,210:280,240:400,str_1234_crop.jpg,OK
#,OK
# tensorflow label image and evaluation,OK
exec_labelimg,str_1234_crop.jpg,str1234,0.99075866,OK
eval_dbl_gt,0.90,OK
sleep,3,OK
#,OK
# string abcd on TFT display,OK
send,m5lcd clear,OK
send,m5lcd str abcd,OK
#,OK
# capture and crop,OK
sleep,1,OK
capture_cam,str_abcd.jpg,OK
crop_img,str_abcd.jpg,210:280,240:400,str_abcd_crop.jpg,OK
#,OK
# tensorflow label image and evaluation,OK
exec_labelimg,str_abcd_crop.jpg,strabcd,0.94599265,OK
eval_dbl_gt,0.90,OK
sleep,3,OK
####5.2.1 label_image.pyのスコア
参考までにstr_1234_crop.jpgとstr_abcd_crop.jpgのスコアを以下に示します。
str1234 0.99075866
strabcd 0.0063487873
white 0.002393859
restart 0.00049863075
strabcd 0.94599265
str1234 0.049787287
white 0.0028259188
restart 0.0013940954
##6. おわりに
- テスト実行結果の判定をAI(機械学習)を使ってできるようになりました。
- AI(機械学習)部分の改善は追々。
##A. 付録
###A.1 M5Stackのソース
/***********************************************************************
* MyM5Stack 010
* 2018-07-01 by ka's
*
* 2018-07-04 import Command Line Interface from arduinoLeonardoToolsV2
* https://qiita.com/pbjpkas/items/97dbf835b0aab6725e94
*
* 2018-07-16 add PCF8563 RTC Module and wire read/write
* https://qiita.com/pbjpkas/items/cc3bf43a5f9fd2c3415d
*
* 2018-07-22 add M5.Lcd.* Command Operation Function
* https://qiita.com/pbjpkas/items/62f625e21312560c26ce
*
* 2020-06-21 modify for support "m5lcd clear" and "m5lcd str" only.
*
***********************************************************************/
/***********************************************************************
* Copyright 2018 ka's@pbjpkas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***********************************************************************/
#include <M5Stack.h>
void setup();
void loop();
#define ERR_OK 0
#define ERR_INVALID -1 // 不正
#define ERR_NULL -2 // 引数がNULL
#define ERR_MALLOC -3 // mallocの戻り値がNULL
/***********************************************************************
Function Prototype : m5lcd
***********************************************************************/
void m5lcd_clear();
void m5lcd_str(char *str);
/***********************************************************************
Function Prototype : Command Mode
***********************************************************************/
#define CMD_QUIT 1
#define CMD_OK ERR_OK
#define CMD_BUF_LENGTH 64 // 63+1
#define CMD_MAX_LENGTH 64 // 63+1
#define ARG_MAX_LENGTH 64 // 63+1
void cmd_print_help(void);
void cmd_print_ver(void);
int cmd_execute(char *buf);
void cmd_rx_data(void);
/***********************************************************************
Function : setup and loop
***********************************************************************/
// the setup routine runs once when M5Stack starts up
void setup()
{
// Initialize the M5Stack object
M5.begin();
M5.Speaker.setVolume(0);
Serial.begin(115200);
m5lcd_clear();
}
// the loop routine runs over and over again forever
void loop()
{
if(Serial.available())
{
cmd_rx_data();
}
}
/***********************************************************************
Function : m5lcd
***********************************************************************/
void m5lcd_clear()
{
M5.Lcd.fillScreen(TFT_WHITE);
}
void m5lcd_str(char *str)
{
M5.Lcd.setTextColor(TFT_BLUE, TFT_WHITE); // Adding a background colour erases previous text automatically
M5.Lcd.drawCentreString(str,160,120,4);
}
/***********************************************************************
Function : Command Mode
***********************************************************************/
void cmd_print_help(void)
{
Serial.println(F("Available Command:"));
Serial.println(F("help, ? : print Help Messages"));
Serial.println(F("ver : print Version Information"));
Serial.println(F("m5lcd clear : M5Stack LCD Clear"));
Serial.println(F("m5lcd str <strings> : M5Stack LCD print strings"));
Serial.println(F("quit, exit : Quit Command Control Mode"));
}
void cmd_print_ver(void)
{
Serial.print("This is ");
Serial.print(__FILE__);
Serial.print(" ");
Serial.print("Build at ");
Serial.print(__DATE__);
Serial.print(" ");
Serial.print(__TIME__);
Serial.print("\r\n");
}
int cmd_execute(char *buf)
{
int i, x, y;
unsigned int ux;
int return_val = CMD_OK;
char cmd[CMD_MAX_LENGTH];
char arg1[ARG_MAX_LENGTH];
char arg2[ARG_MAX_LENGTH];
strcpy(cmd, "");
strcpy(arg1, "");
strcpy(arg2, "");
sscanf(buf, "%s %s %s", &cmd, &arg1, &arg2);
if (strcmp(cmd, "help")==0){ cmd_print_help(); }
else if(strcmp(cmd, "?" )==0){ cmd_print_help(); }
else if(strcmp(cmd, "ver" )==0){ cmd_print_ver(); }
else if((strcmp(cmd, "quit")==0) or (strcmp(cmd, "exit")==0))
{
return CMD_QUIT;
}
else if(strcmp(cmd, "m5lcd")==0 and strcmp(arg1, "clear")==0)
{
m5lcd_clear();
}
else if(strcmp(cmd, "m5lcd")==0 and strcmp(arg1, "str")==0)
{
m5lcd_str(&arg2[0]);
}
else
{
return ERR_INVALID;
}
return return_val;
}
void cmd_rx_data(void)
{
int i;
int return_val = CMD_OK;
char buf[CMD_BUF_LENGTH];
/* モード切替時の "Hit any key" のキー操作を捨てる */
while(Serial.available()){ Serial.read(); }
Serial.print(F("\r\n### Command Mode. ###\r\n"));
Serial.print(F("### Hit ? to help.###\r\n"));
Serial.print(F("$"));
i=0;
while(1)
{
if(Serial.available())
{
buf[i] = Serial.read();
Serial.print(buf[i]); //echo-back
if ( (buf[i] == 0x08) or (buf[i] == 0x7f) ) //BackSpace, Delete
{
buf[i] = '\0';
if(i) i--;
}
else if( (buf[i] == '\r') or (buf[i] == '\n') )
{
Serial.print( F("\r\n") );
buf[i] = '\0';
return_val = cmd_execute(&buf[0]);
for(i=0; i<CMD_BUF_LENGTH; i++) buf[i] = '\0';
i=0;
if(return_val == CMD_QUIT)
{
Serial.print(F("### Quit Command Mode. ###\r\n"));
return;
}
else if(return_val == ERR_INVALID)
{
Serial.print(F("?\r\n"));
Serial.print(F("$"));
}
else
{
Serial.print(F("OK\r\n$"));
}
}
else
{
i++;
if(i>=CMD_BUF_LENGTH)
{
Serial.print(F("### CMD BUFFER FULL, CLEAR. ###\r\n"));
for(i=0; i<CMD_BUF_LENGTH; i++) buf[i] = '\0';
i=0;
}
}
}
}// while
}
###A.2 test-runner.pyのソース
#!/usr/bin/python3
#
# This software includes the work that is distributed in the Apache License 2.0
#
from time import sleep
import serial
import codecs
import csv
import platform
import sys
import subprocess
import visa
import cv2
from PIL import Image
import pyocr
import pyocr.builders
UNINITIALIZED = 0xdeadbeef
def open_cam(camera_number):
return cv2.VideoCapture(camera_number)
def close_cam(cam):
if cam != UNINITIALIZED:
cam.release()
def capture_cam(cam, filename):
if cam != UNINITIALIZED:
_, img = cam.read()
cv2.imwrite(filename, img)
return True
else:
print("CAM Not Ready.")
return False
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Not Ready.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + " label_image.py --graph=c:\\tmp\\output_graph.pb --labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder --output_layer=final_result --image=" + filename + \
"|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def serial_write(h, string):
if h != UNINITIALIZED:
string = string + '\n'
string = str.encode(string)
h.write(string)
return True
else:
print("UART Not Initialized.")
return False
def close_uart(h):
if h != UNINITIALIZED:
h.close()
else:
#print("UART Not Initialized.")
pass
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def main():
is_passed = True
val = str(UNINITIALIZED)
cam = UNINITIALIZED
ocr = UNINITIALIZED
dso = UNINITIALIZED
uart = UNINITIALIZED
with codecs.open('script.csv', 'r', 'utf-8') as file:
script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open('result.csv', 'w', 'utf-8') as file:
result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')
for cmd in script:
print(cmd)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "open_cam":
cam = open_cam(int(cmd[1]))
elif cmd[0] == "close_cam":
close_cam(cam)
cam = UNINITIALIZED
elif cmd[0] == "capture_cam":
ret = capture_cam(cam, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
ocr = open_ocr()
if ocr == UNINITIALIZED:
is_passed = False
elif cmd[0] == "exec_ocr":
try:
val = exec_ocr(ocr, cmd[1])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "open_dso":
try:
dso = open_dso()
except:
is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
val = dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(val)
else:
dso.write(cmd[1])
except:
is_passed = False
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
elif cmd[0] == "send":
ret = serial_write(uart, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
elif cmd[0] == "eval_str_eq":
if str(val) != str(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_eq":
if int(val) != int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_gt":
if int(val) < int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_lt":
if int(val) > int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(val) != float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(val) < float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(val) > float(cmd[1]):
is_passed = False
else:
cmd.append("#")
if is_passed == True:
cmd.append("OK")
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
print(cmd)
result.writerow(cmd)
close_uart(uart)
close_cam(cam)
print("FAIL")
sys.exit(1)
if is_passed == True:
close_uart(uart)
close_cam(cam)
print("PASS")
sys.exit(0)
main()