1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Dask完全入門 ― Pythonで「大きなデータ」と「並列処理」を味方にする

1
Posted at

はじめに

Dask は、手元のノートPCからクラスタまでスケールする「並列処理・分散処理」向けの Python ライブラリで、Pandas や NumPy のような使い慣れた API をそのまま大規模データに広げてくれます。Dask の特徴は「大きすぎてメモリに載らないデータ」や「時間のかかる処理」を、小さな仕事のかたまり(タスク)に分割し、それらを複数コアや複数マシンで同時に実行する点にあります。ここでは、Dask の基本概念から DataFrame・Array・Bag・遅延評価(delayed)・分散スケジューラまで、15章構成で日本語で丁寧に解説し、各章でそのまま動かせるサンプルコードも示します。


第1章 Daskとは何か ― コンセプトと用途

Dask は「Python エコシステムをそのままスケールさせるための並列・分散計算フレームワーク」です。NumPy の配列、Pandas の DataFrame、Scikit‑learn の機械学習といった既存ライブラリの API を真似た高レベルコレクション(Dask Array・Dask DataFrame・Dask Bag)と、その裏側でタスクグラフを実行するスケジューラで構成されています。タスクグラフとは、計算の依存関係を表す有向非巡回グラフ(Directed Acyclic Graph; DAG)のことで、どの計算がどの結果に依存しているかを記述し、Dask はこのグラフを解析して「安全に並列できる部分」を同時に実行してくれます。

# インストール(まだの場合)
# pip install "dask[complete]" distributed

import dask
import dask.array as da
import dask.dataframe as dd

# バージョン確認
print("Dask version:", dask.__version__)

第2章 インストールと基本設定 ― ノートPCから始める

Dask は pip または conda で簡単にインストールでき、用途に応じて extras を指定して DataFrame・Array・分散処理など必要なコンポーネントを選べます。ローカル環境では特別なクラスタ構築を意識せずに、マルチコア CPU を活用する「シングルマシンスケジューラ」が自動的に使われ、より大きな処理が必要になったときに分散スケジューラに切り替える、という段階的な導入が可能です。

# 一般的なインストール例
pip install "dask[complete]" distributed

# DataFrame だけで良い場合
pip install "dask[dataframe]"
from dask.distributed import Client

# ローカルマシン上に分散クライアントを立ち上げる(全コア利用)
client = Client()
print(client)

第3章 Dask の高レベルコレクション ― Array, DataFrame, Bag

Dask の高レベルコレクションは、既存ライブラリを大規模データに拡張した 抽象データ構造 です。Dask Array は NumPy の ndarray とほぼ同じ API で、多数の小さな NumPy 配列(チャンク)の集合として巨大配列を扱います。Dask DataFrame は多数の小さな Pandas DataFrame の集合として、Pandas と同じ操作でメモリに載らない行数を扱い、Dask Bag は構造が一定でない JSON ログなどの「リスト的データ」を並列処理するために用いられます。

import dask.array as da
import dask.dataframe as dd
import dask.bag as db

# 1GB 規模のランダム配列(例)をチャンクに分割して作成
x = da.random.random((1_000_000,), chunks=(100_000,))
print(x)  # 実体はまだ計算されていない「遅延オブジェクト」

# 複数 CSV ファイルから Dask DataFrame を作成
df = dd.read_csv("data/2025_logs-*.csv")
print(df)

# JSON Lines 形式のログを Dask Bag として読み込む
logs = db.read_text("logs/*.jsonl")
print(logs)

第4章 遅延評価とタスクグラフ ― .compute() の意味

Dask の最大の特徴の一つは「遅延評価(lazy evaluation)」であり、Array や DataFrame に対して演算を書いても、その場では計算せず「タスクグラフ」に変換して溜めておきます。最終的な結果が必要になったときに .compute() を呼ぶことで、Dask のスケジューラがグラフを解析して並列実行し、結果を返します。この仕組みにより、不要な中間結果を省略してメモリ効率を高めたり、複数の処理を一度に最適化して実行時間を短縮できます。

import dask.array as da

x = da.random.random((1_000_000,), chunks=(100_000,))
y = (x - x.mean()) / x.std()   # 標準化(ここではまだ計算されない)

# 実際に値が必要になったタイミングで compute
result = y.sum().compute()
print("Result:", result)

第5章 Dask DataFrame 基本操作 ― Pandas ユーザー向け

Dask DataFrame は「インデックスに沿って分割された複数の Pandas DataFrame」の集合で、read_csvgroupby など、Pandas に似た API をそのまま利用できます。Pandas と異なる点は、オブジェクトを表示しても先頭数行しか読み込まれず、.compute().head() などの明示的なトリガーがあるまで完全なデータは読み込まれないことです。これにより、数千万行を超える CSV でも「必要な部分だけを段階的に読み込む」ことが可能になり、メモリ消費を抑えつつ分析できます。

import dask.dataframe as dd

# 複数の CSV ファイルをまとめて読み込み
df = dd.read_csv("data/sales-*.csv")  # sales-2025-01.csv など

# 集計処理(まだ遅延状態)
monthly = df.groupby("month")["amount"].sum()

# 計算を実行して結果を Pandas DataFrame / Series として取得
monthly_result = monthly.compute()
print(monthly_result)

# 先頭だけ簡単に確認
print(df.head())

第6章 Dask Array の並列数値計算

Dask Array は、多数の小さな NumPy 配列チャンクに対して同じ演算を並列に行うことで、大規模な数値計算を効率化する仕組みです。チャンクサイズは「一度にメモリに載せたいサイズ」を目安に決め、配列演算が自動的にチャンク単位に分解されてタスクグラフになります。NumPy と同様にユニバーサル関数、集約、線形代数などが利用できるため、既存の数値コードを比較的少ない変更で大規模化できるのが利点です。

import dask.array as da

# 2次元配列をチャンク分割して作成
x = da.random.normal(0, 1, size=(10_000, 1_000), chunks=(1_000, 1_000))

# 行方向の平均と標準偏差
row_mean = x.mean(axis=1)
row_std = x.std(axis=1)

# 結果の一部を計算して確認
sample_mean = row_mean[:10].compute()
sample_std = row_std[:10].compute()
print("mean:", sample_mean)
print("std :", sample_std)

第7章 Dask Bag と非構造データ処理

Dask Bag は、ログファイル・JSON Lines・スクレイピング結果など「一つ一つの要素が辞書や文字列で、表形式にまだ落とし込まれていないデータ」を扱うためのコレクションです。map・filter・groupby などの関数型操作を使い、要素単位の変換や集約を並列に行えるため、前処理の段階で Bag から DataFrame へ変換するパターンがよく使われます。これにより、大量ログを並列にパースして必要な情報だけ抽出し、その後の分析を Dask DataFrame に引き継ぐことができます。

import json
import dask.bag as db

# JSON Lines ログを読み込み
logs = db.read_text("logs/access-*.jsonl")

# 各行を JSON にパース
records = logs.map(json.loads)

# ステータスコードが 500 の行だけ抽出
errors = records.filter(lambda r: r.get("status") == 500)

# 件数を計算
error_count = errors.count().compute()
print("500 エラー件数:", error_count)

# DataFrame に変換して詳細分析も可能
df_errors = errors.to_dataframe()
print(df_errors.head())

第8章 dask.delayed による任意関数の並列化

dask.delayed は、任意の Python 関数を「遅延実行タスク」としてラップし、タスクグラフに組み込むための低レベル API です。通常どおり Python 関数を定義し、呼び出し時に delayed を通すだけで並列化が可能になり、for ループで逐次処理していたコードをほとんどそのまま並列版に書き換えられます。最終的に dask.compute を呼ぶと、すべての delayed オブジェクトの依存関係を解決しながら並列実行して結果を返します。

import time
import dask
from dask import delayed

def slow_square(x):
    time.sleep(1)
    return x * x

# 通常: 逐次実行(合計 5 秒)
# results_seq = [slow_square(i) for i in range(5)]

# Dask: 遅延タスクとして作成
tasks = [delayed(slow_square)(i) for i in range(5)]

# すべて並列実行(CPU コア数に応じて約 1~2 秒程度)
results = dask.compute(*tasks)
print(results)

第9章 分散スケジューラと Client/Future の基本

dask.distributed は、ローカル・クラスタ・クラウド上での分散実行を担うスケジューラで、Client を通してジョブを送信します。client.submit で関数を送ると Future オブジェクトが返され、これは「まだ計算中かもしれない将来の値」を表します。Future を使うことで、計算の状態をリアルタイムに監視したり、完了した結果から次のタスクを動的に発行するなど、より柔軟な並列処理フローを構築できます。

from dask.distributed import Client

client = Client()  # ローカル分散クライアント
print(client.dashboard_link)  # Web ダッシュボード URL

def inc(x):
    return x + 1

def double(x):
    return 2 * x

# 非同期に送信
future_x = client.submit(inc, 10)
future_y = client.submit(double, future_x)

# 実際の結果を取得
result = future_y.result()
print("Result:", result)

第10章 Dask での CSV/Parquet 大規模データ処理

Dask DataFrame は、複数ファイルに分割された CSV や Parquet を「1つの論理 DataFrame」として扱い、背後でファイルごとにチャンクを割り当てて並列に読み込みます。Parquet は列指向フォーマットであり、必要な列だけを読み込むことができるため、Dask と組み合わせると I/O とメモリの効率が非常に高くなります。多くの現場では、まず CSV を Parquet に変換し、その後 Dask + Parquet で本格分析するというパターンが採用されています。

import dask.dataframe as dd

# 複数 CSV を読み込み、前処理
df = dd.read_csv("data/raw-*.csv")
df = df[df["amount"] > 0]

# Parquet としてパーティション分割して保存
df.to_parquet("data_parquet/")

# 以降は Parquet から高速に読み込む
df_pq = dd.read_parquet("data_parquet/")
summary = df_pq.groupby("category")["amount"].sum().compute()
print(summary)

第11章 機械学習との連携 ― dask-ml と外部ライブラリ

Dask は dask-mlxgboost.dask などと組み合わせることで、機械学習の前処理や学習を並列・分散環境へと拡張できます。Dask DataFrame を特徴量行列とし、訓練・テスト分割やスケーリングを分散実行し、その後 XGBoost などに渡して分散学習させる流れが典型的です。これにより、単一のマシンでは扱いきれない規模のデータセットを複数ワーカーに分散して学習でき、モデル構築にかかる時間を大幅に短縮できます。

import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import StandardScaler
import xgboost.dask as dxgb
from dask.distributed import Client

client = Client()

# データ読み込み
df = dd.read_parquet("data_parquet/")

X = df[["feature1", "feature2", "feature3"]]
y = df["label"]

# 訓練・テスト分割(分散)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# 標準化(分散)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# XGBoost の分散 DMatrix
dtrain = dxgb.DaskDMatrix(client, X_train_scaled, y_train)

# 学習
params = {"objective": "binary:logistic", "tree_method": "hist"}
model = dxgb.train(client, params, dtrain, num_boost_round=100)
print(model)

第12章 Dask ダッシュボードで可視化する実行状況

dask.distributed の Client を立ち上げると、Web ブラウザからアクセスできるダッシュボードが起動し、タスクグラフの進行状況・ワーカーごとのメモリ使用量・CPU 使用率などをリアルタイムに確認できます。これにより、ボトルネックとなっているタスクやメモリ圧迫箇所を可視化し、チャンクサイズの見直しやアルゴリズム変更といったチューニングに役立ちます。開発中はダッシュボードを常に開いておき、コード変更がどのように実行パターンに影響するかを観察すると理解が深まります。

from dask.distributed import Client
import dask.array as da

client = Client()
print("Dashboard:", client.dashboard_link)

x = da.random.random((20_000, 2_000), chunks=(2_000, 2_000))
y = (x ** 2).sum(axis=1)

# ダッシュボードを見ながら計算
total = y.mean().compute()
print("Mean:", total)

第13章 ベストプラクティス ― チャンク・メモリ・I/O

Dask を効果的に使うには、チャンクサイズとメモリ使用量、I/O パターンのバランスを意識することが重要です。チャンクが大きすぎると並列度が下がり、逆に小さすぎるとタスク数が増えすぎてオーバーヘッドで遅くなります。また、可能な限り列指向フォーマット(Parquet)や圧縮を利用し、persist() を用いて中間結果をクラスタメモリに保持することで、同じ計算を何度もやり直すコストを削減できます。

import dask.array as da

# 悪い例:チャンクが小さすぎる(オーバーヘッド増)
x_small = da.random.random((1_000_000,), chunks=(1_000,))

# 良い例:CPU コア数やメモリに応じて適切なチャンクサイズ
x_good = da.random.random((1_000_000,), chunks=(100_000,))

# 計算結果を persist して再利用
y = (x_good ** 2).persist()    # 一度だけ計算してメモリに保持
z = (y + 1).mean().compute()   # y は再計算されない
print(z)

第14章 クラスタへのスケールアウト ― リモートワーカー利用

手元のマシンだけでは処理が追いつかなくなったら、Dask は Kubernetes やクラウド上のマシン群へスケールアウトできます。dask.distributed の Client はリモートの Scheduler に接続でき、そこからさらに複数の Worker プロセスが別々のノードで動作します。この構成により、ローカルで書いた Dask コードをほとんど変更せずに、そのまま大規模クラスタ上で実行できるため、開発環境と本番環境のギャップを小さく保てます。

from dask.distributed import Client

# すでに立ち上がっている Dask Scheduler に接続
# アドレスはクラスタ側の設定に応じて変更
client = Client("tcp://scheduler-address:8786")
print(client)

# あとはローカルと同じように Dask コードを書く
import dask.dataframe as dd

df = dd.read_parquet("s3://my-bucket/huge-data/")
result = df.groupby("user_id")["amount"].sum().compute()
print(result.head())

第15章 まとめと次の一歩 ― 実案件への適用

Dask は「いつもの Python コード」をなるべくそのままスケールさせるという思想で設計されており、Pandas/NumPy/Scikit‑learn の利用経験があれば比較的スムーズに習得できます。小規模データでは Pandas、メモリを超えたら Dask DataFrame へ切り替える、といった段階的導入が現実的であり、まずはローカル環境で .compute() やダッシュボードの挙動に慣れることが重要です。その上で、ログ解析・ETL パイプライン・機械学習前処理など、自身の業務でボトルネックになっている処理を一つ選び、Dask を使って並列化してみると、理論だけではわからない実践的な価値を体感できるでしょう。

# 小さく始めて、徐々に Dask 化する一例

import pandas as pd
import dask.dataframe as dd

# まずは従来どおり Pandas で試す
pdf = pd.read_csv("data/sample.csv")
print(pdf.groupby("category")["amount"].sum())

# データが増えてきたら Dask に切り替え
ddf = dd.read_csv("data/sample-*.csv")
print(ddf.groupby("category")["amount"].sum().compute())
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?