0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Sparkのシャッフルについて

Posted at

参考(図解)

図解したものを用意したので理解促進に活用ください
https://claude.ai/public/artifacts/46ff7b5f-44aa-4662-b6c4-fe8c7ad498f2

シャッフルとは

Sparkにおけるシャッフルは、データをキーごとにまとめ直すためにノード間で再分配する処理です。groupBy()joindistinctrepartitionorderByなどのトランスフォーメーションで発生します。

ノード内でのデータの動き

シャッフルは以下のステップで実行されます。全体の流れを理解するため、まず図で確認してみましょう。

1. Map側の処理(データ送信側)

各ノードのタスクが、自分が持っているデータを処理します。

パーティション分割の決定

  • データの各レコードに対してハッシュ関数を適用し、どのパーティションに送るべきかを計算します
  • 例:hash(key) % パーティション数

ローカルでのグループ化

  • 同じパーティション番号のデータをメモリ上でまとめます
  • メモリが不足する場合はディスクに書き出します

シャッフルファイルの作成

  • パーティションごとにデータをディスクに書き出します
  • 各ノードのローカルディスク(ノード内のストレージ)に保存されます

2. シャッフルサービス

各ノードで動作しているシャッフルサービスが、他のノードからのデータ取得要求に応答します。

  • ローカルディスクに書き出されたシャッフルファイルを管理
  • ネットワーク経由でデータを提供

3. Reduce側の処理(データ受信側)

次のステージのタスクが、必要なデータを取得します。

データのフェッチ

  • 自分が処理すべきパーティション番号のデータを、すべてのノードから取得します
  • ネットワーク経由でデータを受信します

データの結合

  • 複数のノードから受け取ったデータを結合します
  • メモリ上でマージ処理を行います

後続処理の実行

  • 集約やジョインなどの実際の処理を実行します

シャッフルが重い理由

シャッフルが処理のボトルネックになりやすい理由は、主に以下の3つです。

ネットワーク通信

ノード間でデータを転送するため、大量データではボトルネックになりやすいです。特に数百GB〜数TB規模のデータを扱う場合、ネットワーク帯域が処理速度を左右します。

ディスクI/O

中間データを一度ディスクに書き出すため、処理時間が長くなります。メモリ上だけで処理できれば速いのですが、大量データの場合はディスクへの書き込みが避けられません。

同期コスト

シャッフルが終わるまで次のステージに進めないため、全体の待ち時間が増えてしまいます。すべてのノードがシャッフルを完了するまで、次の処理は開始できません。

具体例:groupByの場合

実際の動作をイメージしやすくするため、具体的な例を見てみましょう。

初期状態のデータ

【ノード1のデータ】
(A, 1), (B, 2), (A, 3)

【ノード2のデータ】
(B, 4), (C, 5), (A, 6)

Map側の処理

各ノードでパーティション分割を決定します。

  • ノード1:Aのデータ → パーティション0、Bのデータ → パーティション1
  • ノード2:Aのデータ → パーティション0、Bのデータ → パーティション1、Cのデータ → パーティション2

シャッフル後のデータ配置

キーごとに同じパーティションに集約されます。

  • パーティション0(ノードX):(A, 1), (A, 3), (A, 6)
  • パーティション1(ノードY):(B, 2), (B, 4)
  • パーティション2(ノードZ):(C, 5)

Reduce側の処理

各パーティションで集約処理を実行します。これで同じキーのデータがまとまり、効率的に集約できるようになります。

シャッフルを最小限にするポイント

シャッフルは処理コストが高いため、以下のような工夫で最小限にするようにコーディングしてください。

  • 可能な限りフィルタリングを先に実行し、シャッフルするデータ量を減らす
  • broadcast joinを活用し、小さなテーブルとのジョインではシャッフルを回避する
  • 同じキーで複数回操作する場合は、persist()cache()でデータを再利用する
  • 適切なパーティション数を設定し、処理を並列化する

これらの最適化を意識することで、Sparkアプリケーションのパフォーマンスを大幅に改善できます。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?