参考(図解)
図解したものを用意したので理解促進に活用ください
https://claude.ai/public/artifacts/46ff7b5f-44aa-4662-b6c4-fe8c7ad498f2
シャッフルとは
Sparkにおけるシャッフルは、データをキーごとにまとめ直すためにノード間で再分配する処理です。groupBy()、join、distinct、repartition、orderByなどのトランスフォーメーションで発生します。
ノード内でのデータの動き
シャッフルは以下のステップで実行されます。全体の流れを理解するため、まず図で確認してみましょう。
1. Map側の処理(データ送信側)
各ノードのタスクが、自分が持っているデータを処理します。
パーティション分割の決定
- データの各レコードに対してハッシュ関数を適用し、どのパーティションに送るべきかを計算します
- 例:
hash(key) % パーティション数
ローカルでのグループ化
- 同じパーティション番号のデータをメモリ上でまとめます
- メモリが不足する場合はディスクに書き出します
シャッフルファイルの作成
- パーティションごとにデータをディスクに書き出します
- 各ノードのローカルディスク(ノード内のストレージ)に保存されます
2. シャッフルサービス
各ノードで動作しているシャッフルサービスが、他のノードからのデータ取得要求に応答します。
- ローカルディスクに書き出されたシャッフルファイルを管理
- ネットワーク経由でデータを提供
3. Reduce側の処理(データ受信側)
次のステージのタスクが、必要なデータを取得します。
データのフェッチ
- 自分が処理すべきパーティション番号のデータを、すべてのノードから取得します
- ネットワーク経由でデータを受信します
データの結合
- 複数のノードから受け取ったデータを結合します
- メモリ上でマージ処理を行います
後続処理の実行
- 集約やジョインなどの実際の処理を実行します
シャッフルが重い理由
シャッフルが処理のボトルネックになりやすい理由は、主に以下の3つです。
ネットワーク通信
ノード間でデータを転送するため、大量データではボトルネックになりやすいです。特に数百GB〜数TB規模のデータを扱う場合、ネットワーク帯域が処理速度を左右します。
ディスクI/O
中間データを一度ディスクに書き出すため、処理時間が長くなります。メモリ上だけで処理できれば速いのですが、大量データの場合はディスクへの書き込みが避けられません。
同期コスト
シャッフルが終わるまで次のステージに進めないため、全体の待ち時間が増えてしまいます。すべてのノードがシャッフルを完了するまで、次の処理は開始できません。
具体例:groupByの場合
実際の動作をイメージしやすくするため、具体的な例を見てみましょう。
初期状態のデータ
【ノード1のデータ】
(A, 1), (B, 2), (A, 3)
【ノード2のデータ】
(B, 4), (C, 5), (A, 6)
Map側の処理
各ノードでパーティション分割を決定します。
- ノード1:Aのデータ → パーティション0、Bのデータ → パーティション1
- ノード2:Aのデータ → パーティション0、Bのデータ → パーティション1、Cのデータ → パーティション2
シャッフル後のデータ配置
キーごとに同じパーティションに集約されます。
- パーティション0(ノードX):(A, 1), (A, 3), (A, 6)
- パーティション1(ノードY):(B, 2), (B, 4)
- パーティション2(ノードZ):(C, 5)
Reduce側の処理
各パーティションで集約処理を実行します。これで同じキーのデータがまとまり、効率的に集約できるようになります。
シャッフルを最小限にするポイント
シャッフルは処理コストが高いため、以下のような工夫で最小限にするようにコーディングしてください。
- 可能な限りフィルタリングを先に実行し、シャッフルするデータ量を減らす
-
broadcast joinを活用し、小さなテーブルとのジョインではシャッフルを回避する - 同じキーで複数回操作する場合は、
persist()やcache()でデータを再利用する - 適切なパーティション数を設定し、処理を並列化する
これらの最適化を意識することで、Sparkアプリケーションのパフォーマンスを大幅に改善できます。