I2Cアドレス、レジスターの値、読み取り可能レジスタ数からセンサーを識別します。
今回使用するセンサーは以下の4つの温度センサーです。
- BMP-280
- BME-280
- ADT-7410
- S-5851A
これらのセンサーのI2Cアドレス、レジスターの値、読み取り可能レジスタ数を比較すると、以下の様な特徴がありました。
センサー名 | I2Cアドレス | IDレジスタアドレス | ID | 読み取り可能レジスタ数 | ID取得時の右ビットシフト数 |
---|---|---|---|---|---|
BMP-280 | 118 | 208 | 88 | 256 | 0 |
BME-280 | 118 | 208 | 96 | 256 | 0 |
ADT-7410 | 72 | 11 | 25 | 256 | 3 |
S-5851A | 72 | NULL | NULL | 2 | 0 |
IDレジスタのアドレスと値は、各センサーのデータシートを参照して特定しました。ADT-7410はIDレジスタアドレスの値の内、4~8ビットがIDであるのでIDを取得するには、値を右に3ビットシフトする必要があります。
これをもとに、サーバーにデータベース(以降 DB)を作成します。作成する際、S-5851AのIDレジスタのアドレスは256、値は-1にします。レジスタアドレスは0~255までしか無いのでIDレジスタがない場合は、256をIDレジスタのアドレスとします。またその値を通常入ることがない値の-1に設定します。
DBの内容とクライアントから送られてきたI2Cアドレス、レジスターの値、読み取り可能レジスタ数と比較することでセンサーを識別したいと思います。
今回の実験環境
クライアントのマイクロコントローラにはESP32を使用します。ESP32に任意のセンサーを接続します。サーバーはESXiに仮想マシンを建てました。DBはその仮想マシン上にMariaDBを建てて作成しました。クライアントとサーバー間の通信にはFastAPIを使用しました。クライアント側はMicroPython、サーバー側はPythonで実装しました。
クライアント(ESP32)
from machine import Pin, I2C
import time
import urequests
import ujson
import utime
import os
def main():
# I2Cバスを初期化 (I2Cのピンはデバイスにより異なります)
i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000) # sclとsdaピンは例
# スキャンを実行
I2C_devices = i2c.scan()
I2C_device = hex(0)
sensor_info = dict()
file_info = dict()
if(I2C_devices):
for I2C_device in I2C_devices:
print(hex(I2C_device))
else:
print("I2Cに接続しているセンサーが見つかりませんでした")
# 実行
sensor_info = scan_register(I2C_device, i2c)
sensor_name = send_info(sensor_info)
print(sensor_name["name"])
def scan_register(I2C_addr, i2c):
r_reg_count = 0
values = []
sensor_info = dict()
print("IDレジスタを探索中...")
for reg in range(0x00, 0xFF+1):
try:
# 各レジスタアドレスから1バイト読み取る
data = i2c.readfrom_mem(I2C_addr, reg, 1)
values.append(data[0])
r_reg_count += 1
if(reg == 0xD0):
print(f"0xD0: {hex(values[reg])}")
time.sleep(0.01) # 少しウェイトを入れてI2Cを安定させる
except Exception as e:
# 読み取りに失敗した場合は-1を入れる
values.append(-1)
values.append(-1)
print(f"I2Cアドレス{I2C_addr}, レジスタの値{values}, 読み取り可能レジスタ数{r_reg_count}")
sensor_info = {"I2C_addr": I2C_addr, "register_num": values, "readable_register_num": r_reg_count}
return sensor_info
def send_info(sensor_info):
SERVER_URL = "http://192.168.100.197:8000/api/sensor-info"
result = dict()
try:
headers = {"Content-Type": "application/json"}
# POSTリクエスト送信
response = urequests.post(SERVER_URL, headers=headers, data=ujson.dumps(sensor_info))
# レスポンス確認
if response.status_code == 200:
result = ujson.loads(response.text)
print(result["name"])
else:
print("Failed to download the file. Status code:", response.status_code)
response.close()
return result
except Exception as e:
print("Failed to send data:", e)
if __name__ == "__main__":
main()
サーバー(仮想マシン)
from fastapi import FastAPI
from fastapi.responses import FileResponse
import os
from pydantic import BaseModel
import pymysql.cursors
# JSONデータのスキーマ定義
class SensorInfo(BaseModel):
I2C_addr: int
register_num: list
readable_register_num: int
app = FastAPI()
@app.post("/api/sensor-info")
async def receive_sensor_info(info: SensorInfo):
# データを確認・処理
connection = pymysql.connect(host='localhost',
user='root',
password='任意のパスワード',
database='任意のデータベース名',
cursorclass=pymysql.cursors.DictCursor)
with connection:
with connection.cursor() as cursor:
sql="SELECT name, file_dir FROM sensor_info WHERE I2C=%s AND id_register_addr=%s AND id=%s AND readable_register_num=%s AND id_bit_shift=%s"
for i in range(0x101):
for j in range(4):
cursor.execute(sql, (info.I2C_addr, i, info.register_num[i] >> j, info.readable_register_num, j))
search_result = cursor.fetchone()
if(search_result != None):
print({"name": search_result["name"]})
return {"name": search_result["name"]}
return {"message": "Sensor not found"}
実行手順
クライアントにget_sensor_info.pyを記述し、サーバーにmain.pyを記述します。get_seosnor_info.pyのSERVER_URLと、main.pyのpassword、databaseには適切な値に設定してください。サーバー側でmain.pyを実行してから、クライアントでget_sensor_info.pyを実行してください。
サーバー側は実行する場合以下のコマンドをmain.pyがあるディレクトリで実行します。実行すると以下のような表示になります。
uvicorn main:app --host 0.0.0.0 --port 8000
実行結果
クライアントの実行結果
サーバーの実行結果
参考にしたサイト