はじめに
Pythonにおけるマルチスレッド処理について調べて学んだことをまとめました。
Python初学者の参考になれば幸いです。
マルチスレッド処理とは
複数の処理を並行して走らせることです。IOバウンドな処理の効率化が見込めます。
CPUバウンドな処理の効率化については、マルチスレッドではなくマルチプロセスが適しています。
マルチスレッドとマルチプロセスの違いについては、こちらの記事が分かりやすかったので、適宜参照してください。
マルチスレッド処理を実現するPython標準ライブラリ
Pythonでは、マルチスレッド処理を実現するための標準ライブラリがいくつか用意されています。
threading
Python2以前からある、マルチスレッドを実現するための標準ライブラリです。
公式ドキュメント
使い方
import threading
def func():
while True:
print("子スレッド")
if __name__ == "__main__":
thread = threading.Thread(target=func) # 処理を割り当てる
thread.start() # スレッドを起動する
print("親スレッド")
実行結果
子スレッド
親スレッド # 親スレッドの処理は継続される
子スレッド
子スレッド
子スレッド
... # "子スレッド"という出力が続く
無限ループ処理は親スレッドとは別のスレッドで行われるため、スレッドを起動して処理を割り当てた後は、親スレッドの処理が継続されます。
concurrent.futures.ThreadPoolExecutor
Python3.2で登場。
スレッドをあらかじめ複数起動しておき(=スレッドプール)、マルチスレッド処理が必要な場面で、既に起動済みのスレッドへ処理を割り当てるので、処理が実行されるまでが早いです。
公式ドキュメント
concurrent.futures.ThreadPoolExecutor
クラスのインスタンスを生成する際に、引数で同時実行スレッド数を指定できます。
使い方
import concurrent.futures
def func():
while True:
print("子スレッド")
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(1) # 指定した数のスレッドを常時起動しておく
executor.submit(func) # 起動済みのスレッドに処理を割り当てる
print("親スレッド")
実行結果
子スレッド
子スレッド
親スレッド
子スレッド
子スレッド
子スレッド
... # "子スレッド"という出力が続く
スレッドプールで起動中の全スレッドが処理を実行している(=新規に処理を割り当て可能なスレッドがない)タイミングで新しい処理をsubmit()
すると、スレッドが1つ以上解放されるまで、処理は順番待ちになります。
処理の待機順の管理、及び然るべきタイミングでの実行はconcurrent.futures.ThreadPoolExecutor
インスタンスがよしなにしてくれるので、親スレッドはsubmit()
だけして、その後の処理を継続することができます。
_thread
threadingに比べ低水準なスレッドAPI(threadingの下位互換)です。古いPythonバージョンだとこのモジュールしかなかったりします。
公式ドキュメント
子スレッド内で発生した例外のハンドリング
子スレッド内で例外が発生した場合、その例外が親スレッドへraise
されることはありません。
例えば、
import concurrent.futures
def func():
raise Exception("子スレッド内で例外が発生しました")
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(1)
try:
executor.submit(func)
except Exception as e:
print(e)
print("親スレッド")
と書いても、実行結果は
親スレッド
となり、try-except
節で例外をキャッチできません。
ではどうすれば、子スレッド内で発生した例外をハンドリングできるでしょうか。
親スレッドでハンドリングする
concurrent.futures.ThreadPoolExecutor.submit()
の戻り値であるconcurrent.futures.Futureオブジェクトは、submit()
したスレッドの状態(処理中、処理完了、例外が発生したかどうかなど)を保持しています。
Future
オブジェクトには以下のような関数が備わっています。
Future.result() # スレッドに割り当てた関数が値を返した場合、その値を返す/例外が発生した場合、その例外をraiseする
Future.canncelled() # スレッドがキャンセルされた場合、Trueを返す
Future.running() # スレッドが実行中の場合、Trueを返す
Future.done() # スレッドがキャンセルされたか終了した場合、Trueを返す
Future.exception() # スレッド内で例外が発生した場合、例外そのものを返す
Future.exception()
を使うと、以下のように子スレッド内で発生した例外を親スレッドで取得することができます。
import concurrent.futures
def func():
raise Exception("子スレッド内で例外が発生しました")
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(1)
future = executor.submit(func) # submit()はFutureオブジェクトを返す
try:
exception = future.exception() # 例外が発生しなかった場合はNoneを返す
if exception is not None:
raise exception
except Exception as e:
print(e)
print("親スレッド")
実行結果
子スレッド内で例外が発生しました
親スレッド
exception()
が例外 or None
を返すまで、親スレッドの処理はexception()
を呼び出した箇所でブロックされます。
親スレッドの処理をブロックしたくない場合は、以下のように子スレッド内で例外ハンドリングを完結させればよいです。
子スレッド内でハンドリングする
子スレッド内で例外が発生した旨を親スレッドに知らせる必要がない場合は、子スレッド内で発生した例外を全て子スレッド内でハンドリングすればよいです。
具体的には、子スレッドで実行する関数の内部を丸ごとtry-except
節で囲んでしまいます。
import concurrent.futures
def func():
# 関数内を丸ごとtry-exceptで囲む
try:
raise Exception("子スレッド内で例外が発生しました")
except Exception as e:
print(e)
# 必要に応じてスタックトレースの出力やメール通知など適切なハンドリングを行う
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(1)
executor.submit(func)
print("親スレッド")
実行結果
子スレッド内で例外が発生しました
親スレッド
当たり前ですが、子スレッド内で発生した例外を子スレッド内で拾うことができました。
スレッドセーフ
複数のスレッドが同じ処理を同時に行うと嬉しくない場合があります。
例えば以下のような場合です。
- 自スレッドが特定の変数にアクセスしている間は、他スレッドにその変数の値を書き換えてほしくない
- あるネットワークホストに対して、複数スレッドからの同時接続による負荷をかけたくない
このような場合に「処理をマルチスレッドで実行しても問題が発生しないような設計になっていること」を、スレッドセーフと言います。
Pythonでスレッドセーフを実現できるのが、threading.Lock
オブジェクトです。
使い方
import concurrent.futures
def func(lock):
with lock:
# スレッドセーフに行いたい処理
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(2) # 複数のスレッドを立ち上げる
lock = threading.Lock() # threading.Lockオブジェクトのインスタンスを1つ生成する
# 複数スレッドで同時に同じ処理を行う
executor.submit(func, lock)
executor.submit(func, lock)
with <threading.Lockインスタンス>
とすることで、あるスレッドがwith
文内の処理を実行している間は、他スレッドがwith
文内の処理を実行するのをブロックできます。
スレッドセーフに処理を行いたい箇所では、全てのスレッドが同じ1つのthreading.Lock
インスタンスを参照する必要があります。