concurrent.futuresモジュールの概要
Python3.2で追加されたconcurrent.futuresモジュールは、複数の処理を並列実行するための機能を提供します。
Pythonには他にthreadingとmultiprocessingというモジュールがありますが、これらが1つのスレッド・プロセスを扱うのに対して、concurrent.futuresモジュールは複数のスレッド・プロセスを扱うことを目的としています。
Executor
concurrent.futuresモジュールには抽象クラスとしてExecutorクラスがあり、実装クラスとして2つのクラスが提供されています。
並列タスクを実行するにはこの2つのうちどちらかを使用します。
-
ThreadPoolExecutor
- スレッドを使って並列タスクを実行します。
- ネットワークアクセスなどCPUに負荷がかからない処理の並列実行に適しています。
-
ProcessPoolExecutor
- プロセスを使って並列タスクを実行します。
- CPUに負荷がかかる計算処理などの並列実行に適しています。
max_workers
Executorのコンストラクタでは同時に実行可能なタスクの最大数を引数max_workersで指定します。
同時実行可能数より多いタスクの実行を要求すると、タスクはキューに追加されて他のタスクの終了を待ってから順次実行します。
タスクを実行するメソッドsubmitとmap
Executorには並列タスクを実行する以下のメソッドがあります。
-
submit
- 1つのタスクを実行キューに追加します。
- 実行中のタスクがmax_workers未満であれば追加されたタスクは即実行が開始されます。
- 戻り値のFutureオブジェクトでタスクのキャンセルや実行結果の取得を行います。
-
map
- 実行タスクをiteratorで渡します。
- 戻り値はタスクの実行結果を取得するためのgeneratorです。
サンプル
concurrent.futuresを使ったサンプルです。Python3.6で書いてますが、3.2以降の環境なら少し直せば動かせると思います。
-
01_thread.py
- ThreadPoolExecutorを使ったシンプルな例
-
02_future.py
- submitの結果を受け取る例
-
03_map.py
- mapでタスクを一括追加/結果を一括取得する例
-
04_process.py
- ProcessPoolExecutorで重い処理を実行する例
ThreadPoolExecutorを使ったシンプルな例
01_thread.py
2つのスレッドで5つのタスクを実行します。タスクは1秒間スリープするだけの処理です。
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
for i in range(5):
executor.submit(task, i)
getLogger().info("submit end")
getLogger().info("main end")
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end
submitで結果を受け取る
submitの結果を受け取る例です。
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
return v * 2
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
futures = []
for i in range(5):
futures.append(executor.submit(task, i))
getLogger().info("submit end")
getLogger().info([f.result() for f in futures])
getLogger().info("main end")
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end
mapでタスクを一括追加/結果を一括取得する例
03_map.py
タスクを一括で処理する場合はsubmitよりmapを使った方が楽に書けます。
def task(v):
getLogger().info("%s start", v)
time.sleep(1.0)
getLogger().info("%s end", v)
return v * 2
def main():
init_logger()
getLogger().info("main start")
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
results = executor.map(task, range(5))
getLogger().info("map end")
getLogger().info(list(results))
getLogger().info("main end")
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end
ProcessPoolExecutorで重い処理を実行する例
04_process.py
最後にProcessPoolExecutorを使った例です。
パラメータを変えてパフォーマンスの違いを見てみます。
def task(params):
(v, num_calc) = params
a = float(v)
for _ in range(num_calc):
a = pow(a, a)
return a
def main():
init_logger()
if len(sys.argv) != 5:
print("usage: 05_process.py max_workers chunk_size num_tasks num_calc")
sys.exit(1)
(max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])
start = time()
with ProcessPoolExecutor(max_workers=max_workers) as executor:
params = map(lambda _: (random(), num_calc), range(num_tasks))
results = executor.map(task, params, chunksize=chunk_size)
getLogger().info(sum(results))
getLogger().info("{:.3f}".format(time() - start))
パラメータの説明
- max_workers
- 同時に実行する最大プロセス数
- chunk_size
- プロセスに一度に渡すタスク数?
- num_tasks
- タスクの実行回数
- num_calc
- 1タスク内で実行する計算の回数
実行環境
- CPU:Intel Core i5(2.6 GHz)
- コア数:2
実行結果
上の4つが大きいタスク、下が小さいタスクを大量に実行した結果です。
小さいタスクを大量に実行する場合にはchunk_sizeの指定が重要なようです。
max_workers | chunk_size | num_tasks | num_calc | 実行時間(sec) |
---|---|---|---|---|
1 | 1 | 100 | 100,000 | 1.954 |
2 | 1 | 100 | 100,000 | 1.042 |
1 | 10 | 100 | 100,000 | 1.922 |
2 | 10 | 100 | 100,000 | 1.071 |
1 | 1 | 10,000 | 1,000 | 3.295 |
2 | 1 | 10,000 | 1,000 | 3.423 |
1 | 10 | 10,000 | 1,000 | 2.272 |
2 | 10 | 10,000 | 1,000 | 1.279 |
1 | 100 | 10,000 | 1,000 | 2.126 |
2 | 100 | 10,000 | 1,000 | 1.090 |