Pythonプログラムで、扱うファイル件数が多く、
バッチの実行にあまりに時間がかかってしまったので、並列処理化をしました。
並列処理化に至った経緯としては、
ローカル端末の環境から、ハイスペックのサーバーの環境へ変えて実行してみても、実行速度がほとんど変わりませんでした。
そこで、サーバーのメモリを確認してみたところ、メモリの占有率がほとんどあがっていませんでした。
また、今回の実行環境としてはWindowsサーバーだったのですが、
Linux環境だとメモリの上限設定をPythonコード上から設定できるようなのですが、
Windows環境ではメモリの上限設定をする方法がないようでした。
また、Pythonでは、CPUバウンドな処理はGILによる排他制御のために、シングルスレッドでの実行となるため、
処理の高速化をするにはマルチプロセス化以外の選択がないと思われたことから、
処理の並列化を実装することとしました。
マルチプロセス化をすることで、CPUの数だけ並列でプロセスを実行できるので相当な高速化が期待できます。
ただし、処理の高速化を試みるにあたり、
実際はボトルネックに対するチューニングが優先事項として高いので、
安易に処理を並列化することは注意が必要になります。
また、以下ソースの補足になりますが、
通常、サブプロセス内でエラーが発生時には、親プロセス側の処理が走り続けてしまいます。
そこで、process.join()
で処理同期時に、サブプロセスがエラーコード1を返した場合には、
親プロセス側でもエラーをスローするようにして回避しています。
・コメントをいただきました。今回は、multiprocessingを用いて実装しましたが、concurrent.futureなども良さそうです。
import glob
import multiprocessing
def process_hoge(input_file):
"""
並列処理で実行する処理
"""
for i, file in enumerate(input_file):
# ファイル処理をごちゃごちゃ
print(f"finished chunks: {i+1} / {len(iuput_file)}")
# 処理対象のファイル群
input_files = glob.glob("*.csv")
num_process = multiprocessing.cpu_count()
num_files = len(input_files)
if num_files < num_process:
num_process = num_files
# プロセスごとに分割
chunk_size = num_files // num_process
if chunk_size == 0:
chuk_size = 1
chunks = [(i * chunk_size, (i + 1) * chunk_size) for i range(num_process)]
chunks[-1] = (chunks[-1][0], num_files)
process = []
for c in chunks:
file = input_files[c[0]:c[1]]
# 並列処理で処理を実行
process = multiprocessing.Process(target=process_hoge, args=(file,))
processes.append(process)
process.start()
for process in processes:
process.join()
if process.exitcode == 1
raise Exception("Error catched in process_hoge")