1. ThreadPoolExecutor
の基本の使い方
まずは、ThreadPoolExecutor
を使って複数のスレッドを同時に動かす基本的な方法を見てみましょう。
Figure - Single-threaded and multithreaded processes
例: 基本の使い方
from concurrent.futures import ThreadPoolExecutor
import time
def func_1():
for n in range(3):
time.sleep(1)
print(f"func_1 - {n}")
return "result 1"
def func_2(x, y):
for n in range(3):
time.sleep(1)
print(f"func_2 - {n} ({x}, {y})")
return "result 2"
def main():
print("start")
# スレッドプールを作成し、最大4つのスレッドを使って並行処理を実行
with ThreadPoolExecutor(max_workers=4) as executor:
# 各関数をスレッドで非同期に実行
future_1 = executor.submit(func_1) # func_1を実行
future_2 = executor.submit(func_2, "A", "B") # func_2を"A", "B"の引数で実行
# スレッドの実行結果を取得
result_1 = future_1.result()
result_2 = future_2.result()
# 結果を出力
print(f"result 1 : {result_1}")
print(f"result 2 : {result_2}")
print("end")
if __name__ == "__main__":
main()
解説
-
max_workers=4
で最大4つのスレッドが並行して動きます。 -
submit()
を使うことで、指定した関数をスレッドで非同期実行します。 -
result()
メソッドを使うと、各スレッドの戻り値を取得できます。
2. submit()
を使うマルチスレッドの実行
submit()
は、個別のタスクをスレッドで非同期に実行するためのメソッドです。タスクごとに Future
オブジェクトが返され、処理の完了後に結果を取得できます。
例: submit()
を使った実行
from concurrent.futures import ThreadPoolExecutor
import time
def func_1(x):
for n in range(3):
time.sleep(1)
print(f"func_1 - {n} ({x})")
def main():
print("start")
# スレッドプールを作成し、最大4つのスレッドで並行処理を実行
with ThreadPoolExecutor(max_workers=4) as executor:
# "A", "B", "C", "D"の引数でfunc_1を並行実行
for arg in ["A", "B", "C", "D"]:
executor.submit(func_1, arg) # 各引数に対してfunc_1を非同期実行
print("end")
if __name__ == "__main__":
main()
解説
-
submit()
メソッドは各タスクを非同期で並行実行します。 - ここでは、
func_1
が "A", "B", "C", "D" という4つの引数でスレッドで実行されます。 -
submit()
はすべてのタスクを並行に動かすため、同時に実行されます。
3. map()
を使うマルチスレッドの実行
map()
は、複数のタスクを並行実行するために使うメソッドです。map()
はリストの要素ごとに関数を並行実行し、各関数の実行結果を一つのリストにまとめます。
例: map()
を使った実行
from concurrent.futures import ThreadPoolExecutor
import time
def func_1(x):
for n in range(3):
time.sleep(1)
print(f"func_1 - {n} ({x})")
return f"result - {x}"
def main():
print("start")
# スレッドプールを作成し、最大4つのスレッドで並行処理を実行
with ThreadPoolExecutor(max_workers=4) as executor:
# mapを使用して、["A", "B", "C", "D"]の各引数に対してfunc_1を並行実行
results = executor.map(func_1, ["A", "B", "C", "D"])
# mapの結果をリストに変換して出力
print(list(results))
print("end")
if __name__ == "__main__":
main()
解説
-
map()
はリストの要素ごとに指定した関数を並行実行します。 -
map()
の返り値は、各関数の戻り値を含むリストです。 - 例では、["A", "B", "C", "D"] それぞれが引数として
func_1
に渡され、並行に実行されます。
4. submit()
と map()
の違い
-
submit()
: 各タスクごとに非同期実行し、Future
オブジェクトが返されます。個別のタスクの結果を後で取得する場合に使います。 -
map()
: リストなどの反復可能なオブジェクトを一度に渡し、各タスクを並行実行し、すべての結果を一つのリストにまとめて返します。
5. concurrent.futures
での Lock()
の利用
concurrent.futures
の基本的な使い方では Lock()
は不要ですが、複数のスレッドで同じ変数にアクセスする場合、データ競合を防ぐために Lock()
が必要になります。concurrent.futures
自体はスレッド間の同期処理を直接サポートしませんが、必要に応じて threading.Lock()
を併用してデータの整合性を保つことができます。
参考資料
並列処理の基本を解説!マルチスレッド・マルチプロセスをconcurrent futuresで実装!
https://youtu.be/et-cDFbVkQw?si=6hdFZ3kWoGlYNorr