8
7

Raspberry Pi Pico WにおけるセンサとWeb UIの非同期I/O処理 【Python】

Last updated at Posted at 2023-05-21

ちょっと調べたところ日本語のまとめが見当たらなかったのでいつものように自分用にメモ。
記事タイトルをどう表記するのかが難しい、これで後で自分で意味が理解できるのかちょっと不安。

結論

Raspberry Pi Pico Wでsocketを使ってhttpサーバを立ち上げるのは比較的簡単だがセンサと同時に(=非同期で)処理するのが難しい。だったらsocketの代わりにお手軽にasyncio1 (CPython)を使えばいいじゃん。

説明「しない」内容

  • Raspberry Pi Pico (W)の基本情報・基本的な使い方
  • Python開発環境のセットアップ方法
  • Raspberry Pi Pico (W)へのプログラム転送方法
  • Pythonの一般知識
  • HTMLの一般知識

(どうでもいい)経緯

自宅のトイレがね、空かないんすよ。いや開かない、じゃなくて、空かない。ドアが壊れてる訳じゃないです。誰とは言いませんが家族の「使用中」が長いんです。仕方なく順番待ちのQueueに並ぶんですが、トイレの前でずっとQueue待ちする訳にもいかず、自室にいてトイレが空いたことを知りたい、と。特急電車なんかによくあるトイレ使用灯、みたいなイメージ。
つまり、トイレの照明が消えたらそのことを教えて欲しい、と。出来ればPushで。随時使用状態も自室から知れたら(Pull)いいな、と。じゃ何か出来ないか考えよう、となった訳です。
何か作る場合、普段はAruduinoが安いのでそっちを使うことが多いのですが、いざネットワークを使おうとするといろいろ面倒な気がするので、ワンパッケージなRaspberry Pi Pico W(長いので以下RasPicoW)が多少高くても便利かな、と。

主旨

何をやりたいのかというと、

  • センサーの値を定期的に読み込む(=監視、今回は照度、つまりトイレ照明のON/OFFの監視 2)
  • 読み込んだデータをWebページに表示(ローカル無線LAN経由)
  • 規定値を超えたら何がしかpushで通知
  • 言語はお手軽に(Micro)Pythonで

問題点というかポイント

当然リモートでのUIはお手軽にブラウザを使いたいのでRasPicoWでhttpサーバを動かすのですが、通信に普通にMicroPythonのsocket(CPython)を直接使うと、httpサーバがクライアント(ブラウザ)待ち受け中は処理を掴みっぱなしにして、つまり止まってしまって他の処理が出来なくなってしまう様です。
これのなにがマズいかというと、自分でWebページを見に行った(=サーバが待ち受け状態から次へ処理が進んだ)時じゃないとセンサの値を読みに行けない。ブラウザを開いているあいだは自動更新して処理を回すとしても、ブラウザを閉じてしまうとRasPicoWがhttpサーバの待ち受け状態で止まってしまい、通知が欲しい時に送られてこない。
RasPicoWのRP2040はマルチコアでマルチスレッドにも対応してるはずなのですが、現時点でMicroPythonはまだThreadingには対応しておらず、低レベルAPIな_threadしか使えない上に、これでhttpサーバ用とPush通知用に2つsocketを使おうとするとRasPicoWではリソース不足が発生するようです。このあたりまだファームウエアにbrush-upの余地があるのか完全なH/W実力不足なのか不明。(時価1200円だしね)
で、これを出来るだけ簡単に解決したい、と。

まずは簡単なH/Wの話

センサは秋月の照度センサモジュール、TSL25721(時価500円)を使います3。ふつーに3.3V電源+-とSDA・SCLをつなげばOK。モジュール側の+電源は3v3とVIN、2か所接続する必要があります。秋月のページにあるサンプルプログラム(ZIP)も見ておくといいでしょう。

ケースはタカチのSW-85Sとユニバーサル基板TNF53-79。適当にピンソケット(モジュール側はL型のやつ)を立てました。
電源はブックオフで適当なマイクロUSBコネクタの携帯充電器をゲット。未使用290円でした。コードをカットして+5VをRasPicoWのVSYSに接続。
ケースは現物合わせで穴を空けておきます。これでトイレのコンセントに直接ぶっ刺せばOK。
これだけなので総費用は2500円ぐらい?

本題のMicroPython

  • ローカルネットワークへの無線LAN接続の説明はほかにもいっぱい情報があるので説明省略。ウチの環境ではスタティックIPなところぐらいが注意点です。ふつーのDHCPの場合は冒頭のIPアドレスの指定をブランクに。ついでにRasPicoWのmacアドレスの調べ方も自分が忘れる自信があるのでコメントに入れておきました。

重要な警告
この例はWifiのパスワードを平文で保存していて、H/W自体は簡単にコンセントから抜いて持ち去ることができるので、その点セキュリティ的にガバガバなのを承知の上で、問題ないと思われる場合(来客の来ない自宅など)だけ使用する等の覚悟が必要です。

  • Push通知はLine notifyを使いますが、この部分はコチラの丸パクリなので(ありがとうございます)アクセスキーの取得方法などもそちらを参照してください。同じくここからtiny_line.pyをコピペしてファイルをRasPicoWに保存します。

  • センサーの処理部分は秋月のZIP内にあったTSL2572.py、ほぼそのままです。定数の定義は自分には逆に読みづらいのでパスしました。どうせあとではもう弄らないし。

def init_TSL25721():	# returns success:1, fail:-1
    if (get_reg(0x12) != 0x34) :
        return -1
    set_reg(0x80|0x20|0x0F, 0x00)
    set_reg(0x80|0x20|0x0D, 0x00)
    set_reg(0x80|0x20|0x01, 0xC0)
    set_reg(0x80|0x20|0x00, 0x02|0x01)
    return 1

def get_reg(reg) :	# retruns byte data
    :
    :

本題のMicroPithonの本題

  • httpサーバを直接socketを使わずに、asyncioを使って書きます。他の部分も非同期用に書き直します。
  • まずは例によって最後の部分から。ふつーWhile True:ループで回すメイン部分を非同期IO処理用として丸ごとmain()関数にします。非同期を示すasyncdefの前に入れるのを忘れずに。
#
# main
#
async def main():
    :
    :
  • main()内、ローカルの無線LAN接続までは普通に実行しますが、httpサーバ部分は非同期で動かすタスクの中に専用の組み込み関数asyncio.start_server()を登録して
asyncio.create_task(asyncio.start_server(http_server, my_ip, listen_port))

とします。サーバーの本体はコールバック関数であるhttp_server(この名前は何でも良いがプログラム前半に書く関数本体の名前に合わせる)です。 ここで自アドレスと待ち受けポートを指定します。

  • http_server()本体側ではdefの前にasyncをつけた上で引数にreaderwriterのstreamオブジェクトを指定します。
async def http_server(reader, writer):

これがsocketでいうところのrecv()とsend()の役割になります。(実はsocketのラッパーAPIになってるのか?) httpリスエストの受け取り時にもawaitを追加。

 _request_line = await reader.readline()

表示用のWebページの基本デザインもこちらからいただきましたが、送信用のHTMLヘッダーに

<meta http-equiv="refresh" content="2; url=http://%(ref_url)s">

を入れて自分自身を2秒ごとに更新するようにしてあります。

  • 照度のデータは最新のデータだけが必要なのでQueueにはせず、グローバル変数lux_valueで受け渡し。特に競合対策のLockはしていませんが、httpサーバ側は読むだけだし、壊れてもすぐセンサに更新されてしまうのでこれで多分実使用上は問題ないはず。

writerはStreamオブジェクトなので最後にdrain()するのを忘れずに。
時間がかかりそうなdrain()wait_closed()awaitを付加。その他HTML表示用のチマチマした処理の説明省略。ちなみにNotifyのオン・オフ状態の保存にはデバッグ時に人間が見た時の解りやすさの観点からRasPicoW上のLEDを使っています。

 await writer.drain()		# Flash data
 await writer.wait_closed()
 print('Connection closed')
  • 話はmain()に戻って、get_adc()がhttpサーバーと非同期で(=疑似並列で)動くセンサー読み取り関数で、中はwhile True:になっていますので戻っては来ません。こちらも動作中httpサーバ側を待たせるためawaitを付けておきます。
await get_adc()

この関数の中で、「Notify ON/OFF」ボタンがON(赤)の時に照明の照度がThreshold以下になったらLine notifyを発火します。無限ループの中ですので発火タイミングはhttpサーバの状態に依存しません。

tl.notify('電気が消えました')

最後のところ、センサ読み取りサイクルの間のsleepには非同期専用のasyncio.sleep()を使います。

  • 本当の動作スタート部分はここ。これも専用のasyncio.run()main()を指定して使います。念のためtryに入れて例外発生時には新しいイベントループを作ります。
try:
    asyncio.run(main())

except OSError as e:
    print(f' >> ERROR: {e}')

finally:
    asyncio.new_event_loop()

最後に

とまぁポイントはこんなところです。これでブラウザを閉じてもLineの通知が来るようになりました。
現時点でWeb上の日本語のRaspberry Pi Pico W httpサーバの情報はsocketを直接使ったものが殆どだったので(u)asyncioを使ったケースとしてここに残しておきます。今回は照度センサーを使いましたが、それ以外のセンサーでも使えるのではないかと思います。

今日のところは以上です。

プログラム全体

main.py
import network
import socket
import time
from machine import Pin, I2C
import uasyncio as asyncio
from tiny_line import tiny_line
#import ubinascii   # for mac address visivility

ssid = 'your ssid'
password = 'your password'
ip_addr = '' #In case of static ip, set here.
listen_port = 80
access_token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
tl = tiny_line(access_token, debug = True)

led = Pin('LED', machine.Pin.OUT, value=0)
i2c = I2C(0, scl=Pin(1), sda=Pin(0), freq=100_000)
device_address = 0x39

threshold = 1.0
gain = 1.0
cpl = (2.73 * (256 - 0xC0) * gain) / (60.0)
lux_value = 0.0

def init_TSL25721():	# returns success:1, fail:-1
    if (get_reg(0x12) != 0x34) :
        return -1
    set_reg(0x80|0x20|0x0F, 0x00)
    set_reg(0x80|0x20|0x0D, 0x00)
    set_reg(0x80|0x20|0x01, 0xC0)
    set_reg(0x80|0x20|0x00, 0x02|0x01)
    return 1

def get_reg(reg) :	# retruns byte data
    _dat = i2c.readfrom_mem(device_address, 0x80|0x20|reg, 1)
    return _dat[0]

def set_reg(reg, dat) :	# returns void
    _buf = bytearray(1)
    _buf[0] = dat
    i2c.writeto_mem(device_address, reg, _buf)

def get_adc():	#returns void, set global lux_value
    global lux_value
    while True:
        _dat = i2c.readfrom_mem(device_address, 0x80|0x20|0x14, 4)
        _adc0 = (_dat[1] << 8) | _dat[0]
        _adc1 = (_dat[3] << 8) | _dat[2]
        _lux1 = (_adc0 * 1.00 - _adc1 * 1.87) / cpl
        _lux2 = (_adc0 * 0.63 - _adc1 * 1.00) / cpl
        if _lux1 <= 0 and _lux2 <= 0:
            lux_value = 0.0
        else:
            lux_value = _lux1 if _lux1 > _lux2 else _lux2
        
        if led.value() == 1 and lux_value < threshold:
            print('電気が消えました')
            tl.notify('電気が消えました')
            led.value(0)
        await asyncio.sleep(1.0)

def connect_wlan():	# returns my ip address
    #Connect to wlan
    _wlan = network.WLAN(network.STA_IF)
    _wlan.active(True)
    #mac = ubinascii.hexlify(wlan.config('mac'),':').decode()
    _wlan.config(pm = 0xa11140)  # Disable power-save mode
    _wlan.connect(ssid, password)
    if ip_addr != '':
        _wlan.ifconfig((ip_addr , 'xxx.xxx.xxx.xxx', 'yyy.yyy.yyy.yyy', 'zzz.zzz.zzz.zzz'))
    _max_wait = 10
    while _max_wait > 0:
        if _wlan.status() < 0 or _wlan.status() >= 3:
            break
        _max_wait -= 1
        print('Waiting for connection...')
        time.sleep(1.0)
    
    if _wlan.status() == 3:
        _my_ip = _wlan.ifconfig()[0]
        print(f'Connected on {_my_ip}...')
        return _my_ip
    else:
        raise RuntimeError('Network connection failed: ' + str(_wlan.status()))
        # CYW43_LINK_DOWN (0)
        # CYW43_LINK_JOIN (1)
        # CYW43_LINK_NOIP (2)
        # CYW43_LINK_UP (3)
        # CYW43_LINK_FAIL (-1)
        # CYW43_LINK_NONET (-2)
        # CYW43_LINK_BADAUTH (-3)

async def http_server(reader, writer):
    global lux_value, my_ip
    _led_state = 'off' if led.value() == 0 else 'on'
    
    _peer_ip = reader.get_extra_info('peername')[0]
    print(f"Client connected from {_peer_ip}")
    _request_line = await reader.readline()
    print('Request:')
    print(_request_line)
    # We are not interested in HTTP request headers, skip them
    while await reader.readline() != b'\r\n':
        pass
    _request = str(_request_line)
    (_led_state, _light_state) = set_html_param(_request)
    _button_color = 'Red' if _led_state == 'on' else 'Green'
    _button_value = 'off' if _led_state == 'on' else 'on'
    _response = html % {'ref_url': my_ip,					\
                        'button_color': _button_color,  	\
                        'button_label': _led_state.upper(),	\
                        'button_value': _button_value,  	\
                        'light_state': _light_state,    	\
                        'lux_value': lux_value,         	\
                        }
    writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
    writer.write(_response)
    
    await writer.drain()		# Flash data
    await writer.wait_closed()
    print(f'Connection from {_peer_ip} closed')

def set_html_param(request):	#returns _led_state, _light_state
    global lux_value
    _led_on_pos = request.find('led=on')
    _led_off_pos = request.find('led=off')
    print('led=on pos: ' + str(_led_on_pos))
    print('led=off pos: ' + str(_led_off_pos))

    if led.value() == 1:
        if _led_on_pos == 8 or _led_off_pos != 8:	# URI param starts from 9th charactor
            _led_state = 'on'
        else:
            led.value(0)
            _led_state = 'off'
    else:
        if _led_on_pos == 8 and lux_value > threshold:
            led.value(1)
            _led_state = 'on'
        else:
            _led_state = 'off'
    
    if lux_value < threshold:
        _light_state = 'は消えています'
    else:
        _light_state = 'はついています'

    return (_led_state, _light_state)

html = '''
<!DOCTYPE html><html>
<head><meta name="viewport" content="width=device-width, initial-scale=1">
<meta charset="UTF-8">
<meta http-equiv="refresh" content="2; url=http://%(ref_url)s">
<link rel="icon" href="data:,">
<style>html { font-family: Helvetica; display: inline-block; margin: 0px auto; text-align: center;}
.buttonGreen { background-color: #4CAF50; border: 2px solid #000000;; color: white; padding: 15px 32px; text-align: center; text-decoration: none; display: inline-block; font-size: 16px; margin: 4px 2px; cursor: pointer; }
.buttonRed { background-color: #D11D53; border: 2px solid #000000;; color: white; padding: 15px 32px; text-align: center; text-decoration: none; display: inline-block; font-size: 16px; margin: 4px 2px; cursor: pointer; }
.textdefault { font-size: 1.8em; }
text-decoration: none; font-size: 30px; margin: 2px; cursor: pointer;}
</style></head>
<body><center><h1>照明状態</h1></center><br><br>
<form><center>
<center> <button class="button%(button_color)s" name="led" value="%(button_value)s" type="submit">Notify %(button_label)s</button>
</form>
<br><br>
<br><br>
<p><span class="textdefault">電気%(light_state)s<br><br>照度: %(lux_value).1f Lux</span></p></body></html>
'''

#
# main
#
async def main():
    global my_ip, listen_port
    print('Initializing sensor...')
    if init_TSL25721() != 1:
        raise RuntimeError('Sensor initializing failed.')

    print('Wlan connection...')
    my_ip = connect_wlan()
    
    print(f'Listening on http server port {listen_port}...')
    asyncio.create_task(asyncio.start_server(http_server, my_ip, listen_port))
    
    await get_adc()

try:
    asyncio.run(main())

except OSError as e:
    print(f' >> ERROR: {e}')

finally:
    asyncio.new_event_loop()        
  1. (追記) MycroPython v1.21.0からCPython版との互換性向上によりモジュール名のuプリフィックスが不要になりました(uasyncioasyncio)。未確認ですが、後方互換性があるため、コード自体はv1.20以前のuasyncioのままでも問題ないはずです。

  2. 窓のないマンショントイレでよかった

  3. なぜか秋月のセンサ基板が販売終了してしまったので、スイッチサイエンスの基板(時価660円)などが使えるかもしれません。ただし制御部分のプログラムはサンプルを参考に自分で書く必要あります。

8
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
7