LoginSignup
3
0

More than 5 years have passed since last update.

AWS IoTとBME280で遊んでみた

Posted at

RaspberryPiに接続した温度・気圧・湿度を測定するセンサー(BME280)のデータを、AWS IoTに送信してみました。

必要なもの

作業概要

 1. AWS IoT SDK チュートリアル ー Raspberry Pi の接続の実施
 2. AWS IoT Device SDK for Pythonに付属しているサンプルbasicPubSub.pyの実行
 3. Raspberry Pi のセットアップとBME280の接続
 4. AWS IoTへセンサーデータ(温度・気圧・湿度)を送信

1. AWS IoT SDK チュートリアル ー Raspberry Pi の接続の実施

チュートリアル通り、モノと証明書とキーペアを作成後、証明書にポリシーをアタッチします。
手順の途中で作成するパブリックおよびプライベートキー、証明書、ルート CA をダウンロードしておきます。

2. AWS IoT Device SDK for Pythonに付属しているサンプルbasicPubSub.pyの実行

SDKに付属しているAWS IoTへ送信するサンプルが動作することを確認をします。後でこのサンプルを利用して、センサーデータをAWS IoTへ送信します。

AWS IoTのドキュメントにはAWS IoT SDKの例はCとJavaScriptだけでPythonの例がありませんが、GitHubにあるAWS IoT Device SDK for Pythonに丁寧な解説が付いています。

Raspberry Piの適当なフォルダ(/hoge/fuga)にプライベートキー、証明書、ルートCAをアップロードして、SDKに付属しているサンプルbasicPubSub.pyを動かしてみます。

$ sudo pip install AWSIoTPythonSDK
$ keyPath=/hoge/fuga
$ endpoint=xxxxxxxxxxxx.iot.<リージョン>.amazonaws.com
$ rootCAFilePath=${keyPath}/<VeriSignのルートCA>
$ certFilePath=${keyPath}/xxxxxxxxxx-certificate.pem.crt
$ privateKeyFilePath=${keyPath}/xxxxxxxxxx-private.pem.key
$ git clone https://github.com/aws/aws-iot-device-sdk-python.git
$ cd aws-iot-device-sdk-python/samples/basicPubSub
$ python basicPubSub.py -e ${endpoint} -r ${rootCAFilePath} -c ${certFilePath} -k ${privateKeyFilePath} -id MyClientID -t MyTopic

AWS IoTのコンソールの[テスト]で、トピックに[MyTopic]を指定してサブスクライブして、下記が表示されれば成功です。

basicPubsub.png

3. Raspberry Pi のセットアップとBME280の接続

Raspberry Piの画面からI2Cを有効にしておきます。
BME280とRaspberry Piは下図のように接続します。

aws_iot_tutorial.png

※今回はI2Cアドレスは0x76を利用するので、SDO端子をGNDにつないでおきます。

4. センサーデータ(温度・気圧・湿度)のAWS IoTへの送信

BME280からデータ(温度・気圧・湿度)を読み取るプログラム(bme280.py)を作成し、次に上で実行確認したサンプル(BasicPubSub.py)からプログラムを呼び出すようにサンプル修正してBME280からのデータをAWS IoTに送信します。

1. BME280から温度・気圧・湿度を読み取るプログラムの作成

Bosch社のBME280のデータシートスイッチサイエンス社のサイトを参考にしました。

大まかな流れです。
 1. レジスタに設定値を設定 (init_bme280)
 2. 補正用データ(固定値)を取得 (read_trimming_parameter)
 3. 生データを取得 (read_bme280の前半)
 4. データを補正 (BME280_compensate_XXX)

bme280.py
from smbus2 import SMBusWrapper
import time


dig_T = []
dig_P = []
dig_H = []
t_fine = None

# I2C Address
I2CADDR  = 0x76

# BME280 Register
# CTRL_HUM(0xF2)
OSRS_H   = 0b001
# CTRL_MEAS(0xF4 )
OSRS_T   = 0b001
OSRS_P   = 0b001
MODE     = 0b11
# CONFIG(0xF5)
T_SB     = 0b101
FILTER   = 0b000
SPI3W_EN = 0b0


def to_signed_short(val):
    return -(val & 0b1000000000000000) | (val & 0b0111111111111111)


def to_signed_12bit(val):
    return -(val & 0b100000000000) | (val & 0b011111111111)


def to_signed_8bit(val):
    return -(val & 0b10000000) | (val & 0b01111111)


def init_bme280():
    with SMBusWrapper(1) as bus:
        CTRL_HUM = OSRS_H
        CTRL_MEAS = (OSRS_T << 5) + (OSRS_P << 2) + MODE
        CONFIG = (T_SB << 5) + (FILTER << 2) + SPI3W_EN
        bus.write_byte_data(I2CADDR, 0xF2, CTRL_HUM )
        bus.write_byte_data(I2CADDR, 0xF4, CTRL_MEAS)
        bus.write_byte_data(I2CADDR, 0xF5, CONFIG   )


def read_bme280():
    with SMBusWrapper(1) as bus:
        block = bus.read_i2c_block_data(I2CADDR, 0xF7, 8)
        raw_pressure    = block[0]*256*16 + block[1]*16 + (block[2] >> 4)
        raw_temperature = block[3]*256*16 + block[4]*16 + (block[5] >> 4)
        raw_humidity    = block[6]*256 + block[7]

        temperature = BME280_compensate_T_int32(raw_temperature)
        pressure    = BME280_compensate_P_int64(raw_pressure)
        humidity    = bme280_compensate_H_int32(raw_humidity)

        return temperature, pressure, humidity


def read_trimming_parameter():
    with SMBusWrapper(1) as bus:
        res1 = bus.read_i2c_block_data(0x76, 0x88, 26)
        res2 = bus.read_i2c_block_data(0x76, 0xE1, 7)

        dig_T.append(res1[1] * 256 + res1[0]) # [7:0] / [15:8] unsigned short
        dig_T.append(to_signed_short(res1[3]  * 256 + res1[2])) # [7:0] / [15:8] signed short
        dig_T.append(to_signed_short(res1[5]  * 256 + res1[4])) # [7:0] / [15:8] signed short
        dig_P.append(res1[7] * 256 + res1[6]) # [7:0] / [15:8] unsigned short
        dig_P.append(to_signed_short(res1[9]  * 256 + res1[8])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[11] * 256 + res1[10])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[13] * 256 + res1[12])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[15] * 256 + res1[14])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[17] * 256 + res1[16])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[19] * 256 + res1[18])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[21] * 256 + res1[20])) # [7:0] / [15:8] signed short
        dig_P.append(to_signed_short(res1[23] * 256 + res1[22])) # [7:0] / [15:8] signed short
        dig_H.append(res1[25]) # [7:0] unsigned char
        dig_H.append(to_signed_short(res2[1]  * 256 + res2[0])) # [7:0] / [15:8] signed short
        dig_H.append(res2[2]) # [7:0] unsigned char
        dig_H.append(to_signed_12bit(res2[3]  * 16 + (res2[4] & 0x0F))) # [11:4] / [3:0] signed short
        dig_H.append(to_signed_12bit(res2[5]  * 16 + (res2[4] >> 4))) # [3:0] / [11:4] signed short
        dig_H.append(to_signed_8bit(res2[6])) # signed char


def BME280_compensate_T_int32(adc_T):
    global t_fine
    var1 = (((adc_T>>3) - (dig_T[0]<<1)) * (dig_T[1])) >> 11
    var2 = (((((adc_T>>4) - dig_T[0]) * ((adc_T>>4) - dig_T[0])) >> 12) * dig_T[2]) >> 14
    t_fine = var1 + var2
    T = (t_fine * 5 + 128) >> 8

    return T / 100.0


def BME280_compensate_P_int64(adc_P):
    var1 = t_fine - 128000
    var2 = var1 * var1 * dig_P[5]
    var2 = var2 + ((var1 * dig_P[4]) << 17)
    var2 = var2 + (dig_P[3] << 35)
    var1 = ((var1 * var1 * dig_P[2]) >> 8) + ((var1 * dig_P[1]) << 12)
    var1 = ((1 << 47) + var1) * dig_P[0] >> 33;

    if var1 == 0:
        return 0 # avoid exception caused by division by zero

    p = 1048576 - adc_P

    p = (((p << 31) - var2) * 3125) / var1
    var1 = (dig_P[8] * (p >> 13) * (p >> 13)) >> 25
    var2 = (dig_P[7] * p) >> 19
    p = ((p + var1 + var2) >> 8) + (dig_P[6] << 4)

    return p / 256.0 / 100.0


def bme280_compensate_H_int32(adc_H):
    v_x1_u32r = t_fine - 76800
    v_x1_u32r = ( \
        ((((adc_H << 14) - (dig_H[3] << 20) - (dig_H[4] * v_x1_u32r)) + 16384) >> 15) \
        * \
        (((((((v_x1_u32r * dig_H[5]) >> 10) * (((v_x1_u32r * dig_H[2]) >> 11) + 32768)) >> 10) + 2097152) * dig_H[1] + 8192) >> 14) \
    )
    v_x1_u32r = (v_x1_u32r - (((((v_x1_u32r >> 15) * (v_x1_u32r >> 15)) >> 7) * dig_H[0]) >> 4))
    if v_x1_u32r < 0:
        v_x1_u32r = 0
    elif v_x1_u32r > 419430400:
        v_x1_u32r = 419430400

    return (v_x1_u32r >> 12) / 1024.0


if __name__ == '__main__':
    init_bme280()
    read_trimming_parameter()
    temperature, pressure, humidity = read_bme280()
    print('temperature: ' + str(temperature))
    print('pressure   : ' + str(pressure))
    print('humidity   : ' + str(humidity))

2. サンプルプログラム(basicPubSub.py)の修正

basicPubSub.pyを下記のように変更し、別名(myBasicPubSub.py)で保存します。

  • 上で作成したbme280.pyのインポートを追加
myBasicPubSub.py
import bme280
  • BME280からのデータを送信する部分を追加
myBasicPubSub.py
# bme280
bme280.init_bme280()
bme280.read_trimming_parameter()

# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if args.mode == 'both' or args.mode == 'publish':
        temperature, pressure, humidity = bme280.read_bme280()

        message = {}
        message['message']     = args.message
        message['sequence']    = loopCount
        message['temperature'] = temperature
        message['pressure']    = pressure
        message['humidity']    = humidity
        messageJson = json.dumps(message)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if args.mode == 'publish':
            print('Published topic %s: %s\n' % (topic, messageJson))
        loopCount += 1
    time.sleep(1)

3. 実行

bme280.pyとmyBasicPubSub.pyを同じディレクトリに配置して実行します。

$ sudo pip install smbus2
$ python myBasicPubSub.py -e ${endpoint} -r ${rootCAFilePath} -c ${certFilePath} -k ${privateKeyFilePath} -id myClientId -t MyTopic

4. 動作確認

AWS IoTのコンソールの[テスト]で、トピックに[MyTopic]を指定してサブスクライブして、下記が表示されれば成功です。

myBasicPubsub.png

雑感

  • 一度I2C通信に慣れておけば、他のセンサーでも応用が効きそう
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0