0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

I2Cアドレス、レジスターの値、読み取り可能レジスタ数からセンサーを識別する

Last updated at Posted at 2024-12-18

I2Cアドレス、レジスターの値、読み取り可能レジスタ数からセンサーを識別します。

今回使用するセンサーは以下の4つの温度センサーです。

  • BMP-280
  • BME-280
  • ADT-7410
  • S-5851A

これらのセンサーのI2Cアドレス、レジスターの値、読み取り可能レジスタ数を比較すると、以下の様な特徴がありました。

センサー名 I2Cアドレス IDレジスタアドレス ID 読み取り可能レジスタ数 ID取得時の右ビットシフト数
BMP-280 118 208 88 256 0
BME-280 118 208 96 256 0
ADT-7410 72 11 25 256 3
S-5851A 72 NULL NULL 2 0

IDレジスタのアドレスと値は、各センサーのデータシートを参照して特定しました。ADT-7410はIDレジスタアドレスの値の内、4~8ビットがIDであるのでIDを取得するには、値を右に3ビットシフトする必要があります。
これをもとに、サーバーにデータベース(以降 DB)を作成します。作成する際、S-5851AのIDレジスタのアドレスは256、値は-1にします。レジスタアドレスは0~255までしか無いのでIDレジスタがない場合は、256をIDレジスタのアドレスとします。またその値を通常入ることがない値の-1に設定します。
DBの内容とクライアントから送られてきたI2Cアドレス、レジスターの値、読み取り可能レジスタ数と比較することでセンサーを識別したいと思います。

今回の実験環境

クライアントのマイクロコントローラにはESP32を使用します。ESP32に任意のセンサーを接続します。サーバーはESXiに仮想マシンを建てました。DBはその仮想マシン上にMariaDBを建てて作成しました。クライアントとサーバー間の通信にはFastAPIを使用しました。クライアント側はMicroPython、サーバー側はPythonで実装しました。

クライアント(ESP32)

get_sensor_info.py
from machine import Pin, I2C
import time
import urequests
import ujson
import utime
import os


def main():
    # I2Cバスを初期化 (I2Cのピンはデバイスにより異なります)
    i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400000)  # sclとsdaピンは例
    # スキャンを実行
    I2C_devices = i2c.scan()
    I2C_device = hex(0)
    
    sensor_info = dict()
    file_info = dict()

    if(I2C_devices):
        for I2C_device in I2C_devices:
            print(hex(I2C_device))
    else:
        print("I2Cに接続しているセンサーが見つかりませんでした")
    
    # 実行
    sensor_info = scan_register(I2C_device, i2c)
    sensor_name = send_info(sensor_info)
    print(sensor_name["name"])
    

def scan_register(I2C_addr, i2c):
    
    r_reg_count = 0
    values = []
    sensor_info = dict()
    
    print("IDレジスタを探索中...")
    for reg in range(0x00, 0xFF+1):
        try:
            # 各レジスタアドレスから1バイト読み取る
            data = i2c.readfrom_mem(I2C_addr, reg, 1)
            values.append(data[0])
            
            r_reg_count += 1
            if(reg == 0xD0):
                print(f"0xD0: {hex(values[reg])}")
            time.sleep(0.01)  # 少しウェイトを入れてI2Cを安定させる
        except Exception as e:
            # 読み取りに失敗した場合は-1を入れる
            values.append(-1)
    
    values.append(-1)
    
    print(f"I2Cアドレス{I2C_addr}, レジスタの値{values}, 読み取り可能レジスタ数{r_reg_count}")
    sensor_info = {"I2C_addr": I2C_addr, "register_num": values, "readable_register_num": r_reg_count}
    return sensor_info
    
        
def send_info(sensor_info):
    
    SERVER_URL = "http://192.168.100.197:8000/api/sensor-info"
    result = dict()
    
    try:
        
        headers = {"Content-Type": "application/json"}
        # POSTリクエスト送信
        response = urequests.post(SERVER_URL, headers=headers, data=ujson.dumps(sensor_info))
        # レスポンス確認
        if response.status_code == 200:
            result = ujson.loads(response.text)
            print(result["name"]) 
        else:
            print("Failed to download the file. Status code:", response.status_code)

        response.close()
        return result
    
    except Exception as e:
        print("Failed to send data:", e)

     
if __name__ == "__main__":
    main()

サーバー(仮想マシン)

main.py
from fastapi import FastAPI
from fastapi.responses import FileResponse
import os
from pydantic import BaseModel
import pymysql.cursors

# JSONデータのスキーマ定義
class SensorInfo(BaseModel):
    I2C_addr: int
    register_num: list
    readable_register_num: int


app = FastAPI()

@app.post("/api/sensor-info")
async def receive_sensor_info(info: SensorInfo):
    # データを確認・処理

    connection = pymysql.connect(host='localhost',
                             user='root',
                             password='任意のパスワード',
                             database='任意のデータベース名',
                             cursorclass=pymysql.cursors.DictCursor)

    with connection:
        with connection.cursor() as cursor:
            sql="SELECT name, file_dir FROM sensor_info WHERE I2C=%s AND id_register_addr=%s AND id=%s AND readable_register_num=%s AND id_bit_shift=%s"
            for i in range(0x101):
                for j in range(4):
                    cursor.execute(sql, (info.I2C_addr, i, info.register_num[i] >> j, info.readable_register_num, j))
                    search_result = cursor.fetchone()
                    if(search_result != None):
                        print({"name": search_result["name"]})
                        return {"name": search_result["name"]}
    return {"message": "Sensor not found"}
    

実行手順

クライアントにget_sensor_info.pyを記述し、サーバーにmain.pyを記述します。get_seosnor_info.pyのSERVER_URLと、main.pyのpassword、databaseには適切な値に設定してください。サーバー側でmain.pyを実行してから、クライアントでget_sensor_info.pyを実行してください。
サーバー側は実行する場合以下のコマンドをmain.pyがあるディレクトリで実行します。実行すると以下のような表示になります。

uvicorn main:app --host 0.0.0.0 --port 8000

スクリーンショット 2024-12-18 154531.png

実行結果

クライアントの実行結果

BMP-280の場合
BMP-280 C.png

BME-280の場合
BME-280 C.png

ADT-7410の場合
ADT-7410 C.png

S-5851Aの場合
S-5851A C.png

サーバーの実行結果

BMP-280の場合
BMP-280 S 2024-12-18 162319.png

BME-280の場合
BME-280 S 2024-12-18 162242.png

ADT-7410の場合
ADT7410 S 2024-12-18 162850.png

S-5851Aの場合
S5851A S 2024-12-18 162701.png

参考にしたサイト

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?