I2Cアドレス、レジスターの値、読み取り可能レジスタ数からセンサーを識別します。
今回使用するセンサーは以下の4つの温度センサーです。
- BMP-280
 - BME-280
 - ADT-7410
 - S-5851A
 
これらのセンサーのI2Cアドレス、レジスターの値、読み取り可能レジスタ数を比較すると、以下の様な特徴がありました。
| センサー名 | I2Cアドレス | IDレジスタアドレス | ID | 読み取り可能レジスタ数 | ID取得時の右ビットシフト数 | 
|---|---|---|---|---|---|
| BMP-280 | 118 | 208 | 88 | 256 | 0 | 
| BME-280 | 118 | 208 | 96 | 256 | 0 | 
| ADT-7410 | 72 | 11 | 25 | 256 | 3 | 
| S-5851A | 72 | NULL | NULL | 2 | 0 | 
IDレジスタのアドレスと値は、各センサーのデータシートを参照して特定しました。ADT-7410はIDレジスタアドレスの値の内、4~8ビットがIDであるのでIDを取得するには、値を右に3ビットシフトする必要があります。
これをもとに、サーバーにデータベース(以降 DB)を作成します。作成する際、S-5851AのIDレジスタのアドレスは256、値は-1にします。レジスタアドレスは0~255までしか無いのでIDレジスタがない場合は、256をIDレジスタのアドレスとします。またその値を通常入ることがない値の-1に設定します。
DBの内容とクライアントから送られてきたI2Cアドレス、レジスターの値、読み取り可能レジスタ数と比較することでセンサーを識別したいと思います。
今回の実験環境
クライアントのマイクロコントローラにはESP32を使用します。ESP32に任意のセンサーを接続します。サーバーはESXiに仮想マシンを建てました。DBはその仮想マシン上にMariaDBを建てて作成しました。クライアントとサーバー間の通信にはFastAPIを使用しました。クライアント側はMicroPython、サーバー側はPythonで実装しました。
クライアント(ESP32)
from machine import Pin, I2C
import time
import urequests
import ujson
import utime
import os
def main():
    # I2Cバスを初期化 (I2Cのピンはデバイスにより異なります)
    i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000)  # sclとsdaピンは例
    # スキャンを実行
    I2C_devices = i2c.scan()
    I2C_device = hex(0)
    
    sensor_info = dict()
    file_info = dict()
    if(I2C_devices):
        for I2C_device in I2C_devices:
            print(hex(I2C_device))
    else:
        print("I2Cに接続しているセンサーが見つかりませんでした")
    
    # 実行
    sensor_info = scan_register(I2C_device, i2c)
    sensor_name = send_info(sensor_info)
    print(sensor_name["name"])
    
def scan_register(I2C_addr, i2c):
    
    r_reg_count = 0
    values = []
    sensor_info = dict()
    
    print("IDレジスタを探索中...")
    for reg in range(0x00, 0xFF+1):
        try:
            # 各レジスタアドレスから1バイト読み取る
            data = i2c.readfrom_mem(I2C_addr, reg, 1)
            values.append(data[0])
            
            r_reg_count += 1
            if(reg == 0xD0):
                print(f"0xD0: {hex(values[reg])}")
            time.sleep(0.01)  # 少しウェイトを入れてI2Cを安定させる
        except Exception as e:
            # 読み取りに失敗した場合は-1を入れる
            values.append(-1)
    
    values.append(-1)
    
    print(f"I2Cアドレス{I2C_addr}, レジスタの値{values}, 読み取り可能レジスタ数{r_reg_count}")
    sensor_info = {"I2C_addr": I2C_addr, "register_num": values, "readable_register_num": r_reg_count}
    return sensor_info
    
        
def send_info(sensor_info):
    
    SERVER_URL = "http://192.168.100.197:8000/api/sensor-info"
    result = dict()
    
    try:
        
        headers = {"Content-Type": "application/json"}
        # POSTリクエスト送信
        response = urequests.post(SERVER_URL, headers=headers, data=ujson.dumps(sensor_info))
        # レスポンス確認
        if response.status_code == 200:
            result = ujson.loads(response.text)
            print(result["name"]) 
        else:
            print("Failed to download the file. Status code:", response.status_code)
        response.close()
        return result
    
    except Exception as e:
        print("Failed to send data:", e)
     
if __name__ == "__main__":
    main()
サーバー(仮想マシン)
from fastapi import FastAPI
from fastapi.responses import FileResponse
import os
from pydantic import BaseModel
import pymysql.cursors
# JSONデータのスキーマ定義
class SensorInfo(BaseModel):
    I2C_addr: int
    register_num: list
    readable_register_num: int
app = FastAPI()
@app.post("/api/sensor-info")
async def receive_sensor_info(info: SensorInfo):
    # データを確認・処理
    connection = pymysql.connect(host='localhost',
                             user='root',
                             password='任意のパスワード',
                             database='任意のデータベース名',
                             cursorclass=pymysql.cursors.DictCursor)
    with connection:
        with connection.cursor() as cursor:
            sql="SELECT name, file_dir FROM sensor_info WHERE I2C=%s AND id_register_addr=%s AND id=%s AND readable_register_num=%s AND id_bit_shift=%s"
            for i in range(0x101):
                for j in range(4):
                    cursor.execute(sql, (info.I2C_addr, i, info.register_num[i] >> j, info.readable_register_num, j))
                    search_result = cursor.fetchone()
                    if(search_result != None):
                        print({"name": search_result["name"]})
                        return {"name": search_result["name"]}
    return {"message": "Sensor not found"}
    
実行手順
クライアントにget_sensor_info.pyを記述し、サーバーにmain.pyを記述します。get_seosnor_info.pyのSERVER_URLと、main.pyのpassword、databaseには適切な値に設定してください。サーバー側でmain.pyを実行してから、クライアントでget_sensor_info.pyを実行してください。
サーバー側は実行する場合以下のコマンドをmain.pyがあるディレクトリで実行します。実行すると以下のような表示になります。
uvicorn main:app --host 0.0.0.0 --port 8000
実行結果
クライアントの実行結果
サーバーの実行結果
参考にしたサイト








