Help us understand the problem. What is going on with this article?

Pythonでconcurrent.futuresを使った並列タスク実行

More than 3 years have passed since last update.

concurrent.futuresモジュールの概要

Python3.2で追加されたconcurrent.futuresモジュールは、複数の処理を並列実行するための機能を提供します。

Pythonには他にthreadingmultiprocessingというモジュールがありますが、これらが1つのスレッド・プロセスを扱うのに対して、concurrent.futuresモジュールは複数のスレッド・プロセスを扱うことを目的としています。

Executor

concurrent.futuresモジュールには抽象クラスとしてExecutorクラスがあり、実装クラスとして2つのクラスが提供されています。
並列タスクを実行するにはこの2つのうちどちらかを使用します。

  • ThreadPoolExecutor
    • スレッドを使って並列タスクを実行します。
    • ネットワークアクセスなどCPUに負荷がかからない処理の並列実行に適しています。
  • ProcessPoolExecutor
    • プロセスを使って並列タスクを実行します。
    • CPUに負荷がかかる計算処理などの並列実行に適しています。

max_workers

Executorのコンストラクタでは同時に実行可能なタスクの最大数を引数max_workersで指定します。
同時実行可能数より多いタスクの実行を要求すると、タスクはキューに追加されて他のタスクの終了を待ってから順次実行します。

タスクを実行するメソッドsubmitとmap

Executorには並列タスクを実行する以下のメソッドがあります。

  • submit
    • 1つのタスクを実行キューに追加します。
    • 実行中のタスクがmax_workers未満であれば追加されたタスクは即実行が開始されます。
    • 戻り値のFutureオブジェクトでタスクのキャンセルや実行結果の取得を行います。
  • map
    • 実行タスクをiteratorで渡します。
    • 戻り値はタスクの実行結果を取得するためのgeneratorです。

サンプル

concurrent.futuresを使ったサンプルです。Python3.6で書いてますが、3.2以降の環境なら少し直せば動かせると思います。

  • 01_thread.py
    • ThreadPoolExecutorを使ったシンプルな例
  • 02_future.py
    • submitの結果を受け取る例
  • 03_map.py
    • mapでタスクを一括追加/結果を一括取得する例
  • 04_process.py
    • ProcessPoolExecutorで重い処理を実行する例

ThreadPoolExecutorを使ったシンプルな例

01_thread.py
2つのスレッドで5つのタスクを実行します。タスクは1秒間スリープするだけの処理です。

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        for i in range(5):
            executor.submit(task, i)
        getLogger().info("submit end")
    getLogger().info("main end")
[2017-04-02 12:01:39,747] [MainThread] main start
[2017-04-02 12:01:39,748] [thread_0] 0 start
[2017-04-02 12:01:39,749] [thread_1] 1 start
[2017-04-02 12:01:39,750] [MainThread] submit end
[2017-04-02 12:01:40,755] [thread_0] 0 end
[2017-04-02 12:01:40,755] [thread_0] 2 start
[2017-04-02 12:01:40,756] [thread_1] 1 end
[2017-04-02 12:01:40,756] [thread_1] 3 start
[2017-04-02 12:01:41,761] [thread_0] 2 end
[2017-04-02 12:01:41,761] [thread_0] 4 start
[2017-04-02 12:01:41,761] [thread_1] 3 end
[2017-04-02 12:01:42,764] [thread_0] 4 end
[2017-04-02 12:01:42,765] [MainThread] main end

submitで結果を受け取る

02_future.py

submitの結果を受け取る例です。

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        futures = []
        for i in range(5):
            futures.append(executor.submit(task, i))
        getLogger().info("submit end")
        getLogger().info([f.result() for f in futures])
    getLogger().info("main end")
[2017-04-02 12:08:23,853] [MainThread] main start
[2017-04-02 12:08:23,854] [thread_0] 0 start
[2017-04-02 12:08:23,855] [thread_1] 1 start
[2017-04-02 12:08:23,856] [MainThread] submit end
[2017-04-02 12:08:24,856] [thread_0] 0 end
[2017-04-02 12:08:24,856] [thread_0] 2 start
[2017-04-02 12:08:24,857] [thread_1] 1 end
[2017-04-02 12:08:24,857] [thread_1] 3 start
[2017-04-02 12:08:25,863] [thread_0] 2 end
[2017-04-02 12:08:25,864] [thread_0] 4 start
[2017-04-02 12:08:25,864] [thread_1] 3 end
[2017-04-02 12:08:26,867] [thread_0] 4 end
[2017-04-02 12:08:26,868] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:08:26,868] [MainThread] main end

mapでタスクを一括追加/結果を一括取得する例

03_map.py
タスクを一括で処理する場合はsubmitよりmapを使った方が楽に書けます。

def task(v):
    getLogger().info("%s start", v)
    time.sleep(1.0)
    getLogger().info("%s end", v)
    return v * 2

def main():
    init_logger()
    getLogger().info("main start")
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as executor:
        results = executor.map(task, range(5))
        getLogger().info("map end")
    getLogger().info(list(results))
    getLogger().info("main end")
[2017-04-02 12:10:03,997] [MainThread] main start
[2017-04-02 12:10:03,998] [thread_0] 0 start
[2017-04-02 12:10:04,000] [thread_1] 1 start
[2017-04-02 12:10:04,000] [MainThread] map end
[2017-04-02 12:10:05,005] [thread_0] 0 end
[2017-04-02 12:10:05,006] [thread_0] 2 start
[2017-04-02 12:10:05,006] [thread_1] 1 end
[2017-04-02 12:10:05,006] [thread_1] 3 start
[2017-04-02 12:10:06,007] [thread_0] 2 end
[2017-04-02 12:10:06,007] [thread_0] 4 start
[2017-04-02 12:10:06,007] [thread_1] 3 end
[2017-04-02 12:10:07,014] [thread_0] 4 end
[2017-04-02 12:10:07,014] [MainThread] [0, 2, 4, 6, 8]
[2017-04-02 12:10:07,014] [MainThread] main end

ProcessPoolExecutorで重い処理を実行する例

04_process.py
最後にProcessPoolExecutorを使った例です。
パラメータを変えてパフォーマンスの違いを見てみます。

def task(params):
    (v, num_calc) = params
    a = float(v)
    for _ in range(num_calc):
        a = pow(a, a)
    return a

def main():
    init_logger()

    if len(sys.argv) != 5:
        print("usage: 05_process.py max_workers chunk_size num_tasks num_calc")
        sys.exit(1)
    (max_workers, chunk_size, num_tasks, num_calc) = map(int, sys.argv[1:])

    start = time()

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        params = map(lambda _: (random(), num_calc), range(num_tasks))
        results = executor.map(task, params, chunksize=chunk_size)
    getLogger().info(sum(results))

    getLogger().info("{:.3f}".format(time() - start))

パラメータの説明

  • max_workers
    • 同時に実行する最大プロセス数
  • chunk_size
    • プロセスに一度に渡すタスク数?
  • num_tasks
    • タスクの実行回数
  • num_calc
    • 1タスク内で実行する計算の回数

実行環境

  • CPU:Intel Core i5(2.6 GHz)
  • コア数:2

実行結果

上の4つが大きいタスク、下が小さいタスクを大量に実行した結果です。
小さいタスクを大量に実行する場合にはchunk_sizeの指定が重要なようです。

max_workers chunk_size num_tasks num_calc 実行時間(sec)
1 1 100 100,000 1.954
2 1 100 100,000 1.042
1 10 100 100,000 1.922
2 10 100 100,000 1.071
1 1 10,000 1,000 3.295
2 1 10,000 1,000 3.423
1 10 10,000 1,000 2.272
2 10 10,000 1,000 1.279
1 100 10,000 1,000 2.126
2 100 10,000 1,000 1.090
tag1216
Qiita戦闘力はキュイレベルです! /作ったもの ◆QiiTrend:https://qiitrend.herokuapp.com/ ◆Qiiner:https://qiiner.tag1216.net/ ◆Qiitaでいいねしたら草生えるページ:https://qiiner.tag1216.net/likes-heatmap
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away