2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Queueを使う目的とマルチスレッドでの役割

Last updated at Posted at 2024-10-26

Queueを使う目的とマルチスレッドでの役割

1. Queueの目的

Queue は、複数のプロデューサ(データを供給する側)と複数のコンシューマ(データを処理する側)間でデータを安全にやり取りするためのデータ構造です。このコードでは、メインスレッドがタスクを Queue に追加し、複数のスレッドがそのタスクを並行して処理します。

2. マルチスレッドでQueueが使われる理由

マルチスレッド環境での課題として、データの競合レースコンディションがあります。複数のスレッドが同じデータにアクセスすると、データが破損するリスクがありますが、Queue はスレッドセーフな仕組みを提供するため、これらの問題を防ぐことができます。つまり、Queue は自動的にロック機構を持っており、複数のスレッドが同時にデータを取得しようとしたときにも安全に処理が行われます。

3. FIFO or LIFO

Queue のデフォルトの動作は FIFO(First In, First Out) です。つまり、最初に put されたデータが最初に get されます。ただし、queue.LifoQueue()queue.PriorityQueue() などを使用することで、LIFO(Last In, First Out)や優先度付きの動作も実現できます。


例題プログラムの紹介

このコードの目的は、複数のタスクを並列で効率よく処理することです。具体的には、キューを利用して複数のスレッドが同時に動作し、タスク(ここでは整数の値)を順番に取り出して処理します。

  1. 大量のデータを並列処理する:

    • このプログラムは、キューに大量のデータ(1から299までの整数)を追加し、それらを複数のスレッド(ここでは10個)で同時に処理します。
    • 各スレッドが独立して動作することで、全体の処理時間を短縮します。例えば、シングルスレッドで299のタスクを順番に処理するよりも、10個のスレッドが同時に処理を行うことで大幅に高速化されます。
  2. 安全に並列処理を行う:

    • Queue を使用することで、スレッドセーフな形でタスクを並列処理することができます。これにより、データの競合やレースコンディションといったマルチスレッド特有の問題を防ぎます。
    • スレッド間の競合を防ぐために、出力部分には Lock を使用し、コンソール出力がスレッドの実行順序によって混乱しないようにしています。
  3. タスク完了の同期:

    • メインスレッドは q.join() によって、すべてのタスクが処理されるまで待機します。この仕組みにより、全タスクが完了するまでメインスレッドが終了しないように制御しています。
    • 各スレッドは、処理が終わるたびに q.task_done() を呼び出し、タスクが正常に終了したことをキューに知らせます。

想定されるユースケース

  • データ処理パイプライン:

    • たとえば、画像のバッチ処理やログの解析など、膨大なデータを効率よく処理したい場合に利用できます。
    • メインスレッドが次々とタスク(データ)をキューに追加し、スレッドが並列で処理を進めていくことで全体の処理が高速化されます。
  • リアルタイムデータの消化:

    • ネットワークからのデータ受信や、センサーからのデータを並行して処理するシステムに応用できます。
    • キューがデータを受け取り、スレッドが処理を続けるので、スムーズにデータの流れを制御することが可能です。

全体の流れ

  1. メインスレッドがタスクをキューに追加。
  2. 10個のスレッドが並行してキューからタスクを取り出し、処理を実行。
  3. 各スレッドは処理完了ごとに q.task_done() で通知。
  4. メインスレッドは q.join() で全タスクが完了するまで待機し、終了。

このコードは、複数のタスクを効率的かつ安全に処理するための良い例であり、マルチスレッド環境でのキューの利用方法を理解する助けとなります。

コード全体と各部分の説明

from threading import Thread, Lock, current_thread
from queue import Queue
import time

def worker(q, lock):
    while True:
        value = q.get()  # キューからタスクを取得
        # タスクの処理(省略)
        with lock:  # ロックを使って出力を保護
            print(f"in {current_thread().name} got {value}")
        q.task_done()  # タスクが完了したことを通知

if __name__ == "__main__":

    q = Queue()  # スレッドセーフなキューの作成 (FIFO)
    lock = Lock()  # 出力の競合を防ぐためのロック
    num_threads = 10

    print(f" {num_threads} of threads will starts")

    # スレッドを作成し、起動する
    for i in range(num_threads):
        thread = Thread(target=worker, args=(q,lock))
        thread.daemon = True  # メインスレッドが終了したらスレッドも終了するように設定
        thread.start()

    print(f" put the values into the queue")

    # キューに値を入れる
    for i in range(1,300):
        q.put(i)

    # すべてのタスクが完了するまで待機
    q.join()

    print("end main")

1. from threading import Thread, Lock, current_thread

  • 目的: マルチスレッドを利用するために必要な機能をインポートします。Thread は新しいスレッドの作成、Lock はスレッド間の競合を防ぐためのロック機構、current_thread は現在実行中のスレッド情報を取得するために使います。

2. from queue import Queue

  • 目的: スレッドセーフな Queue をインポートします。FIFOの構造を持っており、スレッド間でデータを安全にやり取りできます。

3. def worker(q, lock):

  • 目的: 各スレッドが実行するタスクを定義しています。q.get() でキューからデータを取得し、with lock: によって出力の競合を防ぎながら処理を行います。
  • 重要ポイント: q.task_done() を呼び出すことで、キューにタスクが完了したことを通知しています。

4. if __name__ == "__main__":

  • 目的: メインプログラムのエントリーポイントを定義します。QueueLock のインスタンスを作成し、スレッドを起動します。

5. q = Queue()

  • 目的: FIFOのキューを作成します。Queue は自動的にスレッドセーフになっているため、複数のスレッドが同時にアクセスしても問題ありません。

6. for i in range(num_threads):

  • 目的: 複数のスレッドを起動します。各スレッドは worker 関数を実行します。
  • thread.daemon = True: メインスレッドが終了すると、自動的にサブスレッドも終了します。プログラム全体の終了を確実にするために設定しています。

7. for i in range(1,300):

  • 目的: キューにデータを投入します。q.put(i) でキューにタスク(整数値)を追加します。

8. q.join()

  • 目的: キューに入っているすべてのタスクが完了するまでメインスレッドを待機させます。スレッドがすべての q.get() で取得したタスクに対して q.task_done() を呼び出すと、q.join() がブロックを解除します。

まとめ

  • Queue の役割: 複数のスレッド間でタスクを安全に共有し、データの競合を防ぐために使用されます。Queue を使うことで、スレッド間でのデータのやり取りを容易にし、タスクの管理もシンプルに行えます。
  • q.join()q.task_done() の重要性: 正しくキューのタスクが管理され、メインスレッドがすべてのタスクが処理完了するまで待機するためのメカニズムを提供しています。
  • FIFOの動作: Queue はデフォルトでFIFO方式を採用しており、先に追加されたタスクが先に処理されます。

参考資料

Threading in Python - Advanced Python 16 - Programming Tutorial

PythonでリアルタイムWebカメラ処理を最適化する:スレッドとキューの利用

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?