0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Spark RDDの基本的な動作の確認をしてみた

Last updated at Posted at 2025-04-02

Apache Sparkは、大規模なデータ処理を分散環境で効率的に行うためのフレームワークであり、データ処理の基本となるのがRDD(Resilient Distributed Dataset)です。この記事では、PySparkを使った簡単な実践例を通じて、RDDのパーティションとSparkの並列処理について観察します。

  • 対象読者:Apache Spark初学者
  • 前提知識:python、Sparkのアーキテクチャ

PySparkを使った簡単な実践例

Azure Databricksのノートブック機能を使用して、以下の2つの処理を実際に実行し、Apache Sparkの動作を観察します。

  • RDDの生成とパーティション分割
  • データの並列処理

前提条件

この記事で使用しているコードは、Azure DatabricksのNotebook環境で実行しています。
※コンピュートリソースのアクセスモードについて、標準(旧「共有済み」)ではSparkContextの取得ができないため、専用 (旧称: シングルユーザー) を使用しています。

RDDの生成とパーティション分割

ここでは、1 ~ 10 までの数値を含むRDDを生成し、4つのパーティションの分割します。このとき、各パーティションにどのデータが格納されたか、分散具合を観察します。

実行したPySparkのコード1

python
# SparkContextを取得
sc = spark.sparkContext

# range(1, 11) は、1から10までの数値を生成
# sc.parallelize() は、その数値をRDDとして分散
numbers_rdd = sc.parallelize(range(1, 11),4)
# numbers_rdd = sc.parallelize(range(1, 11),8)

# パーティション数を確認
num_partitions = numbers_rdd.getNumPartitions()
print(f"パーティション数: {num_partitions}")

# 各パーティションのデータを確認
# glom() は各パーティションのデータを1つのリストとしてまとめて返す操作
# collect() はその結果を全てドライバーに収集して、ローカルで見ることができる
partitions_data = numbers_rdd.glom().collect()

# enumerate() でパーティションインデックスとデータを取得
for idx, partition in enumerate(partitions_data):
    # 各パーティションの内容を表示
    print(f"パーティション {idx}: {partition}")

実行結果1
image.png

実行結果を見ると、1 ~ 10 のデータが4つのパーティションに分散されていることが分かります。

また、parallelizeメソッドの第二引数に8を指定すると、8つのパーティション数にデータを分散させることができます。

実行結果2
image.png

2つの実行結果から、データができるだけ均等にパーティションに分散されて格納されていることが分かります。

データの並列処理

ここでは、8つのパーティションに格納されたデータの値を2倍する変換処理を行い、並列処理が行われているかSpark UIで確認します。

実行したPySparkのコード2

python
sc = spark.sparkContext

numbers_rdd = sc.parallelize(range(1, 11),8)

num_partitions = numbers_rdd.getNumPartitions()
print(f"パーティション数: {num_partitions}")

partitions_data = numbers_rdd.glom().collect()

for idx, partition in enumerate(partitions_data):
    print(f"変換前 Partition {idx}: {partition}")

# 各要素に対して2倍の処理を行うmap操作を適用
numbers_rdd = numbers_rdd.map(lambda x: x * 2)

partitions_data = numbers_rdd.glom().collect()
# 各パーティションのデータを再確認
for idx, partition in enumerate(partitions_data):
    print(f"変換後 Partition {idx}: {partition}")

実行結果3
image.png

実行結果3をみると、各パーティションのデータが変換前と変換後で2倍されたことが分かります。
さらに、Spark UIを見ることで、この処理がどのように処理されたかを確認できます。

image.png
上図は、Spark UIのjobタブの一部ですが、Job Id列には、0と1の2つのジョブがあり、Tasks列には8つのタスクが含まれていることが確認できます。このことから、それぞれ8つのタスクを含んだ2つのジョブが実行されていたことが分かります。

これは、実行したPySparkのコード2において、8つのパーティションに対し、2つのアクションを実行しているため、それぞれ8つのタスクを含んだ2つのジョブが生成されたと考えられます。
2つのアクションとは、collect()が該当します。Sparkは遅延評価(Lazy Evaluation)を採用しており、データに対する処理は、アクションが呼ばれたときにはじめて実行されるので、2つのアクションに対して、2つのジョブが計画されます。
また、データはパーティションごとに処理されますが、ここでは8つのパーティションを生成しているので、それぞれのジョブに対して8つのタスクが内包されています。

次に、これらのタスクがどのように実行されたかを見ていきます。

image.png
上図は、Spark UIのexecutorタブの一部ですが、Executor ID列には、0と1の2つのExecutorが稼働しており、Cores列には、それぞれ4つのコアを持っていることが分かります。
また、Total Tasks列を見ると、それぞれ8つずつ処理を実行していたことが分かります。

Sparkが実行できる並列度の最大値はExecutorのコア数によりますが、筆者の構成では8つのコアが使用できるので、最大8並列で処理が実行できることが分かります。また、一つのジョブに対し8つのタスクが含まれていたので、8つのコアに対して、8つのタスクが同時に実行され、8並列で処理が行われると考えられます。

実際に、8並列で処理が行われていたかをjobタブに戻って確認します。
先ほどのSpark UIのjobタブの一部のDescription列の青字のリンクをクリックすると、ジョブの実行計画(DAG Visualization)を見ることができます。
image.png
上図では、job Id 0の実行計画(DAG Visualization)を表していますが、さらに、実行計画(DAG Visualization)の図形をクリックすることで、ジョブに内包されたタスクの挙動を詳しく見ることができます。
image.png
上図のExecutor ID列とLaunch Time列を見ると、8つのタスクが2つのExecutorで同時に実行されていたことが分かります。
よって、8並列で処理が行われていたことが分かります。

考察

PySparkを使った簡単な実践例では、Apache Sparkを使用して、RDDのパーティション分割と並列処理の挙動を確認しました。具体的には、1〜10までの数値を含むRDDを生成し、指定したのパーティションに分割した後、そのパーティションにデータがどのように分散されるかを観察しました。また、パーティションごとに簡単なデータ変換を行い、何並列で処理が行われたかをSpark UIで確認しました。
これらの観察から、以下の考察を得ることができました。

  • sc.parallelize() は、パーティションにできるだけ偏りがないようにデータを格納する
  • 遅延評価によって、アクションの数だけジョブが生成される
  • パーティションごとにタスクが割り当てられるので、タスク数はパーティション数に一致する
  • Sparkの並列度はExecutorのコア数によって決定される
    ※ただし、これらの考察は簡単な処理に基づいており、異なる処理内容や、リソースが十分でない場合には、成り立たない場合があります。

終わりに

本記事では、小規模なデータを用いて、RDDのデータの管理やSparkの並列処理についてSpark UIを用いて調査しました。
今後も、本記事のようなSpark機能の簡単な実験をまとめた記事を発信予定です。
お楽しみに!

関連記事

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?