asyncioで非同期処理
久しぶりのMicroPython投稿。秋月電子で購入した「RP2040マイコンボードキット」上でMicroPythonを使い、非同期処理フレームワークで有名どころのasyncioの基本を学ぶことにした。global利用など、相変わらず美しないところがあるのは勘弁。なお、「タスク」という用語を用いている。
やること(何が非同期か?)
- キーボード入力処理タスクとLED On/Offタスクとが非同期に動作
- 入力された文字によりLED On/Offの動作を変える
- 's':On/OffをStop
- 'r':On/OffをResume
- 'i':On/Offの間隔を100ms増やす
- 'd':On/Offの間隔を100ms減らす
- 'e':すべてのタスク終了(メイン含む)
- 両タスク間の排他処理は未考慮
ソースコード
いつものように、キーとなるところを記載。
importなど
import uasyncio as asyncio
from machine import Pin
import utime
from sys import stdin
from select import poll, POLLIN
- 各種import
- asyncio
- GPIO PIN(LED用)
- キーボード入力関連(stdin,select)
共通
cmd = 'n' # 's':stop 'r':resume 'e':end 'i':100ms increment 'd':100ms decrement
poll_obj = poll()
poll_obj.register(stdin, POLLIN)
- キーボードから入力されるコマンドの定義(後述)
- キーボード入力用変数
メイン
async def main():
task1 = asyncio.create_task(led_task())
task2 = asyncio.create_task(keyboard_task())
#await led_task() # run led_task() and wait until it ends
await task1 # wait until task1 ends
await task2 # wait until task2 ends
print('main End')
asyncio.run(main())
- キーボード入力処理タスク起動
- LED On/OfFタスク起動
- タスク終了待ち(await)
- 「await タスク」とすると、タスクを起動し、終了まで待つ
キーボード入力処理タスク
async def keyboard_task():
global cmd
count = 0
last_input = 'n'
while True:
if poll_obj.poll(10):
input_one = stdin.read(1)
if input_one == '\n': # '\n' comes twice.
if count == 0:
count += 1
else: # count == 1
cmd = last_input
#print(cmd) # does not print well
#print(cmd.encode())
count = 0
if cmd == 'e':
break
else:
last_input = input_one
else:
await asyncio.sleep_ms(10)
print('keyboard_task End')
- 入力された文字を格納
- 小生の環境では、1文字づつではなく、改行コードが入力された時点で、改行コードを含む文字数のpoll()がTrueとなる
- 改行コードは2回発生
- 改行コードの一つ前に入力された文字をcmdに格納
- 'e'が入力されるとタスク終了
- 「await asyncio.sleep_ms()」により、他のタスクへ制御を移す
LED On/OfFタスク
async def led_task():
global cmd
led = Pin(0, Pin.OUT)
going = 1
interval = 500 # ms
while True:
if cmd == 'e':
break
elif cmd == 's':
going = 0
elif cmd == 'r':
going = 1
elif cmd == 'i':
interval += 100
elif cmd == 'd':
interval -= 100
if interval < 100:
interval = 100
cmd = 'n' # back to default
if going == 1:
led.value(1)
await asyncio.sleep_ms(interval)
led.value(0)
await asyncio.sleep_ms(interval)
else:
await asyncio.sleep_ms(10)
print('led_task End')
- LED接続ピンは0
- 進行中フラグgoing
- LED点滅間隔デフォルト値は500ms
- キーボード入力されたコマンド(文字)の処理(前述)
- 進行中フラグON時には点滅
- 「await asyncio.sleep_ms()」により、他のタスクへ制御を移す
全体
import uasyncio as asyncio
from machine import Pin
import utime
from sys import stdin
from select import poll, POLLIN
cmd = 'n' # 's':stop 'r':resume 'e':end 'i':100ms increment 'd':100ms decrement
poll_obj = poll()
poll_obj.register(stdin, POLLIN)
async def keyboard_task():
global cmd
count = 0
last_input = 'n'
while True:
if poll_obj.poll(10):
input_one = stdin.read(1)
if input_one == '\n': # '\n' comes twice.
if count == 0:
count += 1
else: # count == 1
cmd = last_input
#print(cmd) # does not print well
#print(cmd.encode())
count = 0
if cmd == 'e':
break
else:
last_input = input_one
else:
await asyncio.sleep_ms(10)
print('keyboard_task End')
async def led_task():
global cmd
led = Pin(0, Pin.OUT)
going = 1
interval = 500 # ms
while True:
if cmd == 'e':
break
elif cmd == 's':
going = 0
elif cmd == 'r':
going = 1
elif cmd == 'i':
interval += 100
elif cmd == 'd':
interval -= 100
if interval < 100:
interval = 100
cmd = 'n' # back to default
if going == 1:
led.value(1)
await asyncio.sleep_ms(interval)
led.value(0)
await asyncio.sleep_ms(interval)
else:
await asyncio.sleep_ms(10)
print('led_task End')
async def main():
task1 = asyncio.create_task(led_task())
task2 = asyncio.create_task(keyboard_task())
#await led_task() # run led_task() and wait until it ends
await task1 # wait until task1 ends
await task2 # wait until task2 ends
print('main End')
asyncio.run(main())
検証
ほぼ自明なので省略
終わりに
時間がかかったが、理解してしまえば、使えるような気がする。スレッドとどのように使い分けるのかまでは至らず。