今回は前回LEGO Mindstorms EV3(以降EV3)のセンサー複数処理に加えライントレースをスレッドに加えなおかつ、センサーがしきい値に達した際にはライントレースを停止しプログラムを終了する処理をさせていく。一つのスレッドが終了したときに他のスレッドも連動して終了させるためにthreading
のEvent
メソッドを利用する。
EV3について
本記事内での環境
PC
Windows10
Python 3.7.3
開発環境 VisualStudioCodeEV3
ev3dev
APIリファレンス
環境構築やソースコードの作成、実行はこちら
今回利用するEV3のモデル
今回は超音波センサーとタッチセンサーに加えライントレースのためにカラーセンサー、モーターも利用していく。
また、ロボットのモデルとしてはベースロボと呼ばれるモデルでライントレースを行う。
ソースコード
今回もthreading
を用いて非同期処理を実装していくが、今回はタッチセンサーか超音波センサーのどちらか一方は検知したらプログラムを終了させたい。
前回のようにそのまま実装すると一つのセンサースレッドが終了しても、もう一方が終了するまでスレッドは継続されてしまう。
def wtouch():
while True:
if touch.is_pressed:
print('Touched')
sound.tone(1000, 200)
time.sleep(1.0)
break
def wsonic():
while True:
if ultrasonic.distance_centimeters < 15:
print('Detected')
sound.tone(500, 200)
time.sleep(1.0)
break
上記が前回のセンサー検知部分の関数
そこで今回はthreading
内に用意されているEvent
オブジェクトを使ってTrue、Falseのフラグのやり取りをスレッド間で行おうと思う。またEventのメソッドに用意されているwait()
を利用することでスレッドの処理を待機させることもできる。今回はフラグの操作を行うset()
とフラグの判定を行うis_set()
を利用する。
以下がソースコードになる。
from ev3dev2.button import Button
from ev3dev2.sound import Sound
from ev3dev2.motor import LargeMotor, OUTPUT_B, OUTPUT_C
from ev3dev2.sensor import INPUT_1, INPUT_3, INPUT_4
from ev3dev2.sensor.lego import ColorSensor, TouchSensor, UltrasonicSensor
import time
import threading
import sys
button = Button()
sound = Sound()
sound.set_volume(20)
touch = TouchSensor()
color = ColorSensor()
ultrasonic = UltrasonicSensor()
lm_b = LargeMotor(OUTPUT_B)
lm_c = LargeMotor(OUTPUT_C)
event = threading.Event()
def wtouch():
while not event.is_set():
if touch.is_pressed:
print('Touched')
sound.tone(1000, 200)
lm_b.stop(stop_action='brake')
lm_c.stop(stop_action='brake')
event.set()
print('end-touch')
def wsonic():
while not event.is_set():
if ultrasonic.distance_centimeters < 15:
print('Detected')
sound.tone(500, 200)
lm_b.stop(stop_action='brake')
lm_c.stop(stop_action='brake')
event.set()
print('end-sonic')
def linetrace():
while not event.is_set():
if color.reflected_light_intensity > 30: # White
lm_b.run_forever(speed_sp=100)
lm_c.run_forever(speed_sp=300)
else: # Black
lm_b.run_forever(speed_sp=300)
lm_c.run_forever(speed_sp=100)
time.sleep(0.1)
lm_b.stop(stop_action='brake')
lm_c.stop(stop_action='brake')
print('end-linetrace')
def main():
th1 = threading.Thread(target = wtouch)
th2 = threading.Thread(target = wsonic)
th3 = threading.Thread(target = linetrace)
th1.start()
th2.start()
th3.start()
print('Thread is started')
if __name__ == '__main__':
main()
event = threading.Event()
でthreading.Event()をインスタンス化している。
関数としてdef wtouch
でタッチセンサーの判定を用意し、def wsonic
で超音波センサーの判定を用意している。
今回はWhileのループ条件としてEventのメソッドを条件にしている。
実行
実行してみると超音波センサーとタッチセンサーのどちらか一方が検知されるとライントレースを停止し、すべてのスレッドを終了後プログラムが終了しているのがわかる。
各センサーの関数内でevent.set()
が呼び出されると各スレッドが終了するがコンソールへの出力を見てみると、どちらのセンサーで検出してもスレッド1から3へ順番に終了しているのがわかる。