9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ラズベリーパイにセンサーを接続し、温度/湿度/圧力をFlaskで取得する

Last updated at Posted at 2019-09-26

目的

ラズベリーパイに温湿度圧力センサーを接続する。
Flaskを設定し、温度/湿度/圧力を取得できるようにする。

Flask添付画像.png

実行環境

ハード

  • Raspberry Pi Zero W
  • AE-BME280(温湿度圧力センサー)
    • はんだ付けが必要
  • ブレッドボード
  • ジャンパワイヤー

ソフト

  • Raspbian 4.14
    (構築時はRaspbian Buster with desktop and recommended softwareでキッティング)
  • Flask 0.12.1

温湿度センサー設定

キッティング

ラズベリーパイとセンサーを接続する。
参考サイト:https://deviceplus.jp/hobby/raspberrypi_entry_039/

温度取得スクリプト

準備

必要に応じて、パッケージをインストールする。

# apt-get install i2c-tools
# apt-get python-smbus

スクリプト作成の方針

元ネタのPythonスクリプトでは、温度/湿度/圧力を全て呼び出す1つのスクリプトでしたが、
Flaskで呼び出す都合上、温度/湿度/圧力/共通部分のスクリプトに分散しました。

温度取得を例にポイントをまとめます。

また、レジスタへのアクセスなどの処理は特に変更していません。

参考サイト:
https://github.com/SWITCHSCIENCE/BME280/blob/master/Python27/bme280_sample.py
http://akizukidenshi.com/download/ds/bosch/BST-BME280_DS001-10.pdf
http://akizukidenshi.com/download/ds/akizuki/AE-BME280_manu_v1.1.pdf

スクリプト

common.py
#coding: utf-8

from smbus import SMBus
import time

i2c_address = 0x76
bus_number  = 1
bus = SMBus(bus_number)

digT = []
digP = []
digH = []

t_fine = 0.0

def writeReg(reg_address, data):
    bus.write_byte_data(i2c_address, reg_address, data)

def get_calib_param():
    calib = []
    
    for i in range (0x88,0x88+24):
        calib.append(bus.read_byte_data(i2c_address,i))
    calib.append(bus.read_byte_data(i2c_address,0xA1))
    for i in range (0xE1,0xE1+7):
        calib.append(bus.read_byte_data(i2c_address,i))

    digT.append((calib[1] << 8) | calib[0])
    digT.append((calib[3] << 8) | calib[2])
    digT.append((calib[5] << 8) | calib[4])
    digP.append((calib[7] << 8) | calib[6])
    digP.append((calib[9] << 8) | calib[8])
    digP.append((calib[11]<< 8) | calib[10])
    digP.append((calib[13]<< 8) | calib[12])
    digP.append((calib[15]<< 8) | calib[14])
    digP.append((calib[17]<< 8) | calib[16])
    digP.append((calib[19]<< 8) | calib[18])
    digP.append((calib[21]<< 8) | calib[20])
    digP.append((calib[23]<< 8) | calib[22])
    digH.append( calib[24] )
    digH.append((calib[26]<< 8) | calib[25])
    digH.append( calib[27] )
    digH.append((calib[28]<< 4) | (0x0F & calib[29]))
    digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
    digH.append( calib[31] )
    
    for i in range(1,2):
        if digT[i] & 0x8000:
            digT[i] = (-digT[i] ^ 0xFFFF) + 1

    for i in range(1,8):
        if digP[i] & 0x8000:
            digP[i] = (-digP[i] ^ 0xFFFF) + 1

    for i in range(0,6):
        if digH[i] & 0x8000:
            digH[i] = (-digH[i] ^ 0xFFFF) + 1

def compensate_P(adc_P):
    global  t_fine
    pressure = 0.0
    
    v1 = (t_fine / 2.0) - 64000.0
    v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
    v2 = v2 + ((v1 * digP[4]) * 2.0)
    v2 = (v2 / 4.0) + (digP[3] * 65536.0)
    v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)  + ((digP[1] * v1) / 2.0)) / 262144
    v1 = ((32768 + v1) * digP[0]) / 32768
    
    if v1 == 0:
        return 0
    pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
    if pressure < 0x80000000:
        pressure = (pressure * 2.0) / v1
    else:
        pressure = (pressure / v1) * 2
    v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
    v2 = ((pressure / 4.0) * digP[7]) / 8192.0
    pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)  

    return "pressure : %7.2f hPa" % (pressure / 100)
    
def compensate_T(adc_T):
    global t_fine
    v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
    v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
    t_fine = v1 + v2
    temperature = t_fine / 5120.0
    return "temp : %-6.2f " % (temperature)


def compensate_H(adc_H):
    global t_fine
    var_h = t_fine - 76800.0
    if var_h != 0:
        var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
    else:
        return 0
    var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
    if var_h > 100.0:
        var_h = 100.0
    elif var_h < 0.0:
        var_h = 0.0
    return "hum : %6.2f %" % (var_h)

def setup():
    osrs_t = 1                  #Temperature oversampling x 1
    osrs_p = 1                  #Pressure oversampling x 1
    osrs_h = 1                  #Humidity oversampling x 1
    mode   = 3                  #Normal mode
    t_sb   = 5                  #Tstandby 1000ms
    filter = 0                  #Filter off
    spi3w_en = 0                        #3-wire SPI Disable

    ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
    config_reg    = (t_sb << 5) | (filter << 2) | spi3w_en
    ctrl_hum_reg  = osrs_h

    writeReg(0xF2,ctrl_hum_reg)
    writeReg(0xF4,ctrl_meas_reg)
    writeReg(0xF5,config_reg)
humidity.py
#coding: utf-8

from smbus import SMBus
from common import *
import time

def readData():
        data = []
        global hum_result
        for i in range (0xF7, 0xF7+8):
                data.append(bus.read_byte_data(i2c_address,i))
        pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        hum_raw  = (data[6] << 8)  |  data[7]

        hum_result = compensate_H(hum_raw)

def hum_start():
    setup()
    get_calib_param()
    readData()
    return hum_result

if __name__ == '__main__':
    try:
        readData()
    except KeyboardInterrupt:
        pass
pressure.py
#coding: utf-8

from smbus import SMBus
from common import *
import time

def readData():
        data = []
        global press_result
        for i in range (0xF7, 0xF7+8):
                data.append(bus.read_byte_data(i2c_address,i))
        pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        hum_raw  = (data[6] << 8)  |  data[7]

        press_result = compensate_P(pres_raw)

def press_start():
    setup()
    get_calib_param()
    readData()
    return press_result

if __name__ == '__main__':
    try:
        readData()
    except KeyboardInterrupt:
        pass
temperature.py
#coding: utf-8

from smbus import SMBus
from common import *
import time

def readData():
        data = []
        global temp_result
        for i in range (0xF7, 0xF7+8):
                data.append(bus.read_byte_data(i2c_address,i))
        pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        hum_raw  = (data[6] << 8)  |  data[7]

        temp_result = compensate_T(temp_raw)

def temp_start():
    setup()
    get_calib_param()
    readData()
    return temp_result

if __name__ == '__main__':
    try:
        readData()
    except KeyboardInterrupt:
        pass

Flask設定

Flaskドキュメント:https://a2c.bitbucket.io/flask/

アプリケーション作成

flask_start.py
from flask import Flask
from pressure import *
from humidity import *
from temperature import *
app = Flask(__name__)

@app.route('/temp')
def temp_check():
    return temp_start()

@app.route('/press')
def press_check():
    return press_start()

@app.route('/hum')
def hum_check():
    return hum_start()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80, debug=True)

実行

# python flask_start.py

(バックグランド処理の場合)
# python flask_start.py > /dev/null 2>&1 &

動作確認

IPアドレスを確認する。

# ip addr

ブラウザでアクセスし、値が画面に表示されることを確認する。

URL
http://IPアドレス:80/temp  # 温度
http://IPアドレス:80/hum   # 湿度
http://IPアドレス:80/press # 圧力
9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?