0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonのDaskを使って、ループ処理をマルチコアで並列化させる方法

0
Last updated at Posted at 2026-01-10

通常のPythonの「for row in df.iterrows():」のようなループをそのまま書いた場合、Daskを使っても並列化されず、むしろ非常に遅くなります。

Daskでループ処理を並列化するには、「Daskが並列管理できる形」に書き換える必要があります。主な3つのパターンを解説します。

1. 最推奨:map_partitions を使う

「行」ごとのループではなく、「データの塊(パーティション)」ごとのループとして処理します。これなら、各パーティションが別々のCPUコア(プロセス)で同時に実行されます。

Python
import dask.dataframe as dd
from dask.distributed import Client
import multiprocessing

# 並列実行したい関数は、外側に定義しておく
# 例:各行に対して複雑な計算ループを行いたい場合
def my_custom_loop(pdf):
    # pdf は「通常の Pandas DataFrame」として渡される
    results = []
    for row in pdf.itertuples():
        # ここに重い処理を書く
        results.append(row.age * 2) 
    pdf['new_col'] = results
    return pdf

def main():
    # 1. クライアントの起動(メモリ制限を考慮)
    # 物理コア数に合わせて調整
    cores = multiprocessing.cpu_count()
    client = Client(n_workers=cores, threads_per_worker=1, memory_limit='4GB')
    print(f"Dashboard link: {client.dashboard_link}")

    # 2. データの読み込み
    ddf = dd.read_csv("huge_data.csv").repartition(npartitions=cores * 2)

    # 3. map_partitionsの適用
    # map_partitions を使うと、各コアがそれぞれのパーティションを担当して並列実行する
    # この時点では「予約」だけで、実際の計算は始まらない
    result_ddf = ddf.map_partitions(my_custom_loop)

    # 4. 実行と保存
    # 大規模データなら .compute() でメモリに戻さず、直接ファイルに書き出すのが安全
    result_ddf.to_parquet("output_directory/")

if __name__ == "__main__":
    # 必ずこの中でメイン処理を呼び出す
    main()

メモリ大量消費への追加アドバイス
map_partitions を使う際、さらにメモリを節約するためのポイントがあります。

  • pdf の活用: Daskの map_partitions を使う際、関数の中で扱う pdf は、「Daskが大きなデータを分割した、その一部分(1つのパーティション)」を、通常のPandas DataFrameとして取り出したものです。通常の for ループでプログラムを書くと、全データを一つの DataFrame に読み込もうとしてメモリがパンクします。しかし Dask の pdf 方式なら、必要な分だけ読み込み、処理が終了したらメモリから解放することが可能です。例えば16コアあれば、16個の pdf(Pandas DataFrame)が、それぞれ別々のメモリ空間(プロセス)で同時にループ処理されます。また、pdf が通常のPandasであるため、pdf.iterrows()、pdf.apply()、pdf.groupby()などのコードを書くことができます。

  • repartition の活用: 読み込んだ直後に ddf.repartition(npartitions=...) を使い、1つひとつのパーティションが小さくなるように調整してください(コア数の数倍〜10倍程度)。これにより、1コアあたりが一度に抱えるデータ量を抑えられます。

  • itertuples() よりもベクトル演算: もしループの内容が if 文や単純な計算であれば、itertuples() でループするよりも、関数内で pdf['a'] + pdf['b'] のようにPandasのベクトル演算を書くほうが、メモリ効率も速度も圧倒的に良くなります。

  • compute() を避ける: データ全体を result = ddf.compute() で Python の変数に戻すと、その瞬間に全データが 1 つのメモリ空間に集約され、クラッシュします。必ず to_parquet() や to_csv() でディスクに直接書き出すようにしてください。

2.「1プロセス=1スレッド」が基本

1️⃣ GILの性質

  • GILは「プロセス単位」で存在
  • 同一プロセス内のスレッドは、同時に1つしか Python バイトコードを実行できない

[Process A]
├─ Thread 1 ← 実行中
├─ Thread 2 ← GIL待ち
└─ Thread 3 ← GIL待ち
👉 スレッドを増やしてもCPUバウンド処理は速くならない

2️⃣ 正解パターン(CPUバウンド)

  • マルチプロセス × 1スレッド
    Process 1 (Thread 1) → Core 1
    Process 2 (Thread 1) → Core 2
    Process 3 (Thread 1) → Core 3
    Process 4 (Thread 1) → Core 4
Python
Client(
    n_workers=4,
    threads_per_worker=1
)

3. 汎用的なループ:dask.delayed を使う

データフレーム操作に限らず、リストの要素に対するループなどを並列化したい場合は dask.delayed を使います。これが最も「Pythonのループ」をそのまま並列化する感覚に近いです。

Python
import dask

@dask.delayed
def process_item(item):
    # 重い処理
    return item ** 2

items = [1, 2, 3, 4, 5, 6, 7, 8]
results = []

for x in items:
    # 実際には計算せず、「予約」だけ入れる
    y = process_item(x)
    results.append(y)

# ここで一気にマルチプロセスで実行
final_results = dask.compute(*results)

4. 注意点:なぜ普通のループはダメなのか

Daskは「司令塔(Client)」が「作業員(Worker)」に仕事を振り分ける仕組みです。

  • 普通のループ: 司令塔が「1行目やって」「終わった?じゃあ2行目やって」と指示を出すため、作業員が複数いても1人ずつしか働けません。さらに、指示を出す通信オーバーヘッドでかえって遅くなります。

  • Dask的なループ: 司令塔が「この1万行の塊を4人に配るから、各自でループ回して終わったら教えて」と指示を出すため、全員が一斉に動けます。

結論
「ループを使っているからダメ」ではなく、「ループを丸ごとDaskの関数に包んで、塊ごとに並列実行させる」という書き換えをすれば、マルチコアをフル活用できます。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?