LoginSignup
0
0

MicroPythonを使ってみる(その6:asyncio)

Posted at

asyncioで非同期処理

久しぶりのMicroPython投稿。秋月電子で購入した「RP2040マイコンボードキット」上でMicroPythonを使い、非同期処理フレームワークで有名どころのasyncioの基本を学ぶことにした。global利用など、相変わらず美しないところがあるのは勘弁。なお、「タスク」という用語を用いている。

やること(何が非同期か?)

  • キーボード入力処理タスクとLED On/Offタスクとが非同期に動作
  • 入力された文字によりLED On/Offの動作を変える
    • 's':On/OffをStop
    • 'r':On/OffをResume
    • 'i':On/Offの間隔を100ms増やす
    • 'd':On/Offの間隔を100ms減らす
    • 'e':すべてのタスク終了(メイン含む)
  • 両タスク間の排他処理は未考慮

ソースコード

いつものように、キーとなるところを記載。

importなど

import uasyncio as asyncio
from machine import Pin
import utime
from sys import stdin
from select import poll, POLLIN
  • 各種import
    • asyncio
    • GPIO PIN(LED用)
    • キーボード入力関連(stdin,select)

共通

cmd = 'n' # 's':stop 'r':resume 'e':end 'i':100ms increment 'd':100ms decrement

poll_obj = poll()
poll_obj.register(stdin, POLLIN)
  • キーボードから入力されるコマンドの定義(後述)
  • キーボード入力用変数

メイン

async def main():
    task1 = asyncio.create_task(led_task())
    task2 = asyncio.create_task(keyboard_task())
    #await led_task() # run led_task() and wait until it ends
    await task1 # wait until task1 ends
    await task2 # wait until task2 ends
    print('main End')

asyncio.run(main())
  • キーボード入力処理タスク起動
  • LED On/OfFタスク起動
  • タスク終了待ち(await)
    • 「await タスク」とすると、タスクを起動し、終了まで待つ

キーボード入力処理タスク

async def keyboard_task():
    global cmd
    count = 0
    last_input = 'n'
    while True:       
        if poll_obj.poll(10):
            input_one = stdin.read(1)
            if input_one == '\n': # '\n' comes twice.
                if count == 0:
                    count += 1
                else: # count == 1
                    cmd = last_input
                    #print(cmd) # does not print well
                    #print(cmd.encode())
                    count = 0
                    if cmd == 'e':
                        break
            else:
                last_input = input_one
        else:
            await asyncio.sleep_ms(10)
    print('keyboard_task End')
  • 入力された文字を格納
    • 小生の環境では、1文字づつではなく、改行コードが入力された時点で、改行コードを含む文字数のpoll()がTrueとなる
    • 改行コードは2回発生
    • 改行コードの一つ前に入力された文字をcmdに格納
    • 'e'が入力されるとタスク終了
  • 「await asyncio.sleep_ms()」により、他のタスクへ制御を移す

LED On/OfFタスク

async def led_task():
    global cmd
    led = Pin(0, Pin.OUT)
    going = 1
    interval = 500 # ms
    while True:
        if cmd == 'e':
            break
        elif cmd == 's':
            going = 0
        elif cmd == 'r':
            going = 1
        elif cmd == 'i':
            interval += 100
        elif cmd == 'd':
            interval -= 100
            if interval < 100:
                interval = 100
        cmd = 'n' # back to default
        if going == 1:
            led.value(1)
            await asyncio.sleep_ms(interval)
            led.value(0)
            await asyncio.sleep_ms(interval)
        else:
            await asyncio.sleep_ms(10)
    print('led_task End')
  • LED接続ピンは0
  • 進行中フラグgoing
  • LED点滅間隔デフォルト値は500ms
  • キーボード入力されたコマンド(文字)の処理(前述)
  • 進行中フラグON時には点滅
  • 「await asyncio.sleep_ms()」により、他のタスクへ制御を移す

全体

import uasyncio as asyncio
from machine import Pin
import utime
from sys import stdin
from select import poll, POLLIN

cmd = 'n' # 's':stop 'r':resume 'e':end 'i':100ms increment 'd':100ms decrement

poll_obj = poll()
poll_obj.register(stdin, POLLIN)

async def keyboard_task():
    global cmd
    count = 0
    last_input = 'n'
    while True:       
        if poll_obj.poll(10):
            input_one = stdin.read(1)
            if input_one == '\n': # '\n' comes twice.
                if count == 0:
                    count += 1
                else: # count == 1
                    cmd = last_input
                    #print(cmd) # does not print well
                    #print(cmd.encode())
                    count = 0
                    if cmd == 'e':
                        break
            else:
                last_input = input_one
        else:
            await asyncio.sleep_ms(10)
    print('keyboard_task End')

async def led_task():
    global cmd
    led = Pin(0, Pin.OUT)
    going = 1
    interval = 500 # ms
    while True:
        if cmd == 'e':
            break
        elif cmd == 's':
            going = 0
        elif cmd == 'r':
            going = 1
        elif cmd == 'i':
            interval += 100
        elif cmd == 'd':
            interval -= 100
            if interval < 100:
                interval = 100
        cmd = 'n' # back to default
        if going == 1:
            led.value(1)
            await asyncio.sleep_ms(interval)
            led.value(0)
            await asyncio.sleep_ms(interval)
        else:
            await asyncio.sleep_ms(10)
    print('led_task End')

async def main():
    task1 = asyncio.create_task(led_task())
    task2 = asyncio.create_task(keyboard_task())
    #await led_task() # run led_task() and wait until it ends
    await task1 # wait until task1 ends
    await task2 # wait until task2 ends
    print('main End')

asyncio.run(main())

検証

ほぼ自明なので省略

終わりに

時間がかかったが、理解してしまえば、使えるような気がする。スレッドとどのように使い分けるのかまでは至らず。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0