3
5

More than 3 years have passed since last update.

Python-LEGO Mindstormsのセンサーをスレッドで非同期処理する その2

Last updated at Posted at 2020-11-30

今回は前回LEGO Mindstorms EV3(以降EV3)のセンサー複数処理に加えライントレースをスレッドに加えなおかつ、センサーがしきい値に達した際にはライントレースを停止しプログラムを終了する処理をさせていく。一つのスレッドが終了したときに他のスレッドも連動して終了させるためにthreadingEventメソッドを利用する。

EV3について

教育版 LEGO® MINDSTORMS EV3

本記事内での環境

環境構築やソースコードの作成、実行はこちら

今回利用するEV3のモデル

今回は超音波センサーとタッチセンサーに加えライントレースのためにカラーセンサー、モーターも利用していく。
また、ロボットのモデルとしてはベースロボと呼ばれるモデルでライントレースを行う。

65.png

ソースコード

今回もthreadingを用いて非同期処理を実装していくが、今回はタッチセンサーか超音波センサーのどちらか一方は検知したらプログラムを終了させたい。
前回のようにそのまま実装すると一つのセンサースレッドが終了しても、もう一方が終了するまでスレッドは継続されてしまう。

thread-test1.py
def wtouch():    
    while True:
        if touch.is_pressed:
            print('Touched')
            sound.tone(1000, 200)
            time.sleep(1.0)
            break


def wsonic():
    while True:
        if ultrasonic.distance_centimeters < 15:
            print('Detected')
            sound.tone(500, 200)
            time.sleep(1.0)
            break

上記が前回のセンサー検知部分の関数

そこで今回はthreading内に用意されているEventオブジェクトを使ってTrue、Falseのフラグのやり取りをスレッド間で行おうと思う。またEventのメソッドに用意されているwait()を利用することでスレッドの処理を待機させることもできる。今回はフラグの操作を行うset()とフラグの判定を行うis_set()を利用する。

threading公式ドキュメント

以下がソースコードになる。

thread_test2.py
from ev3dev2.button import Button
from ev3dev2.sound import Sound
from ev3dev2.motor import LargeMotor, OUTPUT_B, OUTPUT_C
from ev3dev2.sensor import INPUT_1, INPUT_3, INPUT_4
from ev3dev2.sensor.lego import ColorSensor, TouchSensor, UltrasonicSensor
import time
import threading
import sys

button = Button()
sound = Sound()
sound.set_volume(20)
touch = TouchSensor()
color = ColorSensor()
ultrasonic = UltrasonicSensor()
lm_b = LargeMotor(OUTPUT_B)
lm_c = LargeMotor(OUTPUT_C)
event = threading.Event()


def wtouch():
    while not event.is_set():
        if touch.is_pressed:
            print('Touched')
            sound.tone(1000, 200)
            lm_b.stop(stop_action='brake')
            lm_c.stop(stop_action='brake')
            event.set()
    print('end-touch')


def wsonic():
    while not event.is_set():
        if ultrasonic.distance_centimeters < 15:
            print('Detected')
            sound.tone(500, 200)
            lm_b.stop(stop_action='brake')
            lm_c.stop(stop_action='brake')
            event.set()
    print('end-sonic')


def linetrace():
    while not event.is_set():
        if color.reflected_light_intensity > 30:  # White
            lm_b.run_forever(speed_sp=100)
            lm_c.run_forever(speed_sp=300)
        else:  # Black
            lm_b.run_forever(speed_sp=300)
            lm_c.run_forever(speed_sp=100)
        time.sleep(0.1)
    lm_b.stop(stop_action='brake')
    lm_c.stop(stop_action='brake')
    print('end-linetrace')


def main():
    th1 = threading.Thread(target = wtouch)
    th2 = threading.Thread(target = wsonic)
    th3 = threading.Thread(target = linetrace)

    th1.start()
    th2.start()
    th3.start()

    print('Thread is started')


if __name__ == '__main__':
    main()

event = threading.Event()でthreading.Event()をインスタンス化している。
関数としてdef wtouchでタッチセンサーの判定を用意し、def wsonicで超音波センサーの判定を用意している。
今回はWhileのループ条件としてEventのメソッドを条件にしている。

実行

実行してみると超音波センサーとタッチセンサーのどちらか一方が検知されるとライントレースを停止し、すべてのスレッドを終了後プログラムが終了しているのがわかる。

各センサーの関数内でevent.set()が呼び出されると各スレッドが終了するがコンソールへの出力を見てみると、どちらのセンサーで検出してもスレッド1から3へ順番に終了しているのがわかる。

1.png

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5