Apache Sparkは、大規模なデータ処理を分散環境で効率的に行うためのフレームワークであり、データ処理の基本となるのがRDD(Resilient Distributed Dataset)です。この記事では、PySparkを使った簡単な実践例を通じて、RDDのパーティションとSparkの並列処理について観察します。
- 対象読者:Apache Spark初学者
- 前提知識:python、Sparkのアーキテクチャ
PySparkを使った簡単な実践例
Azure Databricksのノートブック機能を使用して、以下の2つの処理を実際に実行し、Apache Sparkの動作を観察します。
- RDDの生成とパーティション分割
- データの並列処理
前提条件
この記事で使用しているコードは、Azure DatabricksのNotebook環境で実行しています。
※コンピュートリソースのアクセスモードについて、標準(旧「共有済み」)ではSparkContext
の取得ができないため、専用 (旧称: シングルユーザー) を使用しています。
RDDの生成とパーティション分割
ここでは、1 ~ 10 までの数値を含むRDDを生成し、4つのパーティションの分割します。このとき、各パーティションにどのデータが格納されたか、分散具合を観察します。
実行したPySparkのコード1
# SparkContextを取得
sc = spark.sparkContext
# range(1, 11) は、1から10までの数値を生成
# sc.parallelize() は、その数値をRDDとして分散
numbers_rdd = sc.parallelize(range(1, 11),4)
# numbers_rdd = sc.parallelize(range(1, 11),8)
# パーティション数を確認
num_partitions = numbers_rdd.getNumPartitions()
print(f"パーティション数: {num_partitions}")
# 各パーティションのデータを確認
# glom() は各パーティションのデータを1つのリストとしてまとめて返す操作
# collect() はその結果を全てドライバーに収集して、ローカルで見ることができる
partitions_data = numbers_rdd.glom().collect()
# enumerate() でパーティションインデックスとデータを取得
for idx, partition in enumerate(partitions_data):
# 各パーティションの内容を表示
print(f"パーティション {idx}: {partition}")
実行結果を見ると、1 ~ 10 のデータが4つのパーティションに分散されていることが分かります。
また、parallelizeメソッドの第二引数に8を指定すると、8つのパーティション数にデータを分散させることができます。
2つの実行結果から、データができるだけ均等にパーティションに分散されて格納されていることが分かります。
データの並列処理
ここでは、8つのパーティションに格納されたデータの値を2倍する変換処理を行い、並列処理が行われているかSpark UIで確認します。
実行したPySparkのコード2
sc = spark.sparkContext
numbers_rdd = sc.parallelize(range(1, 11),8)
num_partitions = numbers_rdd.getNumPartitions()
print(f"パーティション数: {num_partitions}")
partitions_data = numbers_rdd.glom().collect()
for idx, partition in enumerate(partitions_data):
print(f"変換前 Partition {idx}: {partition}")
# 各要素に対して2倍の処理を行うmap操作を適用
numbers_rdd = numbers_rdd.map(lambda x: x * 2)
partitions_data = numbers_rdd.glom().collect()
# 各パーティションのデータを再確認
for idx, partition in enumerate(partitions_data):
print(f"変換後 Partition {idx}: {partition}")
実行結果3をみると、各パーティションのデータが変換前と変換後で2倍されたことが分かります。
さらに、Spark UIを見ることで、この処理がどのように処理されたかを確認できます。
上図は、Spark UIのjobタブの一部ですが、Job Id列には、0と1の2つのジョブがあり、Tasks列には8つのタスクが含まれていることが確認できます。このことから、それぞれ8つのタスクを含んだ2つのジョブが実行されていたことが分かります。
これは、実行したPySparkのコード2において、8つのパーティションに対し、2つのアクションを実行しているため、それぞれ8つのタスクを含んだ2つのジョブが生成されたと考えられます。
2つのアクションとは、collect()が該当します。Sparkは遅延評価(Lazy Evaluation)を採用しており、データに対する処理は、アクションが呼ばれたときにはじめて実行されるので、2つのアクションに対して、2つのジョブが計画されます。
また、データはパーティションごとに処理されますが、ここでは8つのパーティションを生成しているので、それぞれのジョブに対して8つのタスクが内包されています。
次に、これらのタスクがどのように実行されたかを見ていきます。
上図は、Spark UIのexecutorタブの一部ですが、Executor ID列には、0と1の2つのExecutorが稼働しており、Cores列には、それぞれ4つのコアを持っていることが分かります。
また、Total Tasks列を見ると、それぞれ8つずつ処理を実行していたことが分かります。
Sparkが実行できる並列度の最大値はExecutorのコア数によりますが、筆者の構成では8つのコアが使用できるので、最大8並列で処理が実行できることが分かります。また、一つのジョブに対し8つのタスクが含まれていたので、8つのコアに対して、8つのタスクが同時に実行され、8並列で処理が行われると考えられます。
実際に、8並列で処理が行われていたかをjobタブに戻って確認します。
先ほどのSpark UIのjobタブの一部
のDescription列の青字のリンクをクリックすると、ジョブの実行計画(DAG Visualization)を見ることができます。
上図では、job Id 0の実行計画(DAG Visualization)を表していますが、さらに、実行計画(DAG Visualization)の図形をクリックすることで、ジョブに内包されたタスクの挙動を詳しく見ることができます。
上図のExecutor ID列とLaunch Time列を見ると、8つのタスクが2つのExecutorで同時に実行されていたことが分かります。
よって、8並列で処理が行われていたことが分かります。
考察
PySparkを使った簡単な実践例では、Apache Sparkを使用して、RDDのパーティション分割と並列処理の挙動を確認しました。具体的には、1〜10までの数値を含むRDDを生成し、指定したのパーティションに分割した後、そのパーティションにデータがどのように分散されるかを観察しました。また、パーティションごとに簡単なデータ変換を行い、何並列で処理が行われたかをSpark UIで確認しました。
これらの観察から、以下の考察を得ることができました。
- sc.parallelize() は、パーティションにできるだけ偏りがないようにデータを格納する
- 遅延評価によって、アクションの数だけジョブが生成される
- パーティションごとにタスクが割り当てられるので、タスク数はパーティション数に一致する
- Sparkの並列度はExecutorのコア数によって決定される
※ただし、これらの考察は簡単な処理に基づいており、異なる処理内容や、リソースが十分でない場合には、成り立たない場合があります。
終わりに
本記事では、小規模なデータを用いて、RDDのデータの管理やSparkの並列処理についてSpark UIを用いて調査しました。
今後も、本記事のようなSpark機能の簡単な実験をまとめた記事を発信予定です。
お楽しみに!
関連記事