Pythonでのマルチプロセス実行では、メモリ内にプロセスとスレッドが割り当てられ、OSからの命令によって、CPU内のコアで実際の処理が行われます。
1. 厳密なプロセスの固定実行
以下に、Pythonプログラム内のメソッドAとメソッドBを、Daskを使って物理的に異なる2つのプロセスへ確実に送り込んで処理を行うコード例を示します。
from dask.distributed import Client, LocalCluster
import os
def method_A(x):
return f"Method A (PID {os.getpid()}) processed: {x}"
def method_B(y):
return f"Method B (PID {os.getpid()}) processed: {y}"
if __name__ == "__main__":
# 2つのプロセスを起動
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
# 各Workerのアドレス(識別子)を取得
workers = list(client.scheduler_info()['workers'].keys())
worker_1 = workers[0]
worker_2 = workers[1]
# それぞれ別のプロセスへ任務を投げ分ける
# allow_other_workers=False で「絶対に他へ行かない」よう強制
future_a = client.submit(method_A, 10, workers=worker_1, allow_other_workers=False)
future_b = client.submit(method_B, 20, workers=worker_2, allow_other_workers=False)
# 結果を回収
results = client.gather([future_a, future_b])
print(results)
2. 実運用でのポイント:データの依存関係がない場合
「別の処理」である場合、以下の2点を確認しておくと効率が上がります。
① 起動時のオーバーヘッド
Daskは「分散処理」を前提としているため、プロセスを立ち上げて通信路を確保するのに数秒のオーバーヘッドがかかります。もし、「1秒で終わる処理を1回だけやりたい」のであれば、Daskを使わず標準の multiprocessing を使ったほうが速いです。 逆に、「重い処理を何度も投げる」「プロセスを常駐させておきたい」という場合はDaskが圧倒的に便利です。
② メモリの独立性
プロセスを分けているため、Method Aで大きな変数を作っても、Method Bのプロセスには一切影響(メモリ消費)を与えません。これにより、一方の処理がメモリ不足でクラッシュしても、もう一方のプロセスを道連れにするリスクを下げられます。
3. もし「ずっと動かし続けたい」なら?
もしこれらのメソッドが「サーバーの待ち受け」や「監視」のようにずっと動かし続けるものなら、fire_and_forget という機能が使えます。
from dask.distributed import fire_and_forget
# これを実行すると、メインのプログラムが次に進んでも、
# Worker上ではバックグラウンドで処理が走り続けます
fire_and_forget(future_a)
fire_and_forget(future_b)
