概要
- HDD内の大量の画像を数枚ずつ読み込んで処理する必要があった1
- 画像読み込みと処理を交互にやっていると時間がかかるので、画像の読み込み部分をマルチスレッド化したかった
- with句を使って既存のジェネレーターを簡単にマルチスレッド化できるようにした
やったこと
フィボナッチ数列ジェネレータのように何かの値を生成し続けるとき、ジェネレーターが使われます。ジェネレーターは値が無限に生成される場合や、リスト化するとメモリに乗り切らなくなってしまう場合 (たとえば画像パスのリストから順に画像を読み込み逐次処理するとき) に有効です2。
一方、ブロッキングI/Oを扱う場合はマルチスレッドの出番です。たとえば画像の読み込みと処理を交互に行う場合は、読み込みを別スレッドで実行することによって無駄なI/O待ちの時間を節約することができます。
よって、ジェネレーターとマルチスレッドを組み合わせると、非同期で何かの値を生成し続けることが可能になります。具体的には、ユーザーが next(generator)
を呼び出すより前に、次に yield
したい値を準備して待機しておくことができます。今回はこれを作りました。
必要なもの
-
threading.Thread
... スレッドを作成します -
threading.Event
... スレッドの通信に使います3-
Event.set()
... 実行すると内部フラグの値をTrueにセットします -
Event.is_set()
...Event.set()
により内部フラグがtrueにセットされている場合はTrueを返します
-
-
queue.Queue
... スレッド間で値をやりとりするキューです -
contextlib
... with句を楽に作りたいときに必要です
できたもの
from threading import Thread, Event
from queue import Queue
import contextlib
@contextlib.contextmanager
def thread_generator(generator, max_queue_size, *args, **kwargs):
"""予め次にyieldする値を別スレッドで用意するgenerator
マルチスレッド処理なので計算速度は向上しない。バッチをロードする
ジェネレーターなど、ブロッキングI/Oが発生しうるものなどに対して有効
Args:
generator (function): 非同期で処理したいジェネレーター
max_queue_size (int): 保存するキューの最大サイズ
*args: generatorに引き渡す引数
**kwargs: generatorに引き渡すキーワード引数
Yields:
第一引数で渡したgeneratorがyieldする値
Examples:
>>> import time
>>> t_start = time.time()
>>> def sample_generator(x, n): # n回xを返すジェネレーター
... for i in range(n):
... time.sleep(1)
... yield x
>>> with thread_generator(sample_generator, 3, x=5, n=3) as gen:
... for i in gen:
... time.sleep(1)
... print(i)
5
5
5
>>> t_end = time.time()
>>> assert round(t_end - t_start) == 4 # 約4秒で実行できる
Note:
Examplesのように、1秒かかる処理を3回実行する場合最大4秒かかります。
これは、サブスレッドの終了タイミングが、with句を抜け、かつ、
その時点でスレッド内で実行されていた処理が「終了した後」であるからです。
"""
queue = Queue(maxsize=max_queue_size)
event = Event()
def queuerunner(*args, **kwargs):
"""generatorが止まるまで値を取得してenqueueし続ける処理"""
for i in generator(*args, **kwargs):
if not event.is_set():
queue.put(i)
else: # (finnalyで) eventフラグが立ったら終了する
return
thread = Thread(target=queuerunner, args=args, kwargs=kwargs, daemon=True)
def queuegetter():
"""dequeueするだけのジェネレーター"""
# runnerが実行されているかキューが残っている間はyieldし続ける
while thread.is_alive() or not queue.empty():
yield queue.get()
thread.start()
try:
yield queuegetter()
finally:
# with句を抜けるタイミングで確実にスレッドを終わらせる処理
event.set()
thread.join()
sample_generator
内部と with
句内部にそれぞれ time.sleep(1)
があります。つまり、普通のジェネレーターとして実行すると1ループあたり2秒かかります。しかし上のコードでは非同期で sample_generator
内部を処理しているので、1ループあたり1秒で済みます。
with
を使う理由は、thread
の終了タイミングを明確にするためです。最初はデコレーターによってジェネレーターをラップすれば良いと考えていましたが、それだと、
-
KeyboardInterrupt
などで処理を中断してもスレッドが走り続けてしまう - 外部から
import
した関数を後から非同期化できない
という問題があったので、断念しました。
また、非同期でキューを貯めるので send
は (当然) 使えません。 typing
でいうところの Generator[Any, None, None]
のみに対応しています。今回はシンプルさを目指したので書いていませんが実際にはこのような型アノテーションをつけておきたいですね。
ちなみに、Pythonのマルチスレッドはマルチプロセスではないので (参考: グローバルインタプリタロック (Global Interpreter Lock) を取り除くことはできないのですか?)、今回のスクリプトで並列計算によるプログラムのスピードアップを期待することはできないです。それらしいものを作りたいなら threading
ライブラリの代わりに multiprocessing
ライブラリを使うと実現できます。ただし multiprocessing
を使った場合、このwith句を if __name__ == '__main__':
節以外で呼び出すことはできなくなります。これは今回の「ジェネレーターを気軽に非同期化できるようにする」という設計と反しているので、どうせやるなら書き直したほうがいいと思います。なお、Jupyter Notebookを使っている場合 multiprocessing
の扱いはさらに難しくなります (たとえばノートブック全体が fork
されて実行されます)。
-
深層学習ライブラリになるべく依存しないような画像バッチローダーを自作しようと思ったのがきっかけでした。どうせI/Oまわりがボトルネックになるのは目に見えているので、多少パフォーマンスが下がってでも、標準ライブラリだけで作った扱いやすいバッチローダーを用意しておきたかったという動機です。後で見返すと関数が
queuerunner
になってたりしてtensorflow.train.QueueRunner
を使ってた頃の影響が感じられますね。 ↩ -
本当はこの場合素直に
for path in paths:
とするほうがシンプルだと思いますが、そこは導入ということで目を瞑ってもらいたいです。 ↩ -
参考にさせていただいたおまいらのthreading.Eventの使い方は間違っているでは「
wait()
を呼ばなければEvent
を使う意味がない」とする記述があります。確かにwait()
を呼ばずに単純なフラグとしてEvent
を使うのは旨味がないかもしれませんが、公式にあるようにEvent
自体は「スレッド間で通信を行うための最も単純なメカニズム」なので、ここでは使用目的に十分合致しているだろうと判断し使っています。 ↩