14
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

pythonで予め次のyieldを準備するgeneratorを作りたい

Last updated at Posted at 2018-03-24

概要

  • HDD内の大量の画像を数枚ずつ読み込んで処理する必要があった1
  • 画像読み込みと処理を交互にやっていると時間がかかるので、画像の読み込み部分をマルチスレッド化したかった
  • with句を使って既存のジェネレーターを簡単にマルチスレッド化できるようにした

やったこと

フィボナッチ数列ジェネレータのように何かの値を生成し続けるとき、ジェネレーターが使われます。ジェネレーターは値が無限に生成される場合や、リスト化するとメモリに乗り切らなくなってしまう場合 (たとえば画像パスのリストから順に画像を読み込み逐次処理するとき) に有効です2

一方、ブロッキングI/Oを扱う場合はマルチスレッドの出番です。たとえば画像の読み込みと処理を交互に行う場合は、読み込みを別スレッドで実行することによって無駄なI/O待ちの時間を節約することができます。

よって、ジェネレーターとマルチスレッドを組み合わせると、非同期で何かの値を生成し続けることが可能になります。具体的には、ユーザーが next(generator) を呼び出すより前に、次に yield したい値を準備して待機しておくことができます。今回はこれを作りました。

必要なもの

  • threading.Thread ... スレッドを作成します
  • threading.Event ... スレッドの通信に使います3
    • Event.set() ... 実行すると内部フラグの値をTrueにセットします
    • Event.is_set() ... Event.set() により内部フラグがtrueにセットされている場合はTrueを返します
  • queue.Queue ... スレッド間で値をやりとりするキューです
  • contextlib ... with句を楽に作りたいときに必要です

できたもの

from threading import Thread, Event
from queue import Queue
import contextlib


@contextlib.contextmanager
def thread_generator(generator, max_queue_size, *args, **kwargs):
    """予め次にyieldする値を別スレッドで用意するgenerator

    マルチスレッド処理なので計算速度は向上しない。バッチをロードする
    ジェネレーターなど、ブロッキングI/Oが発生しうるものなどに対して有効

    Args:
        generator (function): 非同期で処理したいジェネレーター
        max_queue_size (int): 保存するキューの最大サイズ
        *args: generatorに引き渡す引数
        **kwargs: generatorに引き渡すキーワード引数

    Yields:
        第一引数で渡したgeneratorがyieldする値

    Examples:
        >>> import time
        >>> t_start = time.time()
        >>> def sample_generator(x, n):  # n回xを返すジェネレーター
        ...    for i in range(n):
        ...        time.sleep(1)
        ...        yield x
        >>> with thread_generator(sample_generator, 3, x=5, n=3) as gen:
        ...        for i in gen:
        ...            time.sleep(1)
        ...            print(i)
        5
        5
        5
        >>> t_end = time.time()
        >>> assert round(t_end - t_start) == 4  # 約4秒で実行できる

    Note:
        Examplesのように1秒かかる処理を3回実行する場合最大4秒かかります
        これはサブスレッドの終了タイミングがwith句を抜けかつ
        その時点でスレッド内で実行されていた処理が終了した後であるからです
    """
    queue = Queue(maxsize=max_queue_size)
    event = Event()

    def queuerunner(*args, **kwargs):
        """generatorが止まるまで値を取得してenqueueし続ける処理"""
        for i in generator(*args, **kwargs):
            if not event.is_set():
                queue.put(i)
            else:  # (finnalyで) eventフラグが立ったら終了する
                return

    thread = Thread(target=queuerunner, args=args, kwargs=kwargs, daemon=True)

    def queuegetter():
        """dequeueするだけのジェネレーター"""
        # runnerが実行されているかキューが残っている間はyieldし続ける
        while thread.is_alive() or not queue.empty():
            yield queue.get()

    thread.start()

    try:
        yield queuegetter()
    finally:
        # with句を抜けるタイミングで確実にスレッドを終わらせる処理
        event.set()
        thread.join()

sample_generator 内部と with 句内部にそれぞれ time.sleep(1) があります。つまり、普通のジェネレーターとして実行すると1ループあたり2秒かかります。しかし上のコードでは非同期で sample_generator 内部を処理しているので、1ループあたり1秒で済みます。

with を使う理由は、thread の終了タイミングを明確にするためです。最初はデコレーターによってジェネレーターをラップすれば良いと考えていましたが、それだと、

  • KeyboardInterrupt などで処理を中断してもスレッドが走り続けてしまう
  • 外部から import した関数を後から非同期化できない

という問題があったので、断念しました。

また、非同期でキューを貯めるので send は (当然) 使えません。 typing でいうところの Generator[Any, None, None] のみに対応しています。今回はシンプルさを目指したので書いていませんが実際にはこのような型アノテーションをつけておきたいですね。

ちなみに、Pythonのマルチスレッドはマルチプロセスではないので (参考: グローバルインタプリタロック (Global Interpreter Lock) を取り除くことはできないのですか?)、今回のスクリプトで並列計算によるプログラムのスピードアップを期待することはできないです。それらしいものを作りたいなら threading ライブラリの代わりに multiprocessing ライブラリを使うと実現できます。ただし multiprocessing を使った場合、このwith句を if __name__ == '__main__': 節以外で呼び出すことはできなくなります。これは今回の「ジェネレーターを気軽に非同期化できるようにする」という設計と反しているので、どうせやるなら書き直したほうがいいと思います。なお、Jupyter Notebookを使っている場合 multiprocessing の扱いはさらに難しくなります (たとえばノートブック全体が fork されて実行されます)。

  1. 深層学習ライブラリになるべく依存しないような画像バッチローダーを自作しようと思ったのがきっかけでした。どうせI/Oまわりがボトルネックになるのは目に見えているので、多少パフォーマンスが下がってでも、標準ライブラリだけで作った扱いやすいバッチローダーを用意しておきたかったという動機です。後で見返すと関数が queuerunner になってたりして tensorflow.train.QueueRunner を使ってた頃の影響が感じられますね。

  2. 本当はこの場合素直に for path in paths: とするほうがシンプルだと思いますが、そこは導入ということで目を瞑ってもらいたいです。

  3. 参考にさせていただいたおまいらのthreading.Eventの使い方は間違っているでは「 wait() を呼ばなければ Event を使う意味がない」とする記述があります。確かに wait() を呼ばずに単純なフラグとして Event を使うのは旨味がないかもしれませんが、公式にあるように Event 自体は「スレッド間で通信を行うための最も単純なメカニズム」なので、ここでは使用目的に十分合致しているだろうと判断し使っています。

14
13
11

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?