はじめに
今回はESP32 MicroPython で、加速度センサーデータをソケット通信で送ってみたいと思います。
ソケット通信については以下の記事を参考にしてください
電話には電話番号がついてます。そして、電話をかけるときは、この電話番号 でどの電話機にかけるかを特定します。インターネットには数多くのコンピュー タが接続されています。インターネット上でコンピュータ同士が通信をすると きは、どのコンピュータ(またはグループ)と通信をしたいのかを特定しなけ ればいけません。インターネット上の電話番号に相当するのは、IPアドレスと ポート番号と呼ばれるものです。
引用元: http://research.nii.ac.jp/~ichiro/syspro98/socket.html
ソケット通信には引用文に記述されている通り、電話をかける側が必要です。それと同時に電話をかけられる側が必要です。今回は電話をかける側をステーション(ST)、電話をかけられる側をアクセスポイント(AP)とし、STがデータを送る側、APがデータを受け取る側とします。
目的
今回私は、二台のESP32に搭載されていた加速度センサーデータ取得し、それを同時に処理するためにはどうしたらよいか考えました。考えた結果、ソケット通信を用いて片方の加速度センサーデータをもう片方に送り、送り先で計算させるという結論に至りました。
準備するもの
まず、今回ソケット通信をするに辺りAPとSTの二つのESP32が必要です。
続いて、MicroPythonのバージョンはv1.21.0です。
そして、使う加速度センサーはLIS3DHとします。
プログラム
ST側からAP側に加速度センサーデータを送ります。プログラム起動にはデータを受け取る側つまりAP側のプログラムから実行させ、次にデータを送る側つまりST側のプログラムを実行させます。
続いて以下でAP側とST側の詳しい説明をします。
AP側
まずはコードの全文です。
import socket
import network
import time
import re
import ure
import _thread
import time
import utime
from machine import Pin, SoftI2C, I2C, PWM
#I2C設定
i2c = I2C(scl=Pin(22), sda=Pin(21), freq=400000)
#アドレス設定
address = 0x18
#LIS3DH設定
i2c.writeto_mem(address, 0x20, bytes([0x57]))
#プログラムの経過時間の記録用
sec = 0
# アクセスポイント(AP)モードの設定
ap_ssid = #各々で確認してください
ap_password = #各々で確認してください
ap = network.WLAN(network.AP_IF)
ap.active(True)
ap.config(essid=ap_ssid, password=ap_password)
print(ap)
# APのIPアドレスを取得
ap_ip = ap.ifconfig()[0]
# 受信用のソケットの設定
s = socket.socket()
port = 80
s.bind((ap_ip, port))
s.listen(1)
#接続確認
print("APモード: 接続待機中", ap_ip)
while True:
client, addr = s.accept()
print("Connected by", addr)
while True:
try:
#データの受け取り
data = client.recvfrom(1024)
#受けったデータの出力
print(data)
#数字だけ取り出す
ST_num = str(data[0])
#データ読み込み
xl = i2c.readfrom_mem(address, 0x28, 1)[0]
xh = i2c.readfrom_mem(address, 0x29, 1)[0]
yl = i2c.readfrom_mem(address, 0x2A, 1)[0]
yh = i2c.readfrom_mem(address, 0x2B, 1)[0]
zl = i2c.readfrom_mem(address, 0x2C, 1)[0]
zh = i2c.readfrom_mem(address, 0x2D, 1)[0]
#データ変換
out_x = (xh << 8 | xl) >> 4
out_y = (yh << 8 | yl) >> 4
out_z = (zh << 8 | zl) >> 4
#極性判断
if out_x >= 2048:
out_x -= 4096
if out_y >= 2048:
out_y -= 4096
if out_z >= 2048:
out_z -= 4096
#AP側だと分かりやすいのようにする
AP_num = out_z
#処理部分
if AP_num > ST_num:
print(APの方が高い)
else
print(STの方が高い)
#物理量(加速度)に変換
out_x = out_x / 1024
out_y = out_y / 1024
out_z = out_z / 1024
except Exception as e:
print("Error:", e)
# ソケットを閉じる
client.close()
s.close()
プログラムの解説
加速度センサー側の解説
import time
import utime
from machine import Pin, SoftI2C, I2C
#I2C設定
i2c = I2C(scl=Pin(22), sda=Pin(21), freq=400000)
#アドレス設定
address = 0x18
#LIS3DH設定
i2c.writeto_mem(address, 0x20, bytes([0x57]))
while True:
#データ読み込み
xl = i2c.readfrom_mem(address, 0x28, 1)[0]
xh = i2c.readfrom_mem(address, 0x29, 1)[0]
yl = i2c.readfrom_mem(address, 0x2A, 1)[0]
yh = i2c.readfrom_mem(address, 0x2B, 1)[0]
zl = i2c.readfrom_mem(address, 0x2C, 1)[0]
zh = i2c.readfrom_mem(address, 0x2D, 1)[0]
#データ変換
out_x = (xh << 8 | xl) >> 4
out_y = (yh << 8 | yl) >> 4
out_z = (zh << 8 | zl) >> 4
#極性判断
if out_x >= 2048:
out_x -= 4096
if out_y >= 2048:
out_y -= 4096
if out_z >= 2048:
out_z -= 4096
#物理量(加速度)に変換
out_x = out_x / 1024
out_y = out_y / 1024
out_z = out_z / 1024
加速度センサーで必要なコードです。
順番に解説していきます。
#I2C設定
i2c = I2C(scl=Pin(22), sda=Pin(21), freq=400000)
はじめにI2Cの設定です。
まず、I2Cとは、、、
I2C は、デバイス間通信のための2線式プロトコルです。物理レベルでは、クロックラインである SCL とデータラインである SDA の2本のワイヤで構成されています。
引用元: https://micropython-docs-ja.readthedocs.io/ja/latest/library/machine.I2C.html
ということです。
ということなので、クロックラインのSCLとデータラインのSDAの設定をしなくてはいけません。
今回私が用いたESP32は、SCLがピンの22、SDAがピンの21でした。そのため、そこに合わせてあげます。
freqは400kHzにします。これは引用元に習っています。
#LIS3DH設定
address = 0x18
i2c.writeto_mem(address, 0x20, bytes([0x57]))
続いて、LIS3DHの設定です。
ここでは、加速度センサーから得られたデータの書き込む設定をしています。
addressの0x18は、一番下のbitはRead/Write用の0x30, 0x31を右に1つシフトすると、Read/Writeのbit無しの0x18になるので、今回はそれを活用します。詳しくは、下記の引用元をご参照ください。
ソケット通信側の解説
# アクセスポイント(AP)モードの設定
ap_ssid = "" #適切なSSIDの設定
ap_password = "" #適切なパスワードの設定
ここでは、AP側のSSIDとパスワードの設定をしています。
ap = network.WLAN(network.AP_IF)
ap.active(True)
ap.config(essid=ap_ssid, password=ap_password)
ここでは、Wi-Fi接続するための設定を行っています。
apでWi-Fiインターフェースのオブジェクトのインスタンスを作成、ap.active(True)でインターフェースを有効にし、ap.configで接続が出来ているか確認します。
# APのIPアドレスを取得
ap_ip = ap.ifconfig()[0]
ここでは、ソケット通信のためにAPのIPアドレスを取得しています。
# 受信用のソケットの設定
s = socket.socket()
port = 80
s.bind((ap_ip, port))
ここでは、受信用のソケットの設定をしています。
AP側のIPアドレスとポートをアドレスとしてs.bindでバインドします。
#接続確認
s.listen(1)
print("APモード: 接続待機中", ap_ip)
s.listen(1)でサーバーが接続を受け付けるようにします。
while True:
client, addr = s.accept()
print("Connected by", addr)
サーバーとの接続が確立された後、ST側の接続を受け入れます。
while True:
#データの受け取り
data = client.recvfrom(1024)
#受けったデータの出力
print(data)
ここでは、ST側から送られてきたデータを受け取っています。
さらに確認のため、出力しています。
#数字だけ取り出す
ST_data = str(data[0])
ST_num = ST_data.replace("b'", "").replace("'", "")
出力すると(b'センサーデータ',アドレス)という感じに来ていたので、先頭のセンサーデータだけを取り出します。取り出した後、数字として使いたいためreplaceを用いて数字だけを取り出します。
AP_num = out_z
#処理部分
if AP_num > ST_num:
print(APの方が高い)
else
print(STの方が高い)
ここでは、AP側のセンサーデータとST側のセンサーデータの処理をしている箇所です。今回分かりやすいようにout_z(地上からの高さの値)同士を比較しています。
# ソケットを閉じる
client.close()
s.close()
最後にソケット閉じて終了です。
ST側
まずはコードの全文です。
import time
import utime
import socket
import network
from machine import Pin, SoftI2C, I2C
# Set up Station (ST) mode
sta_ssid = "" #適切なSSIDの設定
sta_password = "" #適切なパスワードの設定
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect(sta_ssid, sta_password)
while not sta.isconnected():
pass
print("ST Mode: Connected to WiFi")
# Get the ST IP address
sta_ip = sta.ifconfig()[0]
ap_ip = "" # Replace with the AP's IP address
port = 80
# Set up socket for connecting to the AP
s = socket.socket()
# APに接続
s.connect(socket.getaddrinfo(ap_ip, port)[0][-1])
print(socket.getaddrinfo(ap_ip, port)[0][-1])
#I2C設定
#i2c = I2C(scl=Pin(22), sda=Pin(21), freq=400000)
i2c = SoftI2C(scl=Pin(22), sda=Pin(21), freq=400000)
address = 0x18
#address = 0x1A
#LIS3DH設定
i2c.writeto_mem(address, 0x20, bytes([0x57]))
#i2c.writeto_mem(address, 0x23, bytes([0x08]))
sec = 0
while True:
try:
# 時間
time_data = utime.ticks_ms() / 1000
#データ読み込み
xl = i2c.readfrom_mem(address, 0x28, 1)[0]
xh = i2c.readfrom_mem(address, 0x29, 1)[0]
yl = i2c.readfrom_mem(address, 0x2A, 1)[0]
yh = i2c.readfrom_mem(address, 0x2B, 1)[0]
zl = i2c.readfrom_mem(address, 0x2C, 1)[0]
zh = i2c.readfrom_mem(address, 0x2D, 1)[0]
# データ変換
out_x = (xh << 8 | xl) >> 4
out_y = (yh << 8 | yl) >> 4
out_z = (zh << 8 | zl) >> 4
#極性判断
if out_x >= 2048:
out_x -= 4096
if out_y >= 2048:
out_y -= 4096
if out_z >= 2048:
out_z -= 4096
#物理量(加速度)に変換
out_x = out_x / 1024
out_y = out_y / 1024
out_z = out_z / 1024
print("s: "+str(sec)+" time: "+str(time_data)+' Z: ' + str(out_z))
send_data = round(out_z, 6)
s.send(str(send_data))
sec += 0.3
# 一時停止
time.sleep(0.3)
#60秒間測定
if sec >= 60.0:
print("STモードを終了します")
s.close()
break
except Exception as e:
print("Error:", e)
プログラムの解説
加速度センサーのプログラムは前述したので省略させていただきます。
ソケット通信側の解説
sta_ssid = "" #適切なSSIDの設定
sta_password = "" #適切なパスワードの設定
ここでは、ソケット通信に必要なSSIDとパスワードを設定します。これはAP側に合わせてください。
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect(sta_ssid, sta_password)
while not sta.isconnected():
pass
ここでは、Wi-Fi接続の確認を行っています。無事に繋がると"ST Mode: Connected to WiFi"と表示されます。
# Get the ST IP address
sta_ip = sta.ifconfig()[0]
ap_ip = "" # Replace with the AP's IP address
port = 80
ここでは、ソケット通信に必要なものを設定しています。AP側のIPアドレスとポートをここでは、設定してください。
# Set up socket for connecting to the AP
s = socket.socket()
# APに接続
s.connect(socket.getaddrinfo(ap_ip, port)[0][-1])
ここでは、ソケット通信でAP側と接続をしています。
send_data = round(out_z)
s.send(str(send_data))
ここでは、加速度センサーデータを実際に送っています。送るデータはstr側でキャストして送っています。
おわりに
今回は、加速度センサーデータをソケット通信で送る方法について解説しました。しかし、加速度センサー側のエラーが多発して開発が難しく現状try文で無理やり繋げています。次回はそのエラーを直していきたいです。