16
13

More than 3 years have passed since last update.

Pythonのスレッド間の協調作業にはQueueを使う

Posted at

はじめに

Twitterで一時期流行していた 100 Days Of Code なるものを先日知りました。本記事は、初学者である私が100日の学習を通してどの程度成長できるか記録を残すこと、アウトプットすることを目的とします。誤っている点、読みにくい点多々あると思います。ご指摘いただけると幸いです!

今回学習する教材

  • Effective Python

    • 8章構成
    • 本章216ページ
  • 第5章:並行性と並列性

スレッド間の協調作業にはQueueを使う

並行作業を行うのに役立つ方法として関数パイプラインがあります。
パイプラインは、製造における組み立てラインのように働きます。
このような動作を実現するためにqueueを使います。
例として、デジタルカメラから一連の画像を取り出し、サイズを変更して、オンラインのフォトギャラリーに投稿するというシステムを考えます。
このシステムは、3段に分けたパイプラインで作れます。
第1段階では、新たな画像を取り出します。そのダウンロードした画像を第2段階のサイズ変更関数に引き渡します。サイズ変更した写真が、第3段階で使われてアップロードされます。最後に、すべての処理が終了したデータをdoneに格納します。
このプログラムのイメージは以下のようになります。
並行処理プログラムが第1段階から第3段階までの処理をチェックし、プログラムを処理します。

image.png

実は、自作のQueueクラスでこのプログラムを単純に実装しようとすると、多くの問題に直面します。

まず初めに、プログラムの終了条件を確認するために、doneのqueueをビジーウェイト(常に状態を確認すること)する必要があります。

image.png

第二に、前半のパイプラインが渋滞すると、後半のパイプラインは空になる頻度が高くなります。そうすると、後半のスレッドは空のキューをチェックするだけの、無駄な仕事をすることになります。

image.png

第三に、すべての仕事が終了した際に、各スレッドにこのループを終了したことを伝える必要があります。
単純な実装では、仕事がすべて終わっても download, resize, upload は終わったことに気付かず、ループを抜け出せない状態になります。

image.png

第四に、パイプラインの重体によって、プログラムがどこかでクラッシュする可能性があります。

image.png

Pythonの組み込みモジュールqueueのQueueクラスは、このような多くの問題をすべて解決する機能を持っています。
Queueは新たなデータが得られるまでgetメソッドをブロックすることで、ビジーウェイトを使わずに処理できます。

from threading import Thread
from queue import Queue

queue = Queue()

def consumer():
    print('consumer waiting') 
    queue.get()                  # put() の後で実行される
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

print('Producer putting')
# 実データの代わりにオブジェクトインスタンスを使用
queue.put(object())              # 上のget() より先に実行される
thread.join()
print('Producer done')

実行結果

consumer waiting
Producer putting
Consumer done
Producer done

パイプライン渋滞を解消するために、Queueクラスでは、2つのスレッド間での処理待ちデータの最大量を指定できます。

queue = Queue(1)                 # スレッド間での処理待ちデータは1つ

def consumer():
    time.sleep(0.1)
    queue.get()                  # 2番目の実行
    print('Consumer got 1')
    queue.get()                  # 4番目の実行
    print('Consumer got 2')

thread = Thread(target=consumer)
thread.start()

# 処理待ちデータが1なので、putを実行した後に一度getが実行されないと2つ目のputは実行されない
queue.put(object())
print('Producer put 1')          # 1番目の実行
queue.put(object())
print('Producer put 2')          # 3番目の実行
thread.join()
print('Producer done')

実行結果

Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done

Queueクラスのtask_doneメソッドを使うことで作業進捗の追跡をすることで、パイプラインの終端で、ポーリング(データ数を常に確認)する必要が無くなります。

in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()        # 2番目 完了
    print('Consumer working')
    # 作業中
    print('Consumer done')
    in_queue.task_done()         # 3番目 完了

Thread(target=consumer).start()

# Producerは、Queueインスタンスでjoinを呼び出し、in_queueが終了するのを待つ
# 空になっていても、in_queueは、すべてのデータでtask_doneが呼ばれるまでjoinできない

in_queue.put(object())
print('Producer waiting')        # 1番目 完了
in_queue.join()
print('Producer done')           # 4番目 完了

実行結果

Consumer waiting
Producer waiting
Consumer working
Consumer done
Producer done

キューに対して終端を知らせる特別な要素を追加するcloseメソッドを実装することで、download, resize, upload スレッドに対していつ処理を止めるべきかを通知するようにできます。また、__iter__メソッドを定義することで、適当な時期にtask_doneを呼び出して、キューの作業進捗を確認します。

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return # スレッドを終了させる
                yield item
            finally:
                self.task_done()

スレッドがClosableQueueクラスの振る舞いに依存するように定義し、一連の動作をするコードを書きます。

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return # スレッドを終了させる
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

# download, resize, upload の疑似関数
def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

# すべての入力作業が投入されたら、第一段階の入力キューを閉じて停止信号を送る
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()

# 各段階を結合してたキューをジョインして作業終了を待つ

download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
# 1000 items finished

Queueクラスを使うことで、並行作業でキューを扱う際の問題点を気にすることなく、コードを書くことができるようになります。

16
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
13