はじめに

Pythonのthreading.Eventを使っているサンプルはないかとググっていたら上位に間違った使い方をしているものが出てきました。
Qiitaでthreading.Eventを使った投稿でも3件全てが間違った使い方をしているという悲惨な状況たったので、正しい使い方を説明します。

threading.Eventとは

イベントが発生するまでスレッドを待機させ、他のスレッドからイベントを発生させると待機スレッドが再開する、という使い方をする為のクラスです。

最も重要なメソッドは以下の2つです。

  • wait()
    • イベントが発生するかタイムアウトになるまで現在のスレッドを待機させる。
  • set()
    • イベントを発生させ、待機スレッドを再開させる。

他にもclear()とis_set()があります。
詳細はPythonドキュメントを参照してください。

正しい使い方

まずは正しい使い方を見てみましょう。

単純な例

wait()とset()のみ使った最も単純な例です。
スレッドではイベント発生まで待機、メインスレッドはスレッド開始から3秒後にイベントを発生させています。

前半はログに時刻とスレッド名を出力するための設定をしているので少し長くなってしまいましたが、気にしないで下さい。

from logging import (getLogger, StreamHandler, INFO, Formatter)

# ログの設定
handler = StreamHandler()
handler.setLevel(INFO)
handler.setFormatter(Formatter("[%(asctime)s] [%(threadName)s] %(message)s"))
logger = getLogger()
logger.addHandler(handler)
logger.setLevel(INFO)


from threading import (Event, Thread)
import time


event = Event()


def event_example1():
    logger.info("スレッド開始")
    event.wait()
    logger.info("スレッド終了")

thread = Thread(target=event_example1)
thread.start()
time.sleep(3)
logger.info("イベント発生")
event.set()
実行結果
[2016-09-27 00:04:18,400] [Thread-6] スレッド開始
[2016-09-27 00:04:21,406] [MainThread] イベント発生
[2016-09-27 00:04:21,407] [Thread-6] スレッド終了

時刻を見るとスレッドがイベント発生まで待機していることがわかります。

timeoutを指定してwaitを呼ぶ

タイムアウトを指定すると、イベントが発生しなくても指定の秒数が経過するとスレッドが再開します。
wait()の戻り値はイベントが発生した時にはTrue、それ以外ではFalseになります。

event = Event()


def event_example2():
    logger.info("スレッド開始")
    while not event.wait(2):
        logger.info("まーだだよ")
    logger.info("スレッド終了")

thread = Thread(target=event_example2)
thread.start()
time.sleep(5)
logger.info("イベント発生")
event.set()
実行結果
[2016-09-27 00:04:21,407] [Thread-7] スレッド開始
[2016-09-27 00:04:23,412] [Thread-7] まーだだよ
[2016-09-27 00:04:25,419] [Thread-7] まーだだよ
[2016-09-27 00:04:26,409] [MainThread] イベント発生
[2016-09-27 00:04:26,409] [Thread-7] スレッド終了

イベントが発生しなくてもタイムアウトしているのがわかります。
また、イベント発生時にはタイムアウト前に再開しています。

Eventを繰り返し使う

ドキュメントにも書いてありますが、set()を一度呼んだ後だとwait()を呼んでもスレッドは待機せずに処理を戻してしまいます。
従ってEventを繰り返し使う場合にはclear()を呼んでイベントをクリアする必要があります。

以下はclear()を使った例です。
また、スレッドを終了するためにeventとは別にboolのフラグstopを使用しています。

event = Event()

# イベント停止のフラグ
stop = False


def event_example3():
    logger.info("スレッド開始")
    count = 0
    while not stop:
        event.wait()
        event.clear()
        count += 1
        logger.info(count)
    logger.info("スレッド終了")

thread = Thread(target=event_example3)
thread.start()

time.sleep(1)
event.set()
time.sleep(1)
event.set()
time.sleep(1)
stop = True
event.set()

thread.join()
実行結果
[2016-09-27 00:04:26,410] [Thread-8] スレッド開始
[2016-09-27 00:04:27,415] [Thread-8] 1
[2016-09-27 00:04:28,417] [Thread-8] 2
[2016-09-27 00:04:29,421] [Thread-8] 3
[2016-09-27 00:04:29,421] [Thread-8] スレッド終了

間違った使い方

次に検索して出てきた間違った使い方を見ていきます。

間違いパターン1 - threading.Eventを単なるフラグとして使っている

無限ループするスレッドを複数持つプロセスを終了する方法 - Qiita
実行中のスレッドに対し外から操作をする - Qiita
EventとQueueを使ってマルチスレッドなズンドコキヨシ - Qiita

set()とis_set()のみ使用してEventを単なるフラグとしてしか使っていないパターンです。
間違っているというのは言い過ぎかも知れませんが、wait()を呼ばなければEventを使う意味がありません。

Eventを単なるフラグとして使った例
event = Event()


def bad_example1():
    logger.info("スレッド開始")
    while not event.is_set():
        logger.info("まーだだよ")
        time.sleep(2)
    logger.info("スレッド終了")

thread = Thread(target=bad_example1)
thread.start()
time.sleep(5)
logger.info("イベント発生")
event.set()

このようなケースではスレッド内のtime.sleep()とis_set()の部分をEvent.wait()に置き換えるか、あるいはEventを使わずにboolを使う方がシンプルになります。

wait()を使う
event = Event()


def bad_example1():
    logger.info("スレッド開始")
    while not event.wait(timeout=2):
        logger.info("まーだだよ")
    logger.info("スレッド終了")

thread = Thread(target=bad_example1)
thread.start()
time.sleep(5)
logger.info("イベント発生")
event.set()
boolのフラグを使う
stop = False


def bad_example1():
    logger.info("スレッド開始")
    while not stop:
        logger.info("まーだだよ")
        time.sleep(2)
    logger.info("スレッド終了")

thread = Thread(target=bad_example1)
thread.start()
time.sleep(5)
logger.info("イベント発生")
stop = True

間違いパターン2 - clear()を忘れている

Python の threading.Event を試してみる | CUBE SUGAR STORAGE

先ずは上記リンクのサンプルを見てください。

BlockingQueueクラスではpop()を呼び出した時にキューが空だとイベント発生まで待機、push()ではキューに値を追加するとイベントを発生させてpop()を呼び出した待機スレッドを再開させています。

Consumerがpop()を繰り返し呼び出し、Producerは1秒間隔でpush()を呼び出してランダムな値をキューに追加しています。

ソースを見るとwait()を使っているし、実行しても1秒間隔でランダムな値が表示されるので期待する動作をしているように見えます。

実行結果
27
88
53
148
:

しかし、wait()の箇所にログを入れてみるととんでもない動きをしている事がわかります。

    def pop(self):
        # wait() を抜けたスレッドが復帰するループ
        while True:
            # ロックを取得する
            with self.lock:
                if self.queue:
                    # キューに要素があればそれを返す
                    return self.queue.pop()
                else:
                    print("waiting...")  # <--- 追加
                    # キューが空なら別のスレッドが要素を追加して通知してくるまで待つ
                    self.event.wait()
実行結果
waiting...
waiting...
waiting...
waiting...
:

waiting...が大量に表示され、wait()でスレットが全く待機していないことがわかります。

原因と修正

これはclear()を呼んでいないために、一度set()を呼び出した後はwait()を呼び出しても待機してくれないからです。

これを正しく動作させるためにはclear()を呼び出せばいいのですが、他にもロックを保持したままwait()しているのでデッドロック状態になるというバグもあったりして、以下のように修正する必要がありました。

修正後
    def pop(self):
        # wait() を抜けたスレッドが復帰するループ
        while True:
            self.event.clear()
            # ロックを取得する
            with self.lock:
                if self.queue:
                    # キューに要素があればそれを返す
                    return self.queue.pop()
            print "waiting..."
            # キューが空なら別のスレッドが要素を追加して通知してくるまで待つ
            self.event.wait()
実行結果
7
waiting...
169
waiting...
113
waiting...

これできちんとwaitしてくれるようになりました。

余談ですが

Python標準モジュールにはqueueというのがあります。

これはマルチスレッドに対応したキューで、以下のようなことができます。

  • pop()を呼び出した時にキューが空ならスレッドを待機する
  • push()を呼び出した時にキューが満杯なら空きができるまで待機する

このサンプルで出てくるBlockingQueueではpop()を呼び出した時にキューが空ならスレッドを待機するというのをやっていますが、これは既に標準モジュールで実現済みということです。

このサンプルはあくまでEventの説明として書かれているだけだと思います。
マルチスレッドでキューを使いたい時は自分で作らずに標準モジュールのqueueを使いましょう。

まとめ

  • threading.Eventはwait()を使わないと意味がない
  • フラグが使いたいならboolを使う
  • 繰り返し使用する場合はclear()を忘れずに呼ぶ
  • マルチスレッドでキューを使いたい時は標準モジュールのqueueを使う
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.