背景・目的
Sparkでファイルを読み込んだ直後に、RDDで展開されるパーティション数について気になったので整理してみました。
まとめ
- Sparkでファイルを読み込んだ直後のパーティション数は、ファイル数とファイルフォーマット、ファイルサイズとSparkパラメータの組み合わせで決まります。
- ファイル数
- 基本的に1ファイル
- 分割可能なファイルフォーマットである。下記のようなフォーマットが対象
- JSON
- Parquet
- ORC
- ファイルサイズ > パラメータ(spark.sql.files.maxPartitionBytes)のときに、ファイル分割されます。
- ファイル数
- 以下のような検証を行い、想定通りの結果になりました。
シナリオ ファイル数 ファイルフォーマット ファイルサイズ Sparkパラメータ spark.sql.files.maxPartitionBytes
結果
(パーティション数)128MB以下の場合 1 CSV 約1MB 128MB 1 1ファイル1パーティションである 2 CSV 約1MB 128MB 2 分割されないフォーマット 1 CSV 約1MB 1KB 1 分割されるフォーマット 1 Parquet 約1MB 1KB 840 パラメータの有効性の確認 1 Parquet 約1MB 2KB 420
概要
ファイル分割について
Other Configuration Optionsには、下記の記述があります。
spark.sql.files.maxPartitionBytes
ファイルの読み取り時に 1 つのパーティションにパックする最大バイト数。 この構成は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。
- パラメータが有効なファイルフォーマットは下記の通り
- Parquet
- JSON
- ORC
- デフォルトは、134217728 (128 MB)
実装
事前準備
- S3に、下記のファイルを配置し検証します。
- Glue Jupyter Notebookで試しています。
シナリオ
下記の検証を行い、パーティション数の確認を行います。
- 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定
- 1MBのCSVファイル2つを読み込んだときのパーティション数→2パーティションになる想定
- spark.sql.files.maxPartitionBytesを1024に設定& 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定
- spark.sql.files.maxPartitionBytesを1024に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→複数パーティションになる想定
- spark.sql.files.maxPartitionBytesを2048に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→上記4の半分になる想定
1. 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定
ファイルが1つのため、想定通り1パーティション
# 1MBファイルを一つ
from pyspark.sql import types as T, functions as F
print(spark.conf.get('spark.sql.files.maxPartitionBytes'))
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://XXXX/text/"], "recurse": True},
transformation_ctx="S3bucket_node1",
)
S3bucket_node1.printSchema()
S3bucket_node1.toDF().rdd.getNumPartitions()
===
1024
root
|-- col0: string
1
2.1MBのCSVファイル2つを読み込んだときのパーティション数→2パーティションになる想定
ファイルが2つのため、想定通り2パーティション
# 1MBファイルを2つ→2パーティション
from pyspark.sql import types as T, functions as F
print(spark.conf.get('spark.sql.files.maxPartitionBytes'))
S3bucket_node2 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://XXXX/text-2/"], "recurse": True},
transformation_ctx="S3bucket_node2",
)
S3bucket_node2.printSchema()
S3bucket_node2.toDF().rdd.getNumPartitions()
===
1024
root
|-- col0: string
2
3. spark.sql.files.maxPartitionBytesを1024に設定& 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定
分割不可能なファイルのため、想定通り1パーティション
# maxPartitionBytesを設定する
from pyspark.sql import types as T, functions as F
# 1024 Byte単位にファイル分割できるか?
spark.conf.set('spark.sql.files.maxPartitionBytes',1024)
print("変更後:{}".format(spark.conf.get('spark.sql.files.maxPartitionBytes')))
S3bucket_node3 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://XXXX/text/"], "recurse": True},
transformation_ctx="S3bucket_node3",
)
S3bucket_node3.printSchema()
S3bucket_node3.toDF().rdd.getNumPartitions()
===
変更後:1024
root
|-- col0: string
1
4. spark.sql.files.maxPartitionBytesを1024に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→複数パーティションになる想定
分割可能なファイルフォーマットで、ファイルサイズ >spark.sql.files.maxPartitionBytesが成り立つため、パーティション数は分割される。
# maxPartitionBytesを設定する
from pyspark.sql import types as T, functions as F
# 1024 Byte単位にファイル分割できるか?→840パーティション
spark.conf.set('spark.sql.files.maxPartitionBytes',1024)
print("変更後:{}".format(spark.conf.get('spark.sql.files.maxPartitionBytes')))
S3bucket_node4 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="parquet",
connection_options={"paths": ["s3://XXXX/parquet/"], "recurse": True},
transformation_ctx="S3bucket_node4",
)
S3bucket_node4.printSchema()
S3bucket_node4.toDF().rdd.getNumPartitions()
===
変更後:1024
root
|-- col0: string
840
5. spark.sql.files.maxPartitionBytesを2048に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→上記4の半分になる想定
4と比較して、パラメータの値を倍にしたため、パーティション数は想定通り半分になる。
# maxPartitionBytesを設定する
from pyspark.sql import types as T, functions as F
# 2048 Byte単位にファイル分割できるか?
spark.conf.set('spark.sql.files.maxPartitionBytes',2048)
print("変更後:{}".format(spark.conf.get('spark.sql.files.maxPartitionBytes')))
S3bucket_node5 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="parquet",
connection_options={"paths": ["s3://XXXX/parquet/"], "recurse": True},
transformation_ctx="S3bucket_node5",
)
S3bucket_node5.printSchema()
S3bucket_node5.toDF().rdd.getNumPartitions()
===
変更後:2048
root
|-- col0: string
420
考察
ファイルフォーマットとファイルサイズにより、Sparkのパーティション数が変わることがわかりました。
参考