こんにちは!アドベントカレンダーの枠が空いてたので,短い頻度での連投にはなりますが記事を上げさせてもらいます.普段からあまり文章を書かないので乱文になるとは思いますが,どうぞよろしくお願いします.
はじめに
この記事ではpythonで並列処理を行う方法について軽くまとめます.pythonで並列処理を行う方法には,threading
やmultiprocessing
,concurrent.future
などいくつかありますが,全てまとめると結構な量になるので,本記事ではthreadingモジュールの中でも基本的な部分だけを扱います.もっと詳しく知りたいという方はPythonの公式ドキュメントに目を通すといいかもしれません.
https://docs.python.org/ja/3.12/library/concurrency.html
環境
記事内のコードは次の環境で動作確認をしています
- デバイス MacBook Air (M2, 2022)
- OS:macOS Sequoia 15.1.1
- 実行環境:Docker Desktop 4.36.0
- コンテナ環境
- Ubuntu 24.04
- Python 3.12.3
- コンテナ環境
並列処理とは
そもそも並列処理って何?という話から.通常pythonで書いたコードは上から順番に逐次処理されていきます.しかし,並列処理だと複数の処理が同時並列で処理されるようになります.言葉だけだと分かりにくいと思うので簡単に図にしてみます.
pythonの処理はスレッドの上で動いています.通常はメインスレッド1つで動いていますが,threading
モジュールを使うことで,このスレッドを増やして複数の処理を並列で動かすことができます.
threadingモジュール
Threadオブジェクト
スレッドを表すオブジェクトです.このオブジェクトに呼び出し可能オブジェクトを登録すると別スレッドで処理を行うことができます.また別の方法として,このThreadオブジェクトを継承したクラスでrunメソッドをオーバーライドする方法もあります.
呼び出し可能オブジェクトを登録する方法
import threading
import time
def taskA():
print('Task A')
time.sleep(2)
print('Task A done')
def taskB():
print('Task B')
time.sleep(2)
print('Task B done')
if __name__ == '__main__':
t1 = threading.Thread(target=taskA)
t2 = threading.Thread(target=taskB)
t1.start()
t2.start()
実行結果
Task A
Task B
Task A done
Task B done
コードの説明
threading.Thread(target=taskA)
でスレッドの作成を行います.target=
の部分には呼び出し可能オブジェクトを登録する必要があるため,関数の場合は()
を省いて書きます.また,スレッドは作成しただけでは実行されないため,.start()
で処理を開始させる必要があります.
実行結果の通りtaskA
関数とtaskB
関数がそれぞれ別のスレッドで同時に処理されていることがわかります.
呼び出し可能オブジェクトに引数を渡す
スレッド上で動かす関数に引数を与えたい場合があると思います.そのときは次のようにします.
import threading
import time
def taskA(msg1, msg2, msg3, msg4=None):
print('Task A')
time.sleep(2)
print(msg1, msg2, msg3)
print(msg4)
print('Task A done')
if __name__ == '__main__':
t1 = threading.Thread(
target=taskA,
args=('Hello', 'from', 'Task A'),
kwargs={'msg4': 'Hello from Task A'})
t1.start()
実行結果
Task A
Hello from Task A
Hello from Task A
Task A done
引数はargs
にタプルの形で渡します.キーワード引数はkwargs
に辞書型で渡します.注意点として,args
はタプルである必要があるため,要素が一つでも末尾に,
が必要です.
Threadオブジェクトを継承したクラスでrunメソッドをオーバーライドする
別スレッドでの処理は,Thread
オブジェクトを継承したクラスを作ることでも実現できます.
from threading import Thread
import time
class TaskA(Thread):
def __init__(self):
super().__init__()
def run(self):
print('Task A')
time.sleep(2)
print('Task A done')
class TaskB(Thread):
def __init__(self):
super().__init__()
def run(self):
print('Task B')
time.sleep(2)
print('Task B done')
if __name__ == '__main__':
t1 = TaskA()
t2 = TaskB()
t1.start()
t2.start()
実行結果
Task A
Task B
Task B done
Task A done
具体的には,run
メソッドをオーバーライドして,そこに別スレッドで実行する処理を記述します.この方法を利用する際には注意点が2つあります.1つはコンストラクタとrun
メソッド以外をオーバーライドしてはいけないということ.もう1つはコンストラクタ内で継承元(Thread
オブジェクト)のコンストラクタを呼び出さないといけないことです.
threading.join
でスレッドの終了を待つ
別のスレッドで行っている処理が終わってから処理を行いたい.という場合に使用するメソッドです.join
メソッドを使うと,そのスレッドの処理が終わるまでブロックしてくれます.
import threading
import time
def taskA():
print('Task A')
time.sleep(2)
print('Task A done')
def taskB():
print('Task B')
time.sleep(2)
print('Task B done')
if __name__ == '__main__':
t1 = threading.Thread(target=taskA)
t2 = threading.Thread(target=taskB)
t1.start()
t1.join()
t2.start()
実行結果
Task A
Task A done
Task B
Task B done
実行結果から,今までと違いt1.start()
の後t1.join()
でスレッドの処理が完了するのを待つため,t2スレッドがスタートせずTaskA
の終了後にTaskB
が実行されていること事がわかります.
変数の共有
各スレッドで動いている関数間では,threading
を使用しないプログラムと同様にグローバル変数を参照することができます.
import threading
import time
msg = 'Hello!!'
def taskA():
print('Task A')
time.sleep(2)
print(msg)
print('Task A done')
def taskB():
print('Task B')
time.sleep(2)
print(msg)
print('Task B done')
if __name__ == '__main__':
t1 = threading.Thread(target=taskA)
t2 = threading.Thread(target=taskB)
t1.start()
t2.start()
実行結果
Task A
Task B
Hello!!
Task B done
Hello!!
Task A done
もちろん書き換えることもできます.
import threading
import time
msg = 'Hello!!'
def taskA():
global msg
print('Task A')
time.sleep(2)
print(msg)
msg = 'Hello from Task A'
print('Task A done')
def taskB():
global msg
print('Task B')
time.sleep(2)
print(msg)
msg = 'Hello from Task B'
print('Task B done')
if __name__ == '__main__':
t1 = threading.Thread(target=taskA)
t2 = threading.Thread(target=taskB)
t1.start()
t2.start()
実行結果
Task A
Task B
Hello!!
Task B done
Hello from Task B
Task A done
Lock
threading.Lock
オブジェクトでプリミティブロックを使用することができます.プリミティブロックとは,ロックが生じた際に特定のスレッドによって所有されない同期プリミティブです,
プリミティブロックには「ロック」と「アンロック」の2つの状態があります.また,メソッドとして,ロックを獲得するacquire()
とrelease()
があります.スレッドがacquire()
メソッドを利用して,「ロック」を獲得すると,「ロック」を獲得したスレッド以外は解放されるまでロックの獲得ができなくなります.
このロックを使うことで,複数のスレッドから同時にいじられると困る処理を排他的に行うことができます.この辺は排他制御とかでググればもっと詳細な説明が出てくると思います.
プリミティブロックを使用する
import threading
import time
def taskA(lock):
lock.acquire()
print('Task A')
time.sleep(2)
print('Task A done')
lock.release()
def taskB(lock):
lock.acquire()
print('Task B')
time.sleep(2)
print('Task B done')
lock.release()
if __name__ == '__main__':
lock = threading.Lock()
t1 = threading.Thread(target=taskA, args=(lock,))
t2 = threading.Thread(target=taskB, args=(lock,))
t1.start()
t2.start()
print('Main done')
実行結果
Task A
Main done
Task A done
Task B
Task B done
taskA
とtaskB
はそれぞれ処理の際に共通のロックを獲得します.このコードではtaskA
が先に実行されるため,taskA
が先にロックを獲得し,taskB
はtaskA
がロックをリリースするまで処理を実行できないまま止まります.Thread.join()
を使用した場合と異なりメインスレッドをブロックするようなことはありません.
withを使用した方法
acquire
メソッドやrelease
メソッドを使わなくてもwith文を使うと自動でロックの獲得と解放をやってくれます.わざわざrelease
を書くのは面倒(しかも忘れる)ので基本的にはこの方法でいいと思います.
import threading
import time
def taskA(lock):
with lock:
print('Task A')
time.sleep(2)
print('Task A done')
def taskB(lock):
with lock:
print('Task B')
time.sleep(2)
print('Task B done')
if __name__ == '__main__':
lock = threading.Lock()
t1 = threading.Thread(target=taskA, args=(lock,))
t2 = threading.Thread(target=taskB, args=(lock,))
t1.start()
t2.start()
print('Main done')
Event
Event
オブジェクトを使うことで,スレッド間で簡易的な通信を行うことができます.Event
オブジェクトはフラグのようなもので,set
メソッドでTrue
に,clear
メソッドでFalse
になります.フラグを確認するときはis_set
メソッドを使用します.
イベントが発生するまで待つような処理を書くときはwait
メソッドを呼ぶことで,set
されるまでブロックしてくれます.
イベントの発生を待つ
import threading
import time
def taskA(event):
print('Task A')
if not event.is_set():
print('Task A > waiting for event')
event.wait()
print('Task A done')
def taskB(event):
print('Task B')
time.sleep(2)
print('Task B done')
print('Task B > setting event')
event.set()
if __name__ == '__main__':
event = threading.Event()
t1 = threading.Thread(target=taskA, args=(event,))
t2 = threading.Thread(target=taskB, args=(event,))
t1.start()
t2.start()
実行結果
Task A
Task A > waiting for event
Task B
Task B done
Task B > setting event
Task A done
参考