はじめに
散歩をしていたらESP32で取得したデータをJSON形式でデータベースに保存したいという気持ちになったので実装をしてみました.
概要
温度センサー(BME280)から温度データを取得し,ESP32からJSON形式でMongoDBに保存します.FastAPIを使ってみます.
ESP32のファームウェアはMicroPythonのものを使っています.
実装はMongoDB, FastAPI, ESP32に分けて説明します.
実装
MongoDB
Ubuntu22.04にMongoDBをインストール
サイトのコマンドを1から順に実行してください.
1.
sudo apt install software-properties-common gnupg apt-transport-https ca-certificates -y
2.
curl -fsSL https://pgp.mongodb.com/server-7.0.asc | sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor
3.
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list
4.
deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse
5.
sudo apt install mongodb-org -y
6.
mongod --version
MongoDBをstart
sudo systemctl start mongod
2.
sudo systemctl status mongod
3.
sudo systemctl enable mongod
mongosh
でmongoDBに接続できればOKです.
Connecting to: で出てくる緑のアドレスは後で使うのでコピーしておくか,出し方を覚えておいてください.
MongoDB用GUIのMongoDB CompassをPCにインストールしておくと後でデータベースにデータが正確に保存されているか確認する時に便利です.
FastAPI
https://fastapi.tiangolo.com/
JSONをMongoDBに保存するAPIをFastAPIを使って作成します.
pip install fastapi
pip install uvicorn
この二つを使うのでインストールしておいてください.
また,プログラムに出てくるモジュールやライブラリは適宜インストールしてください.
Pythonファイルは3つに分けて記述します.
-
main.py
FastAPIのルーティングやレスポンスを定義します. -
database.py
データベースに対してどんな処理をするか定義します. -
model.py
Pydanticライブラリを使って,受け取るJSONの中身の構造を指定します.バリデーションとか型チェックとか呼ぶらしいです.
main.py
from typing import Union
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from database import (
create_data,
)
from model import Device, Sensors_temp, Sensors_hum, Data
app = FastAPI()
origins = [
'http://localhost:3000',
'http://localhost',
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def read_root():
return {"HELLO" : "WORLD"}
@app.post("/api/data", response_model=Data)
async def post_data(data:Data):
response = await create_data(data.dict())
if response:
return response
raise HTTPException(400, "Something went wrong / Bad Request")
database.py
from model import *
import sys
import datetime
# mongoDB driver
import motor.motor_asyncio
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")
database = client.Monitoring
collection = database.data
async def create_data(data):
document = data
result = await collection.insert_one(document)
return result
model.py
from pydantic import BaseModel
import datetime
from typing import Set, Union
class Device(BaseModel):
id: str
name: str
type: str
manufacturer: str
class Sensors_temp(BaseModel):
id: str
type: str
location: str
value: float
unit: str
timestamp: str
class Sensors_hum(BaseModel):
id: str
type: str
location: str
value: float
unit: str
timestamp: str
class Data(BaseModel):
sensors_hum: Union[Sensors_hum, None] = None
sensors_temp: Union[Sensors_temp, None] = None
device: Union[Device, None] = None
このモデルでは以下のようなJSONが送られることを期待しています.それ以外だとエラーになります.
data = {
"device": {
"id": "002",
"name": "ESP32-WROOM-32",
"type": "MicroController",
"manufacturer": "ESPRESSIF",
},
"sensors_temp": {
"id": "temp_sensor_2",
"type": "temperature",
"location": "door",
"value": float(measure.get_temp()),
"unit": "Celsius",
"timestamp": datetime_str,
},
"sensors_hum": {
"id": "hum_sensor_2",
"type": "humidity",
"location": "door",
"value": float(measure.get_hum()),
"unit": "%",
"timestamp": datetime_str,
},
}
ESP32
ESP32に入れるファイルは以下です.
boot.pyとmain.pyに関しては各自必要に応じて書いてください.
- boot.py
- main.py
- BME280.py
- measure.py
- transmit.py
BME280からのデータ取得とFastAPIに関わるのは下3つのファイルです.
BME280.py
BME280からは温度,湿度,気圧データを取得できます.専用のモジュールをインストールする必要がありますが,microPythonではpipが使えないため,モジュールのファイルを自分で作成する必要があるようです.BME280.pyはそのモジュールファイルです.コピペで使えます.
from machine import I2C
import time
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
"""Class for communicating with an I2C device.
Allows reading and writing 8-bit, 16-bit, and byte array values to
registers on the device."""
def __init__(self, address, i2c):
"""Create an instance of the I2C device at the specified address using
the specified I2C interface object."""
self._address = address
self._i2c = i2c
def writeRaw8(self, value):
"""Write an 8-bit value on the bus (without register)."""
value = value & 0xFF
self._i2c.writeto(self._address, value)
def write8(self, register, value):
"""Write an 8-bit value to the specified register."""
b=bytearray(1)
b[0]=value & 0xFF
self._i2c.writeto_mem(self._address, register, b)
def write16(self, register, value):
"""Write a 16-bit value to the specified register."""
value = value & 0xFFFF
b=bytearray(2)
b[0]= value & 0xFF
b[1]= (value>>8) & 0xFF
self.i2c.writeto_mem(self._address, register, value)
def readRaw8(self):
"""Read an 8-bit value on the bus (without register)."""
return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF
def readU8(self, register):
"""Read an unsigned byte from the specified register."""
return int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF
def readS8(self, register):
"""Read a signed byte from the specified register."""
result = self.readU8(register)
if result > 127:
result -= 256
return result
def readU16(self, register, little_endian=True):
result = int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
if not little_endian:
result = ((result << 8) & 0xFF00) + (result >> 8)
return result
def readS16(self, register, little_endian=True):
"""Read a signed 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = self.readU16(register, little_endian)
if result > 32767:
result -= 65536
return result
def readU16LE(self, register):
"""Read an unsigned 16-bit value from the specified register, in little
endian byte order."""
return self.readU16(register, little_endian=True)
def readU16BE(self, register):
"""Read an unsigned 16-bit value from the specified register, in big
endian byte order."""
return self.readU16(register, little_endian=False)
def readS16LE(self, register):
"""Read a signed 16-bit value from the specified register, in little
endian byte order."""
return self.readS16(register, little_endian=True)
def readS16BE(self, register):
"""Read a signed 16-bit value from the specified register, in big
endian byte order."""
return self.readS16(register, little_endian=False)
class BME280:
def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
**kwargs):
# Check that mode is valid.
if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
raise ValueError(
'Unexpected mode value {0}. Set mode to one of '
'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
'BME280_ULTRAHIGHRES'.format(mode))
self._mode = mode
# Create I2C device.
if i2c is None:
raise ValueError('An I2C object is required.')
self._device = Device(address, i2c)
# Load calibration values.
self._load_calibration()
self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
self.t_fine = 0
def _load_calibration(self):
self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
h4 = (h4 << 24) >> 20
self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
h5 = (h5 << 24) >> 20
self.dig_H5 = h5 | (
self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)
def read_raw_temp(self):
"""Reads the raw (uncompensated) temperature from the sensor."""
meas = self._mode
self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
meas = self._mode << 5 | self._mode << 2 | 1
self._device.write8(BME280_REGISTER_CONTROL, meas)
sleep_time = 1250 + 2300 * (1 << self._mode)
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
time.sleep_us(sleep_time) # Wait the required time
msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_pressure(self):
"""Reads the raw (uncompensated) pressure level from the sensor."""
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_humidity(self):
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
raw = (msb << 8) | lsb
return raw
def read_temperature(self):
"""Get the compensated temperature in 0.01 of a degree celsius."""
adc = self.read_raw_temp()
var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
var2 = ((
(((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
self.dig_T3) >> 14
self.t_fine = var1 + var2
return (self.t_fine * 5 + 128) >> 8
def read_pressure(self):
"""Gets the compensated pressure in Pascals."""
adc = self.read_raw_pressure()
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) >> 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
return 0
p = 1048576 - adc
p = (((p << 31) - var2) * 3125) // var1
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self.dig_P8 * p) >> 19
return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)
def read_humidity(self):
adc = self.read_raw_humidity()
# print 'Raw humidity = {0:d}'.format (adc)
h = self.t_fine - 76800
h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
return h >> 12
@property
def temperature(self):
"Return the temperature in degrees."
t = self.read_temperature()
ti = t // 100
td = t - ti * 100
return "{}.{:02d}".format(ti, td)
@property
def pressure(self):
"Return the temperature in hPa."
p = self.read_pressure() // 256
pi = p // 100
pd = p - pi * 100
return "{}.{:02d}".format(pi, pd)
@property
def humidity(self):
"Return the humidity in percent."
h = self.read_humidity()
hi = h // 1024
hd = h * 100 // 1024 - hi * 100
return "{}.{:02d}".format(hi, hd)
measure.py
import BME280
で先ほどのBME280モジュールファイルをインポートします.
温度,湿度,気圧データを取得するための関数を定義しています.
from machine import Pin,I2C
import utime
import BME280
p21 = Pin(21,Pin.IN,Pin.PULL_UP)
p22 = Pin(22,Pin.IN,Pin.PULL_UP)
i2c = I2C(scl=Pin(22), sda=Pin(21), freq=10000)
def get_temp():
bme = BME280.BME280(i2c=i2c)
temp = bme.temperature
print(f'temperature: {temp} ºC')
return temp
def get_hum():
bme = BME280.BME280(i2c=i2c)
hum = bme.humidity
print(f'humidity: {hum} %')
return hum
def get_pres():
bme = BME280.BME280(i2c=i2c)
pres = bme.pressure
print(f'temperature: {pres} hPa')
return pres
transmit.py
データを取得して,JSON形式にまとめて,fastAPIを呼び出すファイルです.
import measure
でmeasure.pyからデータ取得の関数を呼び出して使っています.
import requests,urequests
import json
import measure
import machine
#現在時刻を取得
url_jst = "http://worldtimeapi.org/api/timezone/Asia/Tokyo"
retry_delay = 5000 # interval time of retry after a failed Web query
response = urequests.get(url_jst)
parsed = response.json()
datetime_str = str(parsed["datetime"])
#送信データをJSON形式にまとめる
data = {
"device": {
"id": "002",
"name": "ESP32-WROOM-32",
"type": "MicroController",
"manufacturer": "ESPRESSIF",
},
"sensors_temp": {
"id": "temp_sensor_2",
"type": "temperature",
"location": "door",
"value": float(measure.get_temp()), #measure.pyで取得したデータ
"unit": "Celsius",
"timestamp": datetime_str, #現在時刻ISO8601形式
},
"sensors_hum": {
"id": "hum_sensor_2",
"type": "humidity",
"location": "door",
"value": float(measure.get_hum()), #measure.pyで取得したデータ
"unit": "%",
"timestamp": datetime_str,
},
}
#送信
def send():
json_data = json.dumps(data)
response = requests.post(
"http://your_VM_ip/api/data",
data = json_data,
headers={"Content-Type":"application/json"}
)
print(response.status_code)
print(data)
send()
microPythonではdatetimeモジュールが使えないようなので,url_jst = "http://worldtimeapi.org/api/timezone/Asia/Tokyo"
で現在時刻を取得しています.
↓このように各JSONのキー部分にセンサーデータの変数を組み込んでいます.
"value": float(measure.get_temp())
send()
でAPIを呼び出してrequests.postしています.post先のアドレスはお使いのVMのIPアドレスに変えて,ルートの/api/data
はFastAPIのmain.pyで定義したルーティング先に合わせてください.
JSONの順番に注意
私は,FastAPIのmodel.pyで
Device -> Sensors_temp -> Sensors_humの順に定義し,ESP32内プログラムのtransmit.pyでも同じように定義しています.なぜなら,model.pyでの定義と一致している必要があるからです.しかし,class Dataの中では順番を逆にしています.
理由はわからないのですが,microPythonからAPIを呼び出してJSONを送ろうとすると上下反転して送信されてしまうようです.私はそこでmodel.pyに引っかかり,エラーになりました.
JSONでデータをネストにしている方は順番を上下逆さまにClass Dataを記述してください.
JSONのネストが無い場合は気にしなくて大丈夫なはずです.
確認
- MongoDBは動いているか
sudo systemctl status mongod
- FastAPIサーバーは動いているか
http://your_VM_ip:port/ にアクセス - ESP32はWi-Fiに接続できているか
実行
こんな感じになればOK
*適当に撮ったスクショなのでターミナルとDBの値は一致してません
参考
FastAPI作成手順
https://techpr.info/python/farm-stack/
https://fastapi.tiangolo.com/
https://fastapi.tiangolo.com/ja/tutorial/body-nested-models/
https://qiita.com/uenosy/items/2f6b1aa258018d3db76c
https://qiita.com/ueda_st_/items/bcbaf3b7c97c5102af4a
MicroPythonで時刻取得
https://beta-notes.way-nifty.com/blog/2020/03/post-c320f1.html
MongoDB,MongoDB Compassの使い方
https://www.mongodb.com/docs/manual/tutorial/install-mongodb-on-ubuntu/
https://kennejs.com/entry/mongodb-compass-howtouse#documentfindfilter