通常のPythonの「for row in df.iterrows():」のようなループをそのまま書いた場合、Daskを使っても並列化されず、むしろ非常に遅くなります。
Daskでループ処理を並列化するには、「Daskが並列管理できる形」に書き換える必要があります。主な3つのパターンを解説します。
1. 最推奨:map_partitions を使う
「行」ごとのループではなく、「データの塊(パーティション)」ごとのループとして処理します。これなら、各パーティションが別々のCPUコア(プロセス)で同時に実行されます。
import dask.dataframe as dd
from dask.distributed import Client
import multiprocessing
# 並列実行したい関数は、外側に定義しておく
# 例:各行に対して複雑な計算ループを行いたい場合
def my_custom_loop(pdf):
# pdf は「通常の Pandas DataFrame」として渡される
results = []
for row in pdf.itertuples():
# ここに重い処理を書く
results.append(row.age * 2)
pdf['new_col'] = results
return pdf
def main():
# 1. クライアントの起動(メモリ制限を考慮)
# 物理コア数に合わせて調整
cores = multiprocessing.cpu_count()
client = Client(n_workers=cores, threads_per_worker=1, memory_limit='4GB')
print(f"Dashboard link: {client.dashboard_link}")
# 2. データの読み込み
ddf = dd.read_csv("huge_data.csv").repartition(npartitions=cores * 2)
# 3. map_partitionsの適用
# map_partitions を使うと、各コアがそれぞれのパーティションを担当して並列実行する
# この時点では「予約」だけで、実際の計算は始まらない
result_ddf = ddf.map_partitions(my_custom_loop)
# 4. 実行と保存
# 大規模データなら .compute() でメモリに戻さず、直接ファイルに書き出すのが安全
result_ddf.to_parquet("output_directory/")
if __name__ == "__main__":
# 必ずこの中でメイン処理を呼び出す
main()
メモリ大量消費への追加アドバイス
map_partitions を使う際、さらにメモリを節約するためのポイントがあります。
-
pdf の活用: Daskの map_partitions を使う際、関数の中で扱う pdf は、「Daskが大きなデータを分割した、その一部分(1つのパーティション)」を、通常のPandas DataFrameとして取り出したものです。通常の for ループでプログラムを書くと、全データを一つの DataFrame に読み込もうとしてメモリがパンクします。しかし Dask の pdf 方式なら、必要な分だけ読み込み、処理が終了したらメモリから解放することが可能です。例えば16コアあれば、16個の pdf(Pandas DataFrame)が、それぞれ別々のメモリ空間(プロセス)で同時にループ処理されます。また、pdf が通常のPandasであるため、pdf.iterrows()、pdf.apply()、pdf.groupby()などのコードを書くことができます。
-
repartition の活用: 読み込んだ直後に ddf.repartition(npartitions=...) を使い、1つひとつのパーティションが小さくなるように調整してください(コア数の数倍〜10倍程度)。これにより、1コアあたりが一度に抱えるデータ量を抑えられます。
-
itertuples() よりもベクトル演算: もしループの内容が if 文や単純な計算であれば、itertuples() でループするよりも、関数内で pdf['a'] + pdf['b'] のようにPandasのベクトル演算を書くほうが、メモリ効率も速度も圧倒的に良くなります。
-
compute() を避ける: データ全体を result = ddf.compute() で Python の変数に戻すと、その瞬間に全データが 1 つのメモリ空間に集約され、クラッシュします。必ず to_parquet() や to_csv() でディスクに直接書き出すようにしてください。
2.「1プロセス=1スレッド」が基本
1️⃣ GILの性質
- GILは「プロセス単位」で存在
- 同一プロセス内のスレッドは、同時に1つしか Python バイトコードを実行できない
[Process A]
├─ Thread 1 ← 実行中
├─ Thread 2 ← GIL待ち
└─ Thread 3 ← GIL待ち
👉 スレッドを増やしてもCPUバウンド処理は速くならない
2️⃣ 正解パターン(CPUバウンド)
-
マルチプロセス × 1スレッド
Process 1 (Thread 1) → Core 1
Process 2 (Thread 1) → Core 2
Process 3 (Thread 1) → Core 3
Process 4 (Thread 1) → Core 4
Client(
n_workers=4,
threads_per_worker=1
)
3. 汎用的なループ:dask.delayed を使う
データフレーム操作に限らず、リストの要素に対するループなどを並列化したい場合は dask.delayed を使います。これが最も「Pythonのループ」をそのまま並列化する感覚に近いです。
import dask
@dask.delayed
def process_item(item):
# 重い処理
return item ** 2
items = [1, 2, 3, 4, 5, 6, 7, 8]
results = []
for x in items:
# 実際には計算せず、「予約」だけ入れる
y = process_item(x)
results.append(y)
# ここで一気にマルチプロセスで実行
final_results = dask.compute(*results)
4. 注意点:なぜ普通のループはダメなのか
Daskは「司令塔(Client)」が「作業員(Worker)」に仕事を振り分ける仕組みです。
-
普通のループ: 司令塔が「1行目やって」「終わった?じゃあ2行目やって」と指示を出すため、作業員が複数いても1人ずつしか働けません。さらに、指示を出す通信オーバーヘッドでかえって遅くなります。
-
Dask的なループ: 司令塔が「この1万行の塊を4人に配るから、各自でループ回して終わったら教えて」と指示を出すため、全員が一斉に動けます。
結論
「ループを使っているからダメ」ではなく、「ループを丸ごとDaskの関数に包んで、塊ごとに並列実行させる」という書き換えをすれば、マルチコアをフル活用できます。