1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pythonの並列処理multiprocessingによる処理時間の短縮を計測

Last updated at Posted at 2021-12-25

はじめに

この記事ではPythonの標準機能であるmultiprocessingを実際に動作させて、その効果を確認する。
また、並列処理したい関数は複数の引数を持つことが多いと思うので、複数引数を扱うサンプルコードを作成した。

ドキュメントに記載の例で簡単に動作を試すことができるが、処理時間の短縮が確認しづらかったため簡単な処理サンプルを作成して処理速度の向上を測ることにする。

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

# 標準出力に以下が出力されます:
[1, 4, 9]

補足

2乗にする例では、おそらくmultiprocessingを処理するオーバーヘッドのため、以下サンプルより処理速度が遅くなる。
処理対象を10,000,000にして比較すると、multiprocessingの処理時間が約2倍になった。

[x * x for x in list(range(10000000))]

検証

動作環境

Python 3.6.9

処理サンプルの内容

対象ファイルから指定の文字列がある行番号を探す。
サンプルでは、1行ごとにReadさせて探索させる。

from datetime import datetime

def search_specified_id(file_name: str, target_id: str) -> list:
    print(f"{target_id}: {datetime.now()}")
    with open(file_name, "r") as tf:
        out = []
        count = 0
        for line in tf.readlines():
            count += 1
            parse_data = csv.reader([line], delimiter=",")
            for item in parse_data:
                if item[1] == target_id:
                    out.append(count)
    print(f"{target_id}: {datetime.now()} e")
    return out

検証データ作成

探索する英字を含むCSVファイルを作成する。

import string
import csv
ROW_NUM = 1_000_000
LETTERS = string.ascii_letters // a-zA-Z

with open("target.csv", "w", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["no", "item"])
    for i in range(ROW_NUM):
        writer.writerow([i, LETTERS[i % 52]])
"""
出力イメージ
no,item
0,a
1,b
2,c
...
"""

実行:直列処理(非並列)

対象の英字(49文字とする)を直列処理する。
処理結果として、各英字が存在するファイル行番号のListが格納される。

res_list = [search_specified_id("target.csv", LETTERS[i]) for i in range(49)]

実行:並列処理

直列処理と同じ処理内容を並列処理化する。
処理サンプルのコードで処理の開始と終了のタイミングをPrintをしており、4つのプロセスが同時に実行され次々と処理が進む様子が確認できる。

from multiprocessing import Pool

with Pool(processes=4) as p:
    res_list = p.starmap(
        func=search_specified_id,
        iterable=[("target.csv", LETTERS[i]) for i in range(49)],
    )
  • Poolのワーカープロセス数を省略すると、os.cpu_count()が設定される。
  • 複数の引数を持つ関数を並列処理する場合、starmap()を使用して引数をList形式のタプルで渡す。

処理時間

ファイル行数を3ケースに変更して、処理時間(秒)を測定した。
4並列処理化で処理時間がおよそ半分になることを確認できた。

ファイル行数 直列処理 並列処理 比率(%)
100K 12 7 58
1M 132 62 47
3M 462 208 45

まとめ

それほど記述量を増やすことなく並列処理を実現でき、処理時間の短縮を確認できた。
並列化によって処理時間の短縮を確認できたが、並列処理時にCPU使用率が90%近くになりノートPCの処理にラグが起こっていたので、PC性能(CPU、コア数など)を上げることで処理時間の短縮比率を上げられそう。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?