##1. はじめに
一つ前の記事の4.おわりにで以下のように書きました。
IoT学習HATキットのBME280接続ソケットにオプションのBME280使用 温湿度・気圧センサモジュールキットを接続してテスト実行環境をモニタリングするのも面白そうと思いました。
左の青い基板はスイッチサイエンスのBME280搭載 温湿度・気圧センサモジュール、右の緑の基板は秋月電子通商のBME280使用 温湿度・気圧センサモジュールキットです。秋月版はそのままIoT学習HATキットに挿せますがスイッチサイエンス版はピン数、ピン配置が異なるため配線に注意します。ソフトウェアはどちらも同じものが利用できます。
##2. BME280ドライバ
スイッチサイエンスがMITライセンスで公開されているbme280_sample.pyを利用させていただき、下記3点変更します。
- python3.7.31で動くようにprint文を修正
- テストランナーから呼び出して結果を取得できるように変更
- BME280のI2Cアドレスを0x76固定ではなくテストスクリプトで指定できるように変更
smbus2モジュールをあらかじめRaspberry Pi Zero WHへインストールします。
> pip3 install smbus2
1a2,3
> # This code is based on
> # https://github.com/SWITCHSCIENCE/samplecodes/blob/master/BME280/Python27/bme280_sample.py
61c63,64
< def readData():
---
> def readData(addr):
> i2c_address = addr
69,71c72,77
< compensate_T(temp_raw)
< compensate_P(pres_raw)
< compensate_H(hum_raw)
---
> temp = compensate_T(temp_raw)
> pres = compensate_P(pres_raw)
> hum = compensate_H(hum_raw)
> data = "%.1f,%.1f,%.1f" % (temp, hum, pres)
> print(data)
> return data
95c101,102
< print "pressure : %7.2f hPa" % (pressure/100)
---
> #print ("pressure : %7.2f hPa" % (pressure/100))
> return pressure/100
103c110,111
< print "temp : %-6.2f ℃" % (temperature)
---
> #print ("temp : %-6.2f ℃" % (temperature) )
> return temperature
117c125,126
< print "hum : %6.2f %" % (var_h)
---
> #print ("hum : %6.2f %" % (var_h))
> return var_h
144c153
< readData()
---
> readData(0x76)
##3. テストランナーの改修
BME280から温度、湿度、気圧を取得して結果ファイルに出力するコマンド「env」を追加します。引数としてBME280のI2Cアドレス(0x76または0x77)を与えます。一つ前の記事の 付録A. test-runner.pyのソース との差分を示します。全部入りソースは付録をご覧ください。
24a25
> import bme280
353a355,364
>
> elif cmd[0] == "env":
> try:
> val = bme280.readData(int(cmd[1],16))
> except:
> is_passed = False
> else:
> val = val.split(',')
> for i in val:
> cmd.append(i)
テストスクリプトと結果ファイルの例です。結果ファイルには気温、湿度、気圧の順に値が格納されます。
env,0x76
2021/05/01 12:22:40,env,0x76,29.4,50.3,1003.2,OK
##4. おわりに
- 秋月のIoT学習HATキットはテスト自動化の学習にも役立つと思いました。
- 8文字x2行のLCDでテストの進捗状況がわかる
- 緑/黄/赤のLEDをアンドンに見立ててテストの結果がわかる
- センサを接続することでテスト実行環境の温度、湿度、気圧を記録できる
##付録.A test-runner.pyのソース
#!/usr/bin/python3
#
# This software includes the work that is distributed
# in the Apache License 2.0
#
from time import sleep
import random
import sys
import codecs
import csv
import datetime
#import serial
#import pyvisa as visa
#import cv2
#from PIL import Image
#import pyocr
#import pyocr.builders
#import platform
import subprocess
from subprocess import PIPE
from AKI_I2C_AQM0802A import AKI_I2C_AQM0802A
import RPi.GPIO as GPIO
import bme280
UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6
# Akizuki IoT Learning HAT GPIO Number
LED_GR = 22
LED_Y = 27
LED_R = 17
LED_LCD_BACKLIGHT = 26
def serial_write(h, string):
if h != UNINITIALIZED:
string = string + '\n'
string = str.encode(string)
h.write(string)
return True
else:
print("UART Not Initialized.")
return False
def close_uart(h):
if h != UNINITIALIZED:
h.close()
else:
#print("UART Not Initialized.")
pass
def open_dso():
rm = visa.ResourceManager()
resources = rm.list_resources()
#print(resources)
for resource in resources:
#print(resource)
try:
dso = rm.open_resource(resource)
except:
print(resource, "Not Found.")
else:
print(resource, "Detected.")
return dso
#Throw an error to caller if none succeed.
return dso
def open_cam(camera_number, width, height):
h = cv2.VideoCapture(camera_number)
h.set(cv2.CAP_PROP_FRAME_WIDTH, width)
h.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
return h
def close_cam(cam):
if cam != UNINITIALIZED:
cam.release()
def capture_cam(cam, filename):
if cam != UNINITIALIZED:
_, img = cam.read()
cv2.imwrite(filename, img)
return True
else:
print("CAM Not Ready.")
return False
def crop_img(filename_in, v, h, filename_out):
img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
v0 = int(v.split(':')[0])
v1 = int(v.split(':')[1])
h0 = int(h.split(':')[0])
h1 = int(h.split(':')[1])
img2 = img[v0:v1, h0:h1]
cv2.imwrite(filename_out, img2)
return True
def open_ocr():
ocr = pyocr.get_available_tools()
if len(ocr) != 0:
ocr = ocr[0]
else:
ocr = UNINITIALIZED
print("OCR Not Ready.")
return ocr
def exec_ocr(ocr, filename):
try:
txt = ocr.image_to_string(
Image.open(filename),
lang = "eng",
builder = pyocr.builders.TextBuilder()
)
except:
print("OCR Fail.")
else:
return txt
def exec_labelimg(filename, label_string):
if platform.system() == "Windows" :
python = "python"
grep = "findstr"
else:
python = "python3"
grep = "grep"
cmd = python + \
" label_image.py \
--graph=c:\\tmp\\output_graph.pb \
--labels=c:\\tmp\\output_labels.txt \
--input_layer=Placeholder \
--output_layer=final_result \
--image=" + filename \
+ "|" + grep + " " + label_string
print(cmd)
log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
ret = log.stdout.strip().decode("utf-8").split(" ")[1]
return ret
def set_servo(uart, servo_id, servo_pos):
if servo_id < 0 or servo_id >= NUM_OF_SERVO:
print("Invalid Servo ID")
return False
if servo_pos < 0 or servo_pos > 180:
print("Invalid Servo Position")
return False
if uart != UNINITIALIZED:
serial_write(uart, "servoread")
servo_position = uart.readline().strip().decode('utf-8')
print(servo_position)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
current_pos = int(servo_position.split(' ')[servo_id])
#print(servo_id, current_pos)
if current_pos < servo_pos:
start = current_pos +1
stop = servo_pos +1
step = 1
else:
start = current_pos -1
stop = servo_pos -1
step = -1
for i in range(start, stop, step):
command = "servo " + str(servo_id) + " " + str(i)
print(command)
serial_write(uart, command)
# discard "OK"
devnull = uart.readline().strip().decode('utf-8')
sleep(0.2) # sec.
return True
else:
print("UART Not Initialized.")
return False
def led_init():
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(LED_GR, GPIO.OUT)
GPIO.setup(LED_Y, GPIO.OUT)
GPIO.setup(LED_R, GPIO.OUT)
GPIO.setup(LED_LCD_BACKLIGHT, GPIO.OUT)
GPIO.output(LED_GR, False)
GPIO.output(LED_Y, True)
GPIO.output(LED_R, False)
GPIO.output(LED_LCD_BACKLIGHT, True)
def led_pass():
GPIO.output(LED_GR, True)
GPIO.output(LED_Y, False)
GPIO.output(LED_R, False)
def led_fail():
GPIO.output(LED_GR, False)
GPIO.output(LED_Y, False)
GPIO.output(LED_R, True)
def lcd_init():
lcd = AKI_I2C_AQM0802A()
lcd.Init_LCD()
lcd.ClearDisplay()
lcd.NewClearDisplay(0, 0)
lcd.NewClearDisplay(1, 0)
return lcd
def lcd_str(lcd, row, col, str):
lcd.NewClearDisplay(row, 0)
lcd.WritePos(row, col)
lcd.WriteStr(str)
def main():
is_passed = True
val = str(UNINITIALIZED)
fval = 0.0
uart = UNINITIALIZED
dso = UNINITIALIZED
cam = UNINITIALIZED
ocr = UNINITIALIZED
led_init()
lcd = lcd_init()
if len(sys.argv) == 2:
script_file_name = sys.argv[1] + ".csv"
result_file_name = sys.argv[1] + "_result.csv"
else:
script_file_name = "script.csv"
result_file_name = "result.csv"
script_total_line = subprocess.check_output(['wc', '-l', script_file_name]).decode().split(' ')[0]
lcd_str(lcd, 0, 0, script_total_line)
with codecs.open(script_file_name, 'r', 'utf-8') as file:
script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')
with codecs.open(result_file_name, 'w', 'utf-8') as file:
result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')
script_line = 0
for cmd in script:
timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
script_line += 1
lcd_str(lcd, 1, 0, str(script_line))
print("########## " + str(script_line) + " ##########")
print(cmd)
if "#" in cmd[0]:
pass
elif cmd[0] == "sleep":
sleep(float(cmd[1]))
elif cmd[0] == "rusleep":
fval = random.uniform(float(cmd[1]), float(cmd[2]))
cmd.append(str(fval))
sleep(fval)
elif cmd[0] == "open_uart":
if len(cmd) == 2:
dsrdtr_val = 1
else:
dsrdtr_val = int(cmd[2])
try:
uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
except:
is_passed = False
elif cmd[0] == "send":
ret = serial_write(uart, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "rcvd":
try:
val = uart.readline().strip().decode('utf-8')
cmd.append(val)
except:
is_passed = False
elif cmd[0] == "open_dso":
try:
dso = open_dso()
except:
is_passed = False
elif cmd[0] == "dso":
try:
if "?" in cmd[1]:
val = dso.query(cmd[1]).rstrip().replace(",", "-")
cmd.append(val)
else:
dso.write(cmd[1])
except:
is_passed = False
elif cmd[0] == "open_cam":
if len(cmd) == 2:
cam = open_cam(int(cmd[1]), 640, 480)
else:
cam = open_cam(int(cmd[1]), int(cmd[2]), int(cmd[3]))
elif cmd[0] == "close_cam":
close_cam(cam)
cam = UNINITIALIZED
elif cmd[0] == "capture_cam":
ret = capture_cam(cam, cmd[1])
if ret == False:
is_passed = False
elif cmd[0] == "crop_img":
crop_img(cmd[1], cmd[2], cmd[3], cmd[4])
elif cmd[0] == "open_ocr":
ocr = open_ocr()
if ocr == UNINITIALIZED:
is_passed = False
elif cmd[0] == "exec_ocr":
try:
val = exec_ocr(ocr, cmd[1])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "exec_labelimg":
try:
val = exec_labelimg(cmd[1], cmd[2])
except:
is_passed = False
else:
cmd.append(str(val))
elif cmd[0] == "set_servo":
ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
if ret == False:
is_passed = False
elif cmd[0] == "run":
ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
val = ret.stdout.strip()
print(ret)
if ret.returncode != 0:
is_passed = False
elif cmd[0] == "env":
try:
val = bme280.readData(int(cmd[1],16))
except:
is_passed = False
else:
val = val.split(',')
for i in val:
cmd.append(i)
elif cmd[0] == "eval_str_eq":
if str(val) != str(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_eq":
if int(val) != int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_gt":
if int(val) < int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_int_lt":
if int(val) > int(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_eq":
if float(val) != float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_gt":
if float(val) < float(cmd[1]):
is_passed = False
elif cmd[0] == "eval_dbl_lt":
if float(val) > float(cmd[1]):
is_passed = False
else:
cmd.append("#")
if is_passed == True:
cmd.append("OK")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
else:
cmd.append("NG")
cmd.insert(0,timestamp)
print(cmd)
result.writerow(cmd)
close_uart(uart)
close_cam(cam)
led_fail()
print("FAIL")
sys.exit(1)
if is_passed == True:
close_uart(uart)
close_cam(cam)
led_pass()
print("PASS")
sys.exit(0)
main()
##付録.B BME280ドライバ
#coding: utf-8
# This code is based on
# https://github.com/SWITCHSCIENCE/samplecodes/blob/master/BME280/Python27/bme280_sample.py
from smbus2 import SMBus
import time
bus_number = 1
i2c_address = 0x76
bus = SMBus(bus_number)
digT = []
digP = []
digH = []
t_fine = 0.0
def writeReg(reg_address, data):
bus.write_byte_data(i2c_address,reg_address,data)
def get_calib_param():
calib = []
for i in range (0x88,0x88+24):
calib.append(bus.read_byte_data(i2c_address,i))
calib.append(bus.read_byte_data(i2c_address,0xA1))
for i in range (0xE1,0xE1+7):
calib.append(bus.read_byte_data(i2c_address,i))
digT.append((calib[1] << 8) | calib[0])
digT.append((calib[3] << 8) | calib[2])
digT.append((calib[5] << 8) | calib[4])
digP.append((calib[7] << 8) | calib[6])
digP.append((calib[9] << 8) | calib[8])
digP.append((calib[11]<< 8) | calib[10])
digP.append((calib[13]<< 8) | calib[12])
digP.append((calib[15]<< 8) | calib[14])
digP.append((calib[17]<< 8) | calib[16])
digP.append((calib[19]<< 8) | calib[18])
digP.append((calib[21]<< 8) | calib[20])
digP.append((calib[23]<< 8) | calib[22])
digH.append( calib[24] )
digH.append((calib[26]<< 8) | calib[25])
digH.append( calib[27] )
digH.append((calib[28]<< 4) | (0x0F & calib[29]))
digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
digH.append( calib[31] )
for i in range(1,2):
if digT[i] & 0x8000:
digT[i] = (-digT[i] ^ 0xFFFF) + 1
for i in range(1,8):
if digP[i] & 0x8000:
digP[i] = (-digP[i] ^ 0xFFFF) + 1
for i in range(0,6):
if digH[i] & 0x8000:
digH[i] = (-digH[i] ^ 0xFFFF) + 1
def readData(addr):
i2c_address = addr
data = []
for i in range (0xF7, 0xF7+8):
data.append(bus.read_byte_data(i2c_address,i))
pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
hum_raw = (data[6] << 8) | data[7]
temp = compensate_T(temp_raw)
pres = compensate_P(pres_raw)
hum = compensate_H(hum_raw)
data = "%.1f,%.1f,%.1f" % (temp, hum, pres)
print(data)
return data
def compensate_P(adc_P):
global t_fine
pressure = 0.0
v1 = (t_fine / 2.0) - 64000.0
v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
v2 = v2 + ((v1 * digP[4]) * 2.0)
v2 = (v2 / 4.0) + (digP[3] * 65536.0)
v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8) + ((digP[1] * v1) / 2.0)) / 262144
v1 = ((32768 + v1) * digP[0]) / 32768
if v1 == 0:
return 0
pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
if pressure < 0x80000000:
pressure = (pressure * 2.0) / v1
else:
pressure = (pressure / v1) * 2
v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
v2 = ((pressure / 4.0) * digP[7]) / 8192.0
pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)
#print ("pressure : %7.2f hPa" % (pressure/100))
return pressure/100
def compensate_T(adc_T):
global t_fine
v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
t_fine = v1 + v2
temperature = t_fine / 5120.0
#print ("temp : %-6.2f ℃" % (temperature) )
return temperature
def compensate_H(adc_H):
global t_fine
var_h = t_fine - 76800.0
if var_h != 0:
var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
else:
return 0
var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
if var_h > 100.0:
var_h = 100.0
elif var_h < 0.0:
var_h = 0.0
#print ("hum : %6.2f %" % (var_h))
return var_h
def setup():
osrs_t = 1 #Temperature oversampling x 1
osrs_p = 1 #Pressure oversampling x 1
osrs_h = 1 #Humidity oversampling x 1
mode = 3 #Normal mode
t_sb = 5 #Tstandby 1000ms
filter = 0 #Filter off
spi3w_en = 0 #3-wire SPI Disable
ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
config_reg = (t_sb << 5) | (filter << 2) | spi3w_en
ctrl_hum_reg = osrs_h
writeReg(0xF2,ctrl_hum_reg)
writeReg(0xF4,ctrl_meas_reg)
writeReg(0xF5,config_reg)
setup()
get_calib_param()
if __name__ == '__main__':
try:
readData(0x76)
except KeyboardInterrupt:
pass
##付録C. Lチカで始めるテスト自動化・記事一覧
電子書籍化したものを技術書典で頒布しています。
- Lチカで始めるテスト自動化
- Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
- Lチカで始めるテスト自動化(3)オシロスコープの組込み
- Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
- Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
- Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
- Lチカで始めるテスト自動化(7)タイムスタンプの保存
- Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
- Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
- Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
- Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
- Lチカで始めるテスト自動化(12)書籍化の作業メモ
- Lチカで始めるテスト自動化(13)外部プログラムの呼出し
- Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
- Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する
-
Raspbian GNU/Linux 10 (buster)に組込まれているpython3のバージョンです。 ↩