0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OECUAdvent Calendar 2024

Day 5

[Python]並列処理をthreadingモジュールで

Last updated at Posted at 2024-12-04

こんにちは!アドベントカレンダーの枠が空いてたので,短い頻度での連投にはなりますが記事を上げさせてもらいます.普段からあまり文章を書かないので乱文になるとは思いますが,どうぞよろしくお願いします.

はじめに

この記事ではpythonで並列処理を行う方法について軽くまとめます.pythonで並列処理を行う方法には,threadingmultiprocessingconcurrent.futureなどいくつかありますが,全てまとめると結構な量になるので,本記事ではthreadingモジュールの中でも基本的な部分だけを扱います.もっと詳しく知りたいという方はPythonの公式ドキュメントに目を通すといいかもしれません.
https://docs.python.org/ja/3.12/library/concurrency.html

環境

記事内のコードは次の環境で動作確認をしています

  • デバイス MacBook Air (M2, 2022)
  • OS:macOS Sequoia 15.1.1
  • 実行環境:Docker Desktop 4.36.0
    • コンテナ環境
      • Ubuntu 24.04
      • Python 3.12.3

並列処理とは

そもそも並列処理って何?という話から.通常pythonで書いたコードは上から順番に逐次処理されていきます.しかし,並列処理だと複数の処理が同時並列で処理されるようになります.言葉だけだと分かりにくいと思うので簡単に図にしてみます.

pythonの処理はスレッドの上で動いています.通常はメインスレッド1つで動いていますが,threadingモジュールを使うことで,このスレッドを増やして複数の処理を並列で動かすことができます.

threadingモジュール

Threadオブジェクト

スレッドを表すオブジェクトです.このオブジェクトに呼び出し可能オブジェクトを登録すると別スレッドで処理を行うことができます.また別の方法として,このThreadオブジェクトを継承したクラスでrunメソッドをオーバーライドする方法もあります.

呼び出し可能オブジェクトを登録する方法

import threading
import time


def taskA():
    print('Task A')
    time.sleep(2)
    print('Task A done')

def taskB():
    print('Task B')
    time.sleep(2)
    print('Task B done')


if __name__ == '__main__':
    t1 = threading.Thread(target=taskA)
    t2 = threading.Thread(target=taskB)
    t1.start()
    t2.start()

実行結果

Task A
Task B
Task A done
Task B done

コードの説明

threading.Thread(target=taskA)でスレッドの作成を行います.target=の部分には呼び出し可能オブジェクトを登録する必要があるため,関数の場合は() を省いて書きます.また,スレッドは作成しただけでは実行されないため,.start()で処理を開始させる必要があります.

実行結果の通りtaskA関数とtaskB関数がそれぞれ別のスレッドで同時に処理されていることがわかります.

呼び出し可能オブジェクトに引数を渡す

スレッド上で動かす関数に引数を与えたい場合があると思います.そのときは次のようにします.

import threading
import time


def taskA(msg1, msg2, msg3, msg4=None):
    print('Task A')
    time.sleep(2)
    print(msg1, msg2, msg3)
    print(msg4)
    print('Task A done')


if __name__ == '__main__':
    t1 = threading.Thread(
        target=taskA, 
        args=('Hello', 'from', 'Task A'),
        kwargs={'msg4': 'Hello from Task A'})
    t1.start()

実行結果

Task A
Hello from Task A
Hello from Task A
Task A done

引数はargsにタプルの形で渡します.キーワード引数はkwargsに辞書型で渡します.注意点として,argsはタプルである必要があるため,要素が一つでも末尾に,が必要です.

Threadオブジェクトを継承したクラスでrunメソッドをオーバーライドする

別スレッドでの処理は,Threadオブジェクトを継承したクラスを作ることでも実現できます.

from threading import Thread
import time


class TaskA(Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        print('Task A')
        time.sleep(2)
        print('Task A done')

class TaskB(Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        print('Task B')
        time.sleep(2)
        print('Task B done')


if __name__ == '__main__':
    t1 = TaskA()
    t2 = TaskB()
    t1.start()
    t2.start()

実行結果

Task A
Task B
Task B done
Task A done

具体的には,runメソッドをオーバーライドして,そこに別スレッドで実行する処理を記述します.この方法を利用する際には注意点が2つあります.1つはコンストラクタとrunメソッド以外をオーバーライドしてはいけないということ.もう1つはコンストラクタ内で継承元(Threadオブジェクト)のコンストラクタを呼び出さないといけないことです.

threading.join でスレッドの終了を待つ

別のスレッドで行っている処理が終わってから処理を行いたい.という場合に使用するメソッドです.joinメソッドを使うと,そのスレッドの処理が終わるまでブロックしてくれます.

import threading
import time


def taskA():
    print('Task A')
    time.sleep(2)
    print('Task A done')

def taskB():
    print('Task B')
    time.sleep(2)
    print('Task B done')


if __name__ == '__main__':
    t1 = threading.Thread(target=taskA)
    t2 = threading.Thread(target=taskB)
    t1.start()
    t1.join()
    t2.start()

実行結果

Task A
Task A done
Task B
Task B done

実行結果から,今までと違いt1.start()の後t1.join()でスレッドの処理が完了するのを待つため,t2スレッドがスタートせずTaskAの終了後にTaskBが実行されていること事がわかります.

変数の共有

各スレッドで動いている関数間では,threading を使用しないプログラムと同様にグローバル変数を参照することができます.

import threading
import time


msg = 'Hello!!'

def taskA():
    print('Task A')
    time.sleep(2)
    print(msg)
    print('Task A done')

def taskB():
    print('Task B')
    time.sleep(2)
    print(msg)
    print('Task B done')


if __name__ == '__main__':
    t1 = threading.Thread(target=taskA)
    t2 = threading.Thread(target=taskB)
    t1.start()
    t2.start()

実行結果

Task A
Task B
Hello!!
Task B done
Hello!!
Task A done

もちろん書き換えることもできます.

import threading
import time


msg = 'Hello!!'

def taskA():
    global msg
    print('Task A')
    time.sleep(2)
    print(msg)
    msg = 'Hello from Task A'
    print('Task A done')

def taskB():
    global msg
    print('Task B')
    time.sleep(2)
    print(msg)
    msg = 'Hello from Task B'
    print('Task B done')


if __name__ == '__main__':
    t1 = threading.Thread(target=taskA)
    t2 = threading.Thread(target=taskB)
    t1.start()
    t2.start()

実行結果

Task A
Task B
Hello!!
Task B done
Hello from Task B
Task A done

Lock

threading.Lockオブジェクトでプリミティブロックを使用することができます.プリミティブロックとは,ロックが生じた際に特定のスレッドによって所有されない同期プリミティブです,

プリミティブロックには「ロック」と「アンロック」の2つの状態があります.また,メソッドとして,ロックを獲得するacquire()release()があります.スレッドがacquire()メソッドを利用して,「ロック」を獲得すると,「ロック」を獲得したスレッド以外は解放されるまでロックの獲得ができなくなります.

このロックを使うことで,複数のスレッドから同時にいじられると困る処理を排他的に行うことができます.この辺は排他制御とかでググればもっと詳細な説明が出てくると思います.

プリミティブロックを使用する

import threading
import time

def taskA(lock):
    lock.acquire()
    print('Task A')
    time.sleep(2)
    print('Task A done')
    lock.release()

def taskB(lock):
    lock.acquire()
    print('Task B')
    time.sleep(2)
    print('Task B done')
    lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    t1 = threading.Thread(target=taskA, args=(lock,))
    t2 = threading.Thread(target=taskB, args=(lock,))
    t1.start()
    t2.start()
    print('Main done')

実行結果

Task A
Main done
Task A done
Task B
Task B done

taskAtaskBはそれぞれ処理の際に共通のロックを獲得します.このコードではtaskAが先に実行されるため,taskAが先にロックを獲得し,taskBtaskA がロックをリリースするまで処理を実行できないまま止まります.Thread.join()を使用した場合と異なりメインスレッドをブロックするようなことはありません.

withを使用した方法

acquireメソッドやreleaseメソッドを使わなくてもwith文を使うと自動でロックの獲得と解放をやってくれます.わざわざreleaseを書くのは面倒(しかも忘れる)ので基本的にはこの方法でいいと思います.

import threading
import time


def taskA(lock):
    with lock:
        print('Task A')
        time.sleep(2)
        print('Task A done')

def taskB(lock):
    with lock:
        print('Task B')
        time.sleep(2)
        print('Task B done')


if __name__ == '__main__':
    lock = threading.Lock()
    t1 = threading.Thread(target=taskA, args=(lock,))
    t2 = threading.Thread(target=taskB, args=(lock,))
    t1.start()
    t2.start()
    print('Main done')

Event

Eventオブジェクトを使うことで,スレッド間で簡易的な通信を行うことができます.Eventオブジェクトはフラグのようなもので,setメソッドでTrueに,clearメソッドでFalseになります.フラグを確認するときはis_setメソッドを使用します.
イベントが発生するまで待つような処理を書くときはwaitメソッドを呼ぶことで,setされるまでブロックしてくれます.

イベントの発生を待つ

import threading
import time


def taskA(event):
    print('Task A')
    if not event.is_set():
        print('Task A > waiting for event')
        event.wait()
    print('Task A done')

def taskB(event):
    print('Task B')
    time.sleep(2)
    print('Task B done')
    print('Task B > setting event')
    event.set()


if __name__ == '__main__':
    event = threading.Event()
    t1 = threading.Thread(target=taskA, args=(event,))
    t2 = threading.Thread(target=taskB, args=(event,))
    t1.start()
    t2.start()

実行結果

Task A
Task A > waiting for event
Task B
Task B done
Task B > setting event
Task A done

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?