LoginSignup
0
0

More than 3 years have passed since last update.

Lチカで始めるテスト自動化(16)秋月のIoT学習HATキットにBME280を接続してテスト実行環境の温度・湿度・気圧を取得する

Posted at

1. はじめに

一つ前の記事4.おわりにで以下のように書きました。

IoT学習HATキットのBME280接続ソケットにオプションのBME280使用 温湿度・気圧センサモジュールキットを接続してテスト実行環境をモニタリングするのも面白そうと思いました。

ちょうど手元にBME280があるので試してみました。
IMG_0910s.jpg

左の青い基板はスイッチサイエンスのBME280搭載 温湿度・気圧センサモジュール、右の緑の基板は秋月電子通商のBME280使用 温湿度・気圧センサモジュールキットです。秋月版はそのままIoT学習HATキットに挿せますがスイッチサイエンス版はピン数、ピン配置が異なるため配線に注意します。ソフトウェアはどちらも同じものが利用できます。

2. BME280ドライバ

スイッチサイエンスがMITライセンスで公開されているbme280_sample.pyを利用させていただき、下記3点変更します。

  • python3.7.31で動くようにprint文を修正
  • テストランナーから呼び出して結果を取得できるように変更
  • BME280のI2Cアドレスを0x76固定ではなくテストスクリプトで指定できるように変更

smbus2モジュールをあらかじめRaspberry Pi Zero WHへインストールします。

> pip3 install smbus2
bme280.py(bme280_sample.pyとの差分)
1a2,3
> # This code is based on
> # https://github.com/SWITCHSCIENCE/samplecodes/blob/master/BME280/Python27/bme280_sample.py
61c63,64
< def readData():
---
> def readData(addr):
>   i2c_address = addr
69,71c72,77
<   compensate_T(temp_raw)
<   compensate_P(pres_raw)
<   compensate_H(hum_raw)
---
>   temp = compensate_T(temp_raw)
>   pres = compensate_P(pres_raw)
>   hum  = compensate_H(hum_raw)
>   data = "%.1f,%.1f,%.1f" % (temp, hum, pres)
>   print(data)
>   return data
95c101,102
<   print "pressure : %7.2f hPa" % (pressure/100)
---
>   #print ("pressure : %7.2f hPa" % (pressure/100))
>   return pressure/100
103c110,111
<   print "temp : %-6.2f ℃" % (temperature) 
---
>   #print ("temp : %-6.2f ℃" % (temperature) )
>   return temperature
117c125,126
<   print "hum : %6.2f %" % (var_h)
---
>   #print ("hum : %6.2f %" % (var_h))
>   return var_h
144c153
<       readData()
---
>       readData(0x76)

3. テストランナーの改修

BME280から温度、湿度、気圧を取得して結果ファイルに出力するコマンド「env」を追加します。引数としてBME280のI2Cアドレス(0x76または0x77)を与えます。一つ前の記事の 付録A. test-runner.pyのソース との差分を示します。全部入りソースは付録をご覧ください。

test-runner.py(差分)
24a25
> import bme280
353a355,364
> 
>                 elif cmd[0] == "env":
>                     try:
>                         val = bme280.readData(int(cmd[1],16))
>                     except:
>                         is_passed = False
>                     else:
>                         val = val.split(',')
>                         for i in val:
>                             cmd.append(i)

テストスクリプトと結果ファイルの例です。結果ファイルには気温、湿度、気圧の順に値が格納されます。

script.csv
env,0x76
script_result.csv
2021/05/01 12:22:40,env,0x76,29.4,50.3,1003.2,OK

4. おわりに

  • 秋月のIoT学習HATキットはテスト自動化の学習にも役立つと思いました。
    • 8文字x2行のLCDでテストの進捗状況がわかる
    • 緑/黄/赤のLEDをアンドンに見立ててテストの結果がわかる
    • センサを接続することでテスト実行環境の温度、湿度、気圧を記録できる

付録.A test-runner.pyのソース

test-runner.py
#!/usr/bin/python3

#
# This software includes the work that is distributed
# in the Apache License 2.0
#

from time import sleep
import random
import sys
import codecs
import csv
import datetime
#import serial
#import pyvisa as visa
#import cv2
#from PIL import Image
#import pyocr
#import pyocr.builders
#import platform
import subprocess
from subprocess import PIPE
from AKI_I2C_AQM0802A import AKI_I2C_AQM0802A
import RPi.GPIO as GPIO
import bme280

UNINITIALIZED = 0xdeadbeef
NUM_OF_SERVO = 6

# Akizuki IoT Learning HAT GPIO Number
LED_GR = 22
LED_Y = 27
LED_R = 17
LED_LCD_BACKLIGHT = 26

def serial_write(h, string):
    if h != UNINITIALIZED:
        string = string + '\n'
        string = str.encode(string)
        h.write(string)
        return True
    else:
        print("UART Not Initialized.")
        return False

def close_uart(h):
    if h != UNINITIALIZED:
        h.close()
    else:
        #print("UART Not Initialized.")
        pass

def open_dso():
    rm = visa.ResourceManager()
    resources = rm.list_resources()
    #print(resources)
    for resource in resources:
        #print(resource)
        try:
            dso = rm.open_resource(resource)
        except:
            print(resource, "Not Found.")
        else:
            print(resource, "Detected.")
            return dso

    #Throw an error to caller if none succeed.
    return dso

def open_cam(camera_number, width, height):
    h = cv2.VideoCapture(camera_number)
    h.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    h.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    return h

def close_cam(cam):
    if cam != UNINITIALIZED:
        cam.release()

def capture_cam(cam, filename):
    if cam != UNINITIALIZED:
        _, img = cam.read()
        cv2.imwrite(filename, img)
        return True
    else:
        print("CAM Not Ready.")
        return False

def crop_img(filename_in, v, h, filename_out):
    img = cv2.imread(filename_in, cv2.IMREAD_COLOR)
    v0 = int(v.split(':')[0])
    v1 = int(v.split(':')[1])
    h0 = int(h.split(':')[0])
    h1 = int(h.split(':')[1])
    img2 = img[v0:v1, h0:h1]
    cv2.imwrite(filename_out, img2)
    return True

def open_ocr():
    ocr = pyocr.get_available_tools()
    if len(ocr) != 0:
        ocr = ocr[0]
    else:
        ocr = UNINITIALIZED
        print("OCR Not Ready.")
    return ocr

def exec_ocr(ocr, filename):
    try:
        txt = ocr.image_to_string(
            Image.open(filename),
            lang = "eng",
            builder = pyocr.builders.TextBuilder()
        )
    except:
        print("OCR Fail.")
    else:
        return txt

def exec_labelimg(filename, label_string):
    if platform.system() == "Windows" :
        python = "python"
        grep = "findstr"
    else:
        python = "python3"
        grep = "grep"

    cmd = python + \
          " label_image.py \
          --graph=c:\\tmp\\output_graph.pb \
          --labels=c:\\tmp\\output_labels.txt \
          --input_layer=Placeholder \
          --output_layer=final_result \
          --image=" + filename \
          + "|" + grep + " " + label_string
    print(cmd)
    log = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
    ret = log.stdout.strip().decode("utf-8").split(" ")[1]
    return ret

def set_servo(uart, servo_id, servo_pos):
    if servo_id < 0 or servo_id >= NUM_OF_SERVO:
        print("Invalid Servo ID")
        return False

    if servo_pos < 0 or servo_pos > 180:
        print("Invalid Servo Position")
        return False

    if uart != UNINITIALIZED:
        serial_write(uart, "servoread")

        servo_position = uart.readline().strip().decode('utf-8')
        print(servo_position)

        # discard "OK"
        devnull = uart.readline().strip().decode('utf-8')

        current_pos = int(servo_position.split(' ')[servo_id])
        #print(servo_id, current_pos)

        if current_pos < servo_pos:
            start = current_pos +1
            stop  = servo_pos +1
            step  = 1
        else:
            start = current_pos -1
            stop  = servo_pos -1
            step  = -1

        for i in range(start, stop, step):
            command = "servo " + str(servo_id) + " " + str(i)
            print(command)
            serial_write(uart, command)
            # discard "OK"
            devnull = uart.readline().strip().decode('utf-8')
            sleep(0.2) # sec.

        return True
    else:
        print("UART Not Initialized.")
        return False

def led_init():
    GPIO.setmode(GPIO.BCM)

    GPIO.setwarnings(False)
    GPIO.setup(LED_GR, GPIO.OUT)
    GPIO.setup(LED_Y, GPIO.OUT)
    GPIO.setup(LED_R, GPIO.OUT)
    GPIO.setup(LED_LCD_BACKLIGHT, GPIO.OUT)

    GPIO.output(LED_GR, False)
    GPIO.output(LED_Y, True)
    GPIO.output(LED_R, False)
    GPIO.output(LED_LCD_BACKLIGHT, True)

def led_pass():
    GPIO.output(LED_GR, True)
    GPIO.output(LED_Y, False)
    GPIO.output(LED_R, False)

def led_fail():
    GPIO.output(LED_GR, False)
    GPIO.output(LED_Y, False)
    GPIO.output(LED_R, True)

def lcd_init():
    lcd = AKI_I2C_AQM0802A()
    lcd.Init_LCD()
    lcd.ClearDisplay()
    lcd.NewClearDisplay(0, 0)
    lcd.NewClearDisplay(1, 0)
    return lcd

def lcd_str(lcd, row, col, str):
    lcd.NewClearDisplay(row, 0)
    lcd.WritePos(row, col)
    lcd.WriteStr(str)

def main():
    is_passed = True
    val = str(UNINITIALIZED)
    fval = 0.0
    uart = UNINITIALIZED
    dso = UNINITIALIZED
    cam = UNINITIALIZED
    ocr = UNINITIALIZED
    led_init()
    lcd = lcd_init()

    if len(sys.argv) == 2:
        script_file_name = sys.argv[1] + ".csv"
        result_file_name = sys.argv[1] + "_result.csv"
    else:
        script_file_name = "script.csv"
        result_file_name = "result.csv"

    script_total_line = subprocess.check_output(['wc', '-l', script_file_name]).decode().split(' ')[0]
    lcd_str(lcd, 0, 0, script_total_line)

    with codecs.open(script_file_name, 'r', 'utf-8') as file:
        script = csv.reader(file, delimiter=',', lineterminator='\r\n', quotechar='"')

        with codecs.open(result_file_name, 'w', 'utf-8') as file:
            result = csv.writer(file, delimiter=',', lineterminator='\r\n', quotechar='"')

            script_line = 0
            for cmd in script:
                timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
                script_line += 1
                lcd_str(lcd, 1, 0, str(script_line))
                print("########## " + str(script_line) + " ##########")
                print(cmd)

                if "#" in cmd[0]:
                    pass

                elif cmd[0] == "sleep":
                    sleep(float(cmd[1]))

                elif cmd[0] == "rusleep":
                    fval = random.uniform(float(cmd[1]), float(cmd[2]))
                    cmd.append(str(fval))
                    sleep(fval)

                elif cmd[0] == "open_uart":
                    if len(cmd) == 2:
                        dsrdtr_val = 1
                    else:
                        dsrdtr_val = int(cmd[2])
                    try:
                        uart = serial.Serial(cmd[1], 115200, timeout=1.0, dsrdtr=dsrdtr_val)
                    except:
                        is_passed = False

                elif cmd[0] == "send":
                    ret = serial_write(uart, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "rcvd":
                    try:
                        val = uart.readline().strip().decode('utf-8')
                        cmd.append(val)
                    except:
                        is_passed = False

                elif cmd[0] == "open_dso":
                    try:
                        dso = open_dso()
                    except:
                        is_passed = False

                elif cmd[0] == "dso":
                    try:
                        if "?" in cmd[1]:
                            val = dso.query(cmd[1]).rstrip().replace(",", "-")
                            cmd.append(val)
                        else:
                            dso.write(cmd[1])
                    except:
                        is_passed = False

                elif cmd[0] == "open_cam":
                    if len(cmd) == 2:
                        cam = open_cam(int(cmd[1]), 640, 480)
                    else:
                        cam = open_cam(int(cmd[1]), int(cmd[2]), int(cmd[3]))

                elif cmd[0] == "close_cam":
                    close_cam(cam)
                    cam = UNINITIALIZED

                elif cmd[0] == "capture_cam":
                    ret = capture_cam(cam, cmd[1])
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "crop_img":
                    crop_img(cmd[1], cmd[2], cmd[3], cmd[4])

                elif cmd[0] == "open_ocr":
                    ocr = open_ocr()
                    if ocr == UNINITIALIZED:
                        is_passed = False

                elif cmd[0] == "exec_ocr":
                    try:
                        val = exec_ocr(ocr, cmd[1])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "exec_labelimg":
                    try:
                        val = exec_labelimg(cmd[1], cmd[2])
                    except:
                        is_passed = False
                    else:
                        cmd.append(str(val))

                elif cmd[0] == "set_servo":
                    ret = set_servo(uart, int(cmd[1]), int(cmd[2]))
                    if ret == False:
                        is_passed = False

                elif cmd[0] == "run":
                    ret = subprocess.run(cmd[1], shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
                    val = ret.stdout.strip()
                    print(ret)
                    if ret.returncode != 0:
                        is_passed = False

                elif cmd[0] == "env":
                    try:
                        val = bme280.readData(int(cmd[1],16))
                    except:
                        is_passed = False
                    else:
                        val = val.split(',')
                        for i in val:
                            cmd.append(i)

                elif cmd[0] == "eval_str_eq":
                    if str(val) != str(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_eq":
                    if int(val) != int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_gt":
                    if int(val) < int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_int_lt":
                    if int(val) > int(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_eq":
                    if float(val) != float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_gt":
                    if float(val) < float(cmd[1]):
                        is_passed = False

                elif cmd[0] == "eval_dbl_lt":
                    if float(val) > float(cmd[1]):
                        is_passed = False

                else:
                    cmd.append("#")

                if is_passed == True:
                    cmd.append("OK")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                else:
                    cmd.append("NG")
                    cmd.insert(0,timestamp)
                    print(cmd)
                    result.writerow(cmd)
                    close_uart(uart)
                    close_cam(cam)
                    led_fail()
                    print("FAIL")
                    sys.exit(1)

    if is_passed == True:
        close_uart(uart)
        close_cam(cam)
        led_pass()
        print("PASS")
        sys.exit(0)

main()

付録.B BME280ドライバ

bme280.py
#coding: utf-8
# This code is based on
# https://github.com/SWITCHSCIENCE/samplecodes/blob/master/BME280/Python27/bme280_sample.py

from smbus2 import SMBus
import time

bus_number  = 1
i2c_address = 0x76

bus = SMBus(bus_number)

digT = []
digP = []
digH = []

t_fine = 0.0


def writeReg(reg_address, data):
    bus.write_byte_data(i2c_address,reg_address,data)

def get_calib_param():
    calib = []

    for i in range (0x88,0x88+24):
        calib.append(bus.read_byte_data(i2c_address,i))
    calib.append(bus.read_byte_data(i2c_address,0xA1))
    for i in range (0xE1,0xE1+7):
        calib.append(bus.read_byte_data(i2c_address,i))

    digT.append((calib[1] << 8) | calib[0])
    digT.append((calib[3] << 8) | calib[2])
    digT.append((calib[5] << 8) | calib[4])
    digP.append((calib[7] << 8) | calib[6])
    digP.append((calib[9] << 8) | calib[8])
    digP.append((calib[11]<< 8) | calib[10])
    digP.append((calib[13]<< 8) | calib[12])
    digP.append((calib[15]<< 8) | calib[14])
    digP.append((calib[17]<< 8) | calib[16])
    digP.append((calib[19]<< 8) | calib[18])
    digP.append((calib[21]<< 8) | calib[20])
    digP.append((calib[23]<< 8) | calib[22])
    digH.append( calib[24] )
    digH.append((calib[26]<< 8) | calib[25])
    digH.append( calib[27] )
    digH.append((calib[28]<< 4) | (0x0F & calib[29]))
    digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
    digH.append( calib[31] )

    for i in range(1,2):
        if digT[i] & 0x8000:
            digT[i] = (-digT[i] ^ 0xFFFF) + 1

    for i in range(1,8):
        if digP[i] & 0x8000:
            digP[i] = (-digP[i] ^ 0xFFFF) + 1

    for i in range(0,6):
        if digH[i] & 0x8000:
            digH[i] = (-digH[i] ^ 0xFFFF) + 1  

def readData(addr):
    i2c_address = addr
    data = []
    for i in range (0xF7, 0xF7+8):
        data.append(bus.read_byte_data(i2c_address,i))
    pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
    temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
    hum_raw  = (data[6] << 8)  |  data[7]

    temp = compensate_T(temp_raw)
    pres = compensate_P(pres_raw)
    hum  = compensate_H(hum_raw)
    data = "%.1f,%.1f,%.1f" % (temp, hum, pres)
    print(data)
    return data

def compensate_P(adc_P):
    global  t_fine
    pressure = 0.0

    v1 = (t_fine / 2.0) - 64000.0
    v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
    v2 = v2 + ((v1 * digP[4]) * 2.0)
    v2 = (v2 / 4.0) + (digP[3] * 65536.0)
    v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)  + ((digP[1] * v1) / 2.0)) / 262144
    v1 = ((32768 + v1) * digP[0]) / 32768

    if v1 == 0:
        return 0
    pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
    if pressure < 0x80000000:
        pressure = (pressure * 2.0) / v1
    else:
        pressure = (pressure / v1) * 2
    v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
    v2 = ((pressure / 4.0) * digP[7]) / 8192.0
    pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)  

    #print ("pressure : %7.2f hPa" % (pressure/100))
    return pressure/100

def compensate_T(adc_T):
    global t_fine
    v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
    v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
    t_fine = v1 + v2
    temperature = t_fine / 5120.0
    #print ("temp : %-6.2f ℃" % (temperature) )
    return temperature

def compensate_H(adc_H):
    global t_fine
    var_h = t_fine - 76800.0
    if var_h != 0:
        var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
    else:
        return 0
    var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
    if var_h > 100.0:
        var_h = 100.0
    elif var_h < 0.0:
        var_h = 0.0
    #print ("hum : %6.2f %" % (var_h))
    return var_h


def setup():
    osrs_t = 1          #Temperature oversampling x 1
    osrs_p = 1          #Pressure oversampling x 1
    osrs_h = 1          #Humidity oversampling x 1
    mode   = 3          #Normal mode
    t_sb   = 5          #Tstandby 1000ms
    filter = 0          #Filter off
    spi3w_en = 0            #3-wire SPI Disable

    ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
    config_reg    = (t_sb << 5) | (filter << 2) | spi3w_en
    ctrl_hum_reg  = osrs_h

    writeReg(0xF2,ctrl_hum_reg)
    writeReg(0xF4,ctrl_meas_reg)
    writeReg(0xF5,config_reg)


setup()
get_calib_param()


if __name__ == '__main__':
    try:
        readData(0x76)
    except KeyboardInterrupt:
        pass

付録C. Lチカで始めるテスト自動化・記事一覧

電子書籍化したものを技術書典で頒布しています。

  1. Lチカで始めるテスト自動化
  2. Lチカで始めるテスト自動化(2)テストスクリプトの保守性向上
  3. Lチカで始めるテスト自動化(3)オシロスコープの組込み
  4. Lチカで始めるテスト自動化(4)テストスクリプトの保守性向上(2)
  5. Lチカで始めるテスト自動化(5)WebカメラおよびOCRの組込み
  6. Lチカで始めるテスト自動化(6)AI(機械学習)を用いたPass/Fail判定
  7. Lチカで始めるテスト自動化(7)タイムスタンプの保存
  8. Lチカで始めるテスト自動化(8)HDMIビデオキャプチャデバイスの組込み
  9. Lチカで始めるテスト自動化(9)6DoFロボットアームの組込み
  10. Lチカで始めるテスト自動化(10)6DoFロボットアームの制御スクリプトの保守性向上
  11. Lチカで始めるテスト自動化(11)ロボットアームのコントローラ製作
  12. Lチカで始めるテスト自動化(12)書籍化の作業メモ
  13. Lチカで始めるテスト自動化(13)外部プログラムの呼出し
  14. Lチカで始めるテスト自動化(14)sleepの時間をランダムに設定する
  15. Lチカで始めるテスト自動化(15)Raspberry Pi Zero WHでテストランナーを動かして秋月のIoT学習HATキットに進捗を表示する

  1. Raspbian GNU/Linux 10 (buster)に組込まれているpython3のバージョンです。 

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0