LoginSignup
1
0

Sparkのファイルとパーティションの関係について確認してみた

Last updated at Posted at 2023-01-24

背景・目的

Sparkでファイルを読み込んだ直後に、RDDで展開されるパーティション数について気になったので整理してみました。

まとめ

  • Sparkでファイルを読み込んだ直後のパーティション数は、ファイル数とファイルフォーマット、ファイルサイズとSparkパラメータの組み合わせで決まります。
    • ファイル数
      • 基本的に1ファイル
    • 分割可能なファイルフォーマットである。下記のようなフォーマットが対象
      • JSON
      • Parquet
      • ORC
    • ファイルサイズ > パラメータ(spark.sql.files.maxPartitionBytes)のときに、ファイル分割されます。
  • 以下のような検証を行い、想定通りの結果になりました。
    シナリオ ファイル数 ファイルフォーマット ファイルサイズ Sparkパラメータ
    spark.sql.files.maxPartitionBytes
    結果
    (パーティション数)
    128MB以下の場合 1 CSV 約1MB 128MB 1
    1ファイル1パーティションである 2 CSV 約1MB 128MB 2
    分割されないフォーマット 1 CSV 約1MB 1KB 1
    分割されるフォーマット 1 Parquet 約1MB 1KB 840
    パラメータの有効性の確認 1 Parquet 約1MB 2KB 420

概要

ファイル分割について

Other Configuration Optionsには、下記の記述があります。

spark.sql.files.maxPartitionBytes

ファイルの読み取り時に 1 つのパーティションにパックする最大バイト数。 この構成は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。

  • パラメータが有効なファイルフォーマットは下記の通り
    • Parquet
    • JSON
    • ORC
  • デフォルトは、134217728 (128 MB)

実装

事前準備

  • S3に、下記のファイルを配置し検証します。
    • parquetとtextファイル(CSV)を配置するディレクトリを2つ用意します。
      image.png
    • textディレクトリ(CSVファイルを1つのみ配置)
      image.png
    • text-2ディレクトリ(CSVファイルを2つ配置)
      image.png
    • Paruetディレクトリ(Parquetファイルを1つ配置)
      image.png
  • Glue Jupyter Notebookで試しています。

シナリオ

下記の検証を行い、パーティション数の確認を行います。

  1. 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定
  2. 1MBのCSVファイル2つを読み込んだときのパーティション数→2パーティションになる想定
  3. spark.sql.files.maxPartitionBytesを1024に設定& 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定
  4. spark.sql.files.maxPartitionBytesを1024に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→複数パーティションになる想定
  5. spark.sql.files.maxPartitionBytesを2048に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→上記4の半分になる想定

1. 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定

ファイルが1つのため、想定通り1パーティション

# 1MBファイルを一つ
from pyspark.sql import types as T, functions as F

print(spark.conf.get('spark.sql.files.maxPartitionBytes'))

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://XXXX/text/"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)

S3bucket_node1.printSchema()
S3bucket_node1.toDF().rdd.getNumPartitions()

===

1024
root
|-- col0: string

1

2.1MBのCSVファイル2つを読み込んだときのパーティション数→2パーティションになる想定

ファイルが2つのため、想定通り2パーティション

# 1MBファイルを2つ→2パーティション
from pyspark.sql import types as T, functions as F

print(spark.conf.get('spark.sql.files.maxPartitionBytes'))


S3bucket_node2 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://XXXX/text-2/"], "recurse": True},
    transformation_ctx="S3bucket_node2",
)

S3bucket_node2.printSchema()
S3bucket_node2.toDF().rdd.getNumPartitions()

===

1024
root
|-- col0: string

2

3. spark.sql.files.maxPartitionBytesを1024に設定& 1MBのCSVファイルを読み込んだときのパーティション数→1パーティションになる想定

分割不可能なファイルのため、想定通り1パーティション

# maxPartitionBytesを設定する
from pyspark.sql import types as T, functions as F

# 1024 Byte単位にファイル分割できるか?
spark.conf.set('spark.sql.files.maxPartitionBytes',1024)
print("変更後:{}".format(spark.conf.get('spark.sql.files.maxPartitionBytes')))

S3bucket_node3 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://XXXX/text/"], "recurse": True},
    transformation_ctx="S3bucket_node3",
)

S3bucket_node3.printSchema()
S3bucket_node3.toDF().rdd.getNumPartitions()

===
変更後:1024
root
|-- col0: string

1

4. spark.sql.files.maxPartitionBytesを1024に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→複数パーティションになる想定

分割可能なファイルフォーマットで、ファイルサイズ >spark.sql.files.maxPartitionBytesが成り立つため、パーティション数は分割される。

# maxPartitionBytesを設定する
from pyspark.sql import types as T, functions as F

# 1024 Byte単位にファイル分割できるか?→840パーティション
spark.conf.set('spark.sql.files.maxPartitionBytes',1024)
print("変更後:{}".format(spark.conf.get('spark.sql.files.maxPartitionBytes')))

S3bucket_node4 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://XXXX/parquet/"], "recurse": True},
    transformation_ctx="S3bucket_node4",
)

S3bucket_node4.printSchema()
S3bucket_node4.toDF().rdd.getNumPartitions()

===
変更後:1024
root
|-- col0: string

840

5. spark.sql.files.maxPartitionBytesを2048に設定& 1MB弱(840KB)のParquetファイルを読み込んだときのパーティション数→上記4の半分になる想定

4と比較して、パラメータの値を倍にしたため、パーティション数は想定通り半分になる。

# maxPartitionBytesを設定する
from pyspark.sql import types as T, functions as F

# 2048 Byte単位にファイル分割できるか?
spark.conf.set('spark.sql.files.maxPartitionBytes',2048)
print("変更後:{}".format(spark.conf.get('spark.sql.files.maxPartitionBytes')))

S3bucket_node5 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://XXXX/parquet/"], "recurse": True},
    transformation_ctx="S3bucket_node5",
)

S3bucket_node5.printSchema()
S3bucket_node5.toDF().rdd.getNumPartitions()

===

変更後:2048
root
|-- col0: string

420

考察

ファイルフォーマットとファイルサイズにより、Sparkのパーティション数が変わることがわかりました。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0