1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksAdvent Calendar 2024

Day 11

Parquetでデータをロードする際にSparkのパーティションがどのように影響を受けるのか

Posted at

こちらの続きです。

ノートブックはこちらで動画はこちらです。

なぜこれが必要なのか?

  • Sparkがどのようにパーティションを作成し、それがどのようにパフォーマンスに影響を与えるかを理解することは、パフォーマンスとデバッグを改善します。
  • これまでに、パーティションの数、空のパーティション、およびパーティション内のデータの分布がパフォーマンスにどのように影響するかを学びました。(以下に以前のコメントを追加しています)
  • Coalesceや特にRepartitionは高コストな操作です。ロード時にすでにSparkのパーティションに影響を与えることができれば、大きなメリットとなります。

0. セットアップ

このノートブックの一般的なヒント:

  • Spark UIはクラスター -> Spark UIでアクセス可能
  • Spark UIの詳細な調査は後のエピソードで行います
  • sc.setJobDescription("Description")はSpark UIのアクションのジョブ説明を独自のものに置き換えます
  • sdf.rdd.getNumPartitions()は現在のSpark DataFrameのパーティション数を返します
  • sdf.write.format("noop").mode("overwrite").save()は、実際の書き込み中に副作用なしで変換を分析および開始するための良い方法です
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import gresearch.spark.parquet
import math
import time

Spark拡張はクラスターライブラリとしてインストールしておきます。

Screenshot 2024-12-09 at 17.25.21.png

# AQEをオフにする。ここではより多くのジョブが生成され、混乱を招く可能性があるため。
spark.conf.set("spark.sql.adaptive.enabled", "false")
# データフレームをキャッシュしないようにする。これにより、結果が再現可能でない場合がある。
spark.conf.set("spark.databricks.io.cache.enabled", "false")

行数とパーティション数を指定してSparkデータフレームを作成する関数。

def sdf_generator(num_rows: int, num_partitions: int = None) -> "DataFrame":
    return (
        spark.range(num_rows, numPartitions=num_partitions)
        .withColumn("date", f.current_date())
        .withColumn("timestamp",f.current_timestamp())
        .withColumn("idstring", f.col("id").cast("string"))
        .withColumn("idfirst", f.col("idstring").substr(0,1))
        .withColumn("idlast", f.col("idstring").substr(-1,1))
        )

パーティションの分布を表示する関数。

def rows_per_partition(sdf: "DataFrame") -> None:
    num_rows = sdf.count()
    sdf_part = sdf.withColumn("partition_id", f.spark_partition_id())
    sdf_part_count = sdf_part.groupBy("partition_id").count()
    sdf_part_count = sdf_part_count.withColumn("count_perc", 100*f.col("count")/num_rows)
    sdf_part_count.orderBy("partition_id").show()

パーティションと特定列の値ごとの分布を表示する関数。

def rows_per_partition_col(sdf: "DataFrame", num_rows: int, col: str) -> None:
    sdf_part = sdf.withColumn("partition_id", f.spark_partition_id())
    sdf_part_count = sdf_part.groupBy("partition_id", col).count()
    sdf_part_count = sdf_part_count.withColumn("count_perc", 100*f.col("count")/num_rows)
    sdf_part_count.orderBy("partition_id", col).show()
#BASE_DIR = "/Volumes/takaakiyayoi_catalog/spark/data" # Volume(Managed Storage)ではファイル操作系の関数は動きません。
BASE_DIR = "/FileStore/takaaki.yayoi/data" # DBFSを使います
results_dict = {}
results_list = []
dbutils.fs.rm(BASE_DIR, True)

指定された行数、パーティション数(ファイル数)でデータフレームを書き出す関数。

def write_generator(num_rows, num_files):
    sdf = sdf_generator(num_rows, num_files)
    path = f"{BASE_DIR}/{num_files}_files_{num_rows}_rows.parquet"
    sc.setJobDescription(f"Write {num_files} files, {num_rows} rows")
    sdf.write.format("parquet").mode("overwrite").save(path)
    sc.setJobDescription("None")
    print(f"Num partitions written: {sdf.rdd.getNumPartitions()}")
    print(f"Saved Path: {path}")
    return path

Spark Configを設定する関数。

def set_configs(maxPartitionsMB = 128, openCostInMB = 4, minPartitions = 4):
    maxPartitionsBytes = math.ceil(maxPartitionsMB*1024*1024)
    openCostInBytes = math.ceil(openCostInMB*1024*1024)
    spark.conf.set("spark.sql.files.maxPartitionBytes", str(maxPartitionsBytes)+"b")
    spark.conf.set("spark.sql.files.openCostInBytes", str(openCostInBytes)+"b")
    spark.conf.set("spark.sql.files.minPartitionNum", str(minPartitions))
    print(" ")
    print("******** SPARK CONFIGURATIONS ********")
    print(f"MaxPartitionSize {maxPartitionsMB} MB or {maxPartitionsBytes} bytes")
    print(f"OpenCostInBytes {openCostInMB} MB or {openCostInBytes} bytes")
    print(f"Min Partitions: {minPartitions}")

    results_dict["maxPartitionsBytes"] = maxPartitionsMB

Parquetのメタデータを取得する関数。

def get_parquet_meta_data(path):
    sdf = (
        spark.read.parquet_metadata(path)
        .select("filename", "blocks", "compressedBytes", "rows")
        .dropDuplicates(["filename"])
        .withColumn("compressedMB", f.round(f.col("compressedBytes")/1024/1024, 1))
        .withColumn("calcNumBlocks", f.col("compressedMB")/128)
    )
    sdf.show(20, truncate=False)
def get_parquet_blocks(path):
    sdf = (
        spark.read.parquet_blocks(path)
        .dropDuplicates(["filename","block"])
        .orderBy("filename", "block")
        .withColumn("blockEnd", f.col("blockStart") + f.col("compressedBytes") - 1)
        .withColumn("blockMiddle", f.col("blockStart") + 0.5 * f.col("compressedBytes"))
        .withColumn("compressedMB", f.round(f.col("compressedBytes")/1024/1024, 1))
        .select("filename", "block", "blockStart", "blockEnd", "blockMiddle", "compressedBytes", "compressedMB", "rows")
    )

    sdf.show(20, truncate=False)
def get_spark_partitions(path):
    sdf = (
        spark.read.parquet_partitions(path)
        .withColumn("compressedMB", f.round(f.col("compressedBytes")/1024/1024, 1))
        .select("partition", "start", "end", "length", "blocks", "compressedBytes", "compressedMB", "rows", "filename")
    )

    sdf.show(20, truncate=False)
def get_parquet_window_length(path):
    sdf = spark.read.parquet_partitions(path)
    val = sdf.select(f.max(sdf["length"]))
    max_length = val.collect()[0][0]
    print(f"Max Parquet window length: {round(max_length/1024/1024, 1)} MB or {max_length} bytes")
def get_parquet_file_size(path):
    sdf = (
        spark.read.parquet_metadata(path)
        .select("filename", "blocks", "compressedBytes", "rows")
        .dropDuplicates(["filename"])
    )
    sum = sdf.select(f.sum(sdf["compressedBytes"]))
    size = sum.collect()[0][0]
    return size
def round_half_up(n, decimals=0):
    multiplier = 10**decimals
    return math.floor(n * multiplier + 0.5) / multiplier

#source: https://realpython.com/python-rounding/
def estimate_num_partitions(file_size, num_files):
    """
    参考コード:
    - Stackoverflow: https://stackoverflow.com/questions/70985235/what-is-opencostinbytes
    - GitHub: https://github.com/apache/spark/blob/v3.3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala#L86-L97
    """
    # Spark設定の値を取得
    # ファイルを読み込む際に単一のパーティションにまとめる際の最大バイト数
    maxPartitionBytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes")[:-1])
    # ファイルをオープンする際の推定コスト。同時にスキャンできるバイト数 
    openCostInBytes = int(spark.conf.get("spark.sql.files.openCostInBytes")[:-1])
    # ファイルのパーティションを分割する際の推奨(保証はされません)最小分割数 
    minPartitionNum = int(spark.conf.get("spark.sql.files.minPartitionNum"))

    # maxSplitPartitionBytesを計算
    # a) ファイルサイズが大きい場合、bytesPerCorePadded は openCostInBytes よりも大きく、maxPartitionBytes よりも大きくなります。
    # この場合、maxSplitPartitionBytes は maxPartitionBytes でサイズを制限します。例: 1 GBのデータセット、4コア、最大パーティションサイズ 128 MB
    # b) maxSplitPartitionBytes の結果が bytesPerCorePadded である場合、すべてのコアにデータが公平に分割されます。例: 1 GBのデータセット、4コア、最大パーティションサイズ 300 MB
    # c) bytesPerCore が小さすぎる場合、開くパーティションの数を制限したいです。これがコストです。

    # 各ファイルにコストを加えるパディングを行う
    paddedFileSize = file_size + num_files * openCostInBytes
    # パディングされたサイズをコアごとに均等に分配
    bytesPerCorePadded = paddedFileSize / minPartitionNum
    
    # maxSplitPartitionBytes は maxPartitionBytes と (openCostInBytes と bytesPerCorePaddedの大きい方) とで小さい方
    maxSplitPartitionBytes = min(maxPartitionBytes, max(openCostInBytes, bytesPerCorePadded))
    # インターネットの情報に基づくパーティション数の推定
    estimated_num_partitions_int = paddedFileSize / maxSplitPartitionBytes

    # 個々のファイルサイズに対する独自の推定 (パディングしたファイルサイズをファイル数で割る)
    avg_file_size_padded = paddedFileSize / num_files
    bytesPerCore = file_size / minPartitionNum
    # 1つのパーティションに収まるファイル数を計算。その後、パーティション数を計算
    files_per_partition = max(1, math.floor(maxSplitPartitionBytes / avg_file_size_padded))
    estimated_num_partitions = num_files / files_per_partition

    print(" ")
    print("******** MAX SPLIT PARTITION BYTESとパーティション数の推定 ********")
    print(f"avg_file_size_padded(paddedFileSize / num_files) ファイルごとの平均サイズ(パディング後): {round(avg_file_size_padded / 1024 / 1024, 1)} MB または {avg_file_size_padded} バイト")
    print(f"paddedFileSize(file_size + num_files * openCostInBytes) パディング後のファイルサイズ: {round(paddedFileSize / 1024 / 1024, 1)} MB または {paddedFileSize} バイト")
    print(f"bytesPerCore(file_size / minPartitionNum) コアごとのサイズ: {round(bytesPerCore / 1024 / 1024, 1)} MB または {bytesPerCore} バイト")
    print(f"bytesPerCorePadded(paddedFileSize / minPartitionNum) コアごとのサイズ(パディング後): {round(bytesPerCorePadded / 1024 / 1024, 1)} MB または {bytesPerCorePadded} バイト")
    print(" ")
    print(f"maxSplitPartitionBytes 最大分割パーティションサイズ: {round(maxSplitPartitionBytes / 1024 / 1024, 1)} MB または {maxSplitPartitionBytes} バイト")
    print(f"estimated_num_partitions_int(paddedFileSize / maxSplitPartitionBytes) インターネットの情報に基づく推定パーティション数: {math.ceil(estimated_num_partitions_int)}, 切り上げ前: {estimated_num_partitions_int}")
    print(" ")
    print(f"files_per_partition パーティションごとの最大ファイル数: {files_per_partition}")
    print(f"estimated_num_partitions 独自に推定したパーティション数: {math.ceil(estimated_num_partitions)}, 切り上げ前: {estimated_num_partitions}")

    results_dict["paddedFileSize"] = round(paddedFileSize / 1024 / 1024, 1)
    results_dict["MBPerCore"] = round(bytesPerCore / 1024 / 1024, 1)
    results_dict["MBPerCorePadded"] = round(bytesPerCorePadded / 1024 / 1024, 1)
    results_dict["maxSplitPartitionBytes"] = round(maxSplitPartitionBytes / 1024 / 1024, 1)
    results_dict["avg_file_size_padded"] = round(avg_file_size_padded / 1024 / 1024, 1)
    results_dict["Maxfiles_per_partition"] = files_per_partition
    results_dict["MyEstimationPartitions"] = math.ceil(estimated_num_partitions)
    results_dict["InternetEstimationPartitions"] = math.ceil(estimated_num_partitions_int)

パーティションごとのバイト数、行数の集計

def bytes_rows_per_partition(path):
    sdf = (
        spark.read.parquet_partitions(path)
        .groupBy("partition").agg(f.sum("compressedBytes"), f.sum("rows"), f.count("partition"))
        .withColumnRenamed("sum(compressedBytes)", "compressedBytes")
        .withColumnRenamed("sum(rows)", "rows")
        .withColumnRenamed("count(partition)", "numFiles")
        .withColumn("compressedMB", f.round(f.col("compressedBytes")/1024/1024, 1))
        .select("partition", "numFiles", "compressedBytes","compressedMB","rows")
        .orderBy("partition")
    )
    sdf.show(20)
    return sdf
def avg_bytes_rows_partition(sdf):
    sdf = (
        sdf.select(f.mean("numFiles"), f.mean("compressedBytes"), f.mean("rows"))
        .withColumn("avg(compressedMB)", f.round(f.col("avg(compressedBytes)")/1024/1204, 1))
        .select("avg(numFiles)", "avg(compressedBytes)", "avg(compressedMB)", "avg(rows)")
    )
    sdf.show()
def file_analysis(path, num_files):
    file_size = get_parquet_file_size(path)
    avg_file_size = file_size/num_files
    print(" ")
    print("******** ファイルサイズの分析 ********")
    print(f"ファイルサイズ: {round(file_size/1024/1024, 1)} MB or {file_size} bytes")
    print(f"ファイル数: {num_files}")
    print(f"平均ファイルサイズ: {round(avg_file_size/1024/1024, 1)} MB or {avg_file_size} bytes")
def row_count_analysis(num_files, num_rows):
    print(" ")
    print("******** 行数の分析 ********")    
    print(f"書き込みファイル数: {num_files}")
    print(f"書き込み行数: {num_rows}")
    print(f"ファイルあたりの行数: {int(num_rows/num_files)}")
def get_actual_num_partitions(path):
    sdf = spark.read.parquet(path)
    print(" ")
    print("******** 実際の結果 ********")   
    print(f"実際のパーティション数: {sdf.rdd.getNumPartitions()}")
    results_dict["ActualNumPartitions"] = sdf.rdd.getNumPartitions()

pathのデータを読み込んで、noopで書き込み処理

def noop_write(path):
    sdf = spark.read.parquet(path)
    sc.setJobDescription("WRITE")
    start_time = time.time()
    sdf.write.format("noop").mode("overwrite").save()
    end_time = time.time()
    sc.setJobDescription("None")
    duration = round(end_time - start_time, 2)
    results_dict["ExecutionTime"] = duration
    print(f"Duration: {duration} sec")

1. Parquetファイルを読み込む際のパーティションの数に影響を与える要因

  • クラスター内のコア数
    • 正確には "spark.sql.files.minPartitionNum" 設定
    • デフォルトはデフォルトの並列度で、これはコア数 = 4
  • ファイルサイズまたは推定ファイルサイズ
  • Parquetファイルの数
  • Parquetファイル内のブロック/行グループの数
  • 最大パーティションサイズ:
    • パーティションのサイズに影響を与える
    • 設定 "spark.sql.files.maxPartitionBytes" に基づく
    • デフォルトは128 MB
  • バイトあたりの最大コスト
    • 新しいパーティションを作成するコストを表す
    • 設定 "spark.sql.files.openCostInBytes" に基づく
    • デフォルトは 4 MB
    • 技術的には、各ファイルにコスト(例:4 MB)を追加することをパディングと呼ぶ
    • これにより、オープンコスト程度のサイズの、少数で大きなパーティションが作成される
    • 通常は影響なし、小さいファイルを除いて、4MBのデフォルトが機能する
    • 公式の説明: ファイルを開くための推定コストで、同じ時間にスキャンできるバイト数で測定される。これは複数のファイルをパーティションに入れるときに使用される。過大評価する方が良い、小さいファイルのパーティションは大きいファイルのパーティション(最初にスケジュールされる)よりも速くなる。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効。

参考文献:

2. まとめ: なぜこれが必要なのか?

  • Sparkがどのようにパーティショニングを作成し、それがパフォーマンスにどのような影響を与えるかを理解することで、パフォーマンスの向上やデバッグが容易になります。
  • パーティションの数、空のパーティション、およびパーティション内のデータの分布がパフォーマンスにどのように影響するかを学びました(以下に以前のコメントを追加しました)。
  • Coalesceや特にRepartitionは高コストな操作です。読み込み時にSparkのパーティションに影響を与えることができれば、大きなメリットとなります。

3. 読み込みのまとめ: パーティションがパフォーマンスに与える影響

最も重要なことは、良い並列化を実現することです。

  • これは、パーティションの数が常に利用可能なコアの数に依存するべきであることを意味します。Sparkの言語では: spark.sparkContext.defaultParallelism。推奨されるのは2-4倍の係数ですが、実際にはメモリとデータサイズに依存します。小さなデータサイズでは1倍の係数で完璧に動作します。
  • 良い並列化を実現するためには、均等(最良は均一、最悪でも正規分布)に分布したデータセットを持つべきです。データの偏りは、狭い変換でも1つのパーティションやタスクに依存することがあり、全体の実行が遅くなることがあります。

パーティションサイズ

  • パーティションサイズが非常に大きい場合(> 1GB)、メモリ不足(OOM)、ガベージコレクション(GC)やその他のエラーが発生する可能性があります。
  • インターネット上の推奨では、100-1000 MBの範囲が良いとされています。Sparkは最大パーティションバイト数のパラメータを128 MBに設定しています。もちろん、これはマシンと利用可能なメモリに依存します。利用可能なメモリの限界に近づかないようにしましょう。

分散オーバーヘッド

  • 前の実験で見たように、パーティションの数が多すぎると、スケジューリングと分散のオーバーヘッドが増加します。
  • 実行時間が全タスク時間の少なくとも90%を占めていない場合や、タスクが100 ms未満の場合は、通常短すぎます。

こちらも参照してください: https://stackoverflow.com/questions/64600212/how-to-determine-the-partition-size-in-an-apache-spark-dataframe

4. 基本的なアルゴリズム

# 基本的なアルゴリズム
def basic_algorithm(file_size):
    maxPartitionBytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes")[:-1])    
    minPartitionNum = int(spark.conf.get("spark.sql.files.minPartitionNum"))
    size_per_core = file_size/minPartitionNum # コアあたりのサイズ
    partition_size = min(maxPartitionBytes, size_per_core) # コアあたりのサイズか maxPartitionBytes の小さい方
    no_partitions = file_size/partition_size # 切り上げ前の値
    
    print(" ")
    print("******** 基本アルゴリズムによるパーティション数の推定 ********")
    print(f"ファイルサイズ: {round(file_size/1024/1024, 1)} MB または {file_size} バイト")
    print(f"コアごとのサイズ: {round(size_per_core/1024/1024, 1)} MB または {size_per_core} バイト")
    print(f"パーティションサイズ: {round(partition_size/1024/1024, 1)} MB または {partition_size} バイト")
    print(f"推定パーティション数: {math.ceil(no_partitions)}, 切り上げ前: {no_partitions}")

#参考: https://www.linkedin.com/pulse/how-initial-number-partitions-determined-pyspark-sugumar-srinivasan#:~:text=Ideally%20partitions%20will%20be%20created,resource%20will%20get%20utilised%20properly

基本的なアルゴリズムによる推定

file_size = 200
set_configs(maxPartitionsMB=200000, openCostInMB=4, minPartitions=4)
basic_algorithm(file_size*1024*1024)
 
******** SPARK CONFIGURATIONS ********
MaxPartitionSize 200000 MB or 209715200000 bytes
OpenCostInBytes 4 MB or 4194304 bytes
Min Partitions: 4
 
******** 基本アルゴリズムによるパーティション数の推定 ********
ファイルサイズ: 200.0 MB または 209715200 バイト
コアごとのサイズ: 50.0 MB または 52428800.0 バイト
パーティションサイズ: 50.0 MB または 52428800.0 バイト
推定パーティション数: 4, 切り上げ前: 4.0

5. 簡単な実験

  • 実験 1: 4 ファイル, 各 64.8 MB, 合計 259.3 MB
  • 実験 2: 8 ファイル, 各 64.9 MB, 合計 518.9 MB
  • 実験 3: 8 ファイル, 各 47.5 MB, 合計 380 MB
spark.sparkContext.defaultParallelism
4
num_files = 4
num_rows = 32000000
path = write_generator(num_rows, num_files)
Num partitions written: 4
Saved Path: /FileStore/takaaki.yayoi/data/4_files_32000000_rows.parquet

書き込まれたファイルの分析

num_files = 4
num_rows = 32000000
# ファイルサイズの分析
file_analysis(path, num_files)
# 行数の分析
row_count_analysis(num_files, num_rows)
# Spark Confの変更
set_configs(maxPartitionsMB=50, openCostInMB=4, minPartitions=4)
# ファイルのメタデータの読み取り
size = get_parquet_file_size(path)

# 基本的なアルゴリズムで推定したパーティション数
basic_algorithm(size)
# インターネットの情報に基づいて推定したパーティション数
estimate_num_partitions(size, num_files)

# 実際の値の確認
get_actual_num_partitions(path)
# noopでの書き込み(実際には書き込みません)
noop_write(path)
# パーティションごとの分布を表示
 
******** ファイルサイズの分析 ********
ファイルサイズ: 259.3 MB or 271863687 bytes
ファイル数: 4
平均ファイルサイズ: 64.8 MB or 67965921.75 bytes
 
******** 行数の分析 ********
書き込みファイル数: 4
書き込み行数: 32000000
ファイルあたりの行数: 8000000
 
******** SPARK CONFIGURATIONS ********
MaxPartitionSize 50 MB or 52428800 bytes
OpenCostInBytes 4 MB or 4194304 bytes
Min Partitions: 4
 
******** 基本アルゴリズムによるパーティション数の推定 ********
ファイルサイズ: 259.3 MB または 271863687 バイト
コアごとのサイズ: 64.8 MB または 67965921.75 バイト
パーティションサイズ: 50.0 MB または 52428800 バイト
推定パーティション数: 6, 切り上げ前: 5.1853883171081545
 
******** MAX SPLIT PARTITION BYTESとパーティション数の推定 ********
avg_file_size_padded(paddedFileSize / num_files) ファイルごとの平均サイズ(パディング後): 68.8 MB または 72160225.75 バイト
paddedFileSize(file_size + num_files * openCostInBytes) パディング後のファイルサイズ: 275.3 MB または 288640903 バイト
bytesPerCore(file_size / minPartitionNum) コアごとのサイズ: 64.8 MB または 67965921.75 バイト
bytesPerCorePadded(paddedFileSize / minPartitionNum) コアごとのサイズ(パディング後): 68.8 MB または 72160225.75 バイト
 
maxSplitPartitionBytes 最大分割パーティションサイズ: 50.0 MB または 52428800 バイト
estimated_num_partitions_int(paddedFileSize / maxSplitPartitionBytes) インターネットの情報に基づく推定パーティション数: 6, 切り上げ前: 5.505388317108154
 
files_per_partition パーティションごとの最大ファイル数: 1
estimated_num_partitions 独自に推定したパーティション数: 4, 切り上げ前: 4.0
 
******** 実際の結果 ********
実際のパーティション数: 6
Duration: 3.59 sec
+---------+--------+---------------+------------+-------+
|partition|numFiles|compressedBytes|compressedMB|   rows|
+---------+--------+---------------+------------+-------+
|        0|       1|       67726804|        64.6|8000000|
|        1|       1|       68008197|        64.9|8000000|
|        2|       1|       68064946|        64.9|8000000|
|        3|       1|       68063740|        64.9|8000000|
|        4|       2|              0|         0.0|      0|
|        5|       2|              0|         0.0|      0|
+---------+--------+---------------+------------+-------+
num_files = 8
num_rows = 64000000
path = write_generator(num_rows, num_files)
path = f"{BASE_DIR}/8_files_64000000_rows.parquet"
num_files = 8
num_rows = 64000000
file_analysis(path, num_files)
row_count_analysis(num_files, num_rows)
set_configs(maxPartitionsMB=130, openCostInMB=4, minPartitions=4)
size = get_parquet_file_size(path)
basic_algorithm(size)
estimate_num_partitions(size, num_files)
get_actual_num_partitions(path)
noop_write(path)
bytes_rows_per_partition(path)
 
******** ファイルサイズの分析 ********
ファイルサイズ: 518.9 MB or 544117191 bytes
ファイル数: 8
平均ファイルサイズ: 64.9 MB or 68014648.875 bytes
 
******** 行数の分析 ********
書き込みファイル数: 8
書き込み行数: 64000000
ファイルあたりの行数: 8000000
 
******** SPARK CONFIGURATIONS ********
MaxPartitionSize 130 MB or 136314880 bytes
OpenCostInBytes 4 MB or 4194304 bytes
Min Partitions: 4
 
******** 基本アルゴリズムによるパーティション数の推定 ********
ファイルサイズ: 518.9 MB または 544117191 バイト
コアごとのサイズ: 129.7 MB または 136029297.75 バイト
パーティションサイズ: 129.7 MB または 136029297.75 バイト
推定パーティション数: 4, 切り上げ前: 4.0
 
******** MAX SPLIT PARTITION BYTESとパーティション数の推定 ********
avg_file_size_padded(paddedFileSize / num_files) ファイルごとの平均サイズ(パディング後): 68.9 MB または 72208952.875 バイト
paddedFileSize(file_size + num_files * openCostInBytes) パディング後のファイルサイズ: 550.9 MB または 577671623 バイト
bytesPerCore(file_size / minPartitionNum) コアごとのサイズ: 129.7 MB または 136029297.75 バイト
bytesPerCorePadded(paddedFileSize / minPartitionNum) コアごとのサイズ(パディング後): 137.7 MB または 144417905.75 バイト
 
maxSplitPartitionBytes 最大分割パーティションサイズ: 130.0 MB または 136314880 バイト
estimated_num_partitions_int(paddedFileSize / maxSplitPartitionBytes) インターネットの情報に基づく推定パーティション数: 5, 切り上げ前: 4.237773770552415
 
files_per_partition パーティションごとの最大ファイル数: 1
estimated_num_partitions 独自に推定したパーティション数: 8, 切り上げ前: 8.0
 
******** 実際の結果 ********
実際のパーティション数: 8
Duration: 6.87 sec
+---------+--------+---------------+------------+-------+
|partition|numFiles|compressedBytes|compressedMB|   rows|
+---------+--------+---------------+------------+-------+
|        0|       1|       68064945|        64.9|8000000|
|        1|       1|       68063739|        64.9|8000000|
|        2|       1|       68063633|        64.9|8000000|
|        3|       1|       68063526|        64.9|8000000|
|        4|       1|       68063233|        64.9|8000000|
|        5|       1|       68063116|        64.9|8000000|
|        6|       1|       68008196|        64.9|8000000|
|        7|       1|       67726803|        64.6|8000000|
+---------+--------+---------------+------------+-------+
num_files = 8
num_rows = 47000000
path = write_generator(num_rows, num_files)
Num partitions written: 8
Saved Path: /FileStore/takaaki.yayoi/data/8_files_47000000_rows.parquet
num_files = 8
num_rows = 47000000
path = f"{BASE_DIR}/8_files_47000000_rows.parquet"
file_analysis(path, num_files)
row_count_analysis(num_files, num_rows)
set_configs(maxPartitionsMB=104, openCostInMB=4, minPartitions=4)
size = get_parquet_file_size(path)
basic_algorithm(size)
estimate_num_partitions(size, num_files)
get_actual_num_partitions(path)
noop_write(path)
bytes_rows_per_partition(path)
 
******** ファイルサイズの分析 ********
ファイルサイズ: 380.0 MB or 398415685 bytes
ファイル数: 8
平均ファイルサイズ: 47.5 MB or 49801960.625 bytes
 
******** 行数の分析 ********
書き込みファイル数: 8
書き込み行数: 47000000
ファイルあたりの行数: 5875000
 
******** SPARK CONFIGURATIONS ********
MaxPartitionSize 104 MB or 109051904 bytes
OpenCostInBytes 4 MB or 4194304 bytes
Min Partitions: 4
 
******** 基本アルゴリズムによるパーティション数の推定 ********
ファイルサイズ: 380.0 MB または 398415685 バイト
コアごとのサイズ: 95.0 MB または 99603921.25 バイト
パーティションサイズ: 95.0 MB または 99603921.25 バイト
推定パーティション数: 4, 切り上げ前: 4.0
 
******** MAX SPLIT PARTITION BYTESとパーティション数の推定 ********
avg_file_size_padded(paddedFileSize / num_files) ファイルごとの平均サイズ(パディング後): 51.5 MB または 53996264.625 バイト
paddedFileSize(file_size + num_files * openCostInBytes) パディング後のファイルサイズ: 412.0 MB または 431970117 バイト
bytesPerCore(file_size / minPartitionNum) コアごとのサイズ: 95.0 MB または 99603921.25 バイト
bytesPerCorePadded(paddedFileSize / minPartitionNum) コアごとのサイズ(パディング後): 103.0 MB または 107992529.25 バイト
 
maxSplitPartitionBytes 最大分割パーティションサイズ: 103.0 MB または 107992529.25 バイト
estimated_num_partitions_int(paddedFileSize / maxSplitPartitionBytes) インターネットの情報に基づく推定パーティション数: 4, 切り上げ前: 4.0
 
files_per_partition パーティションごとの最大ファイル数: 2
estimated_num_partitions 独自に推定したパーティション数: 4, 切り上げ前: 4.0
 
******** 実際の結果 ********
実際のパーティション数: 4
Duration: 5.04 sec
+---------+--------+---------------+------------+--------+
|partition|numFiles|compressedBytes|compressedMB|    rows|
+---------+--------+---------------+------------+--------+
|        0|       2|       99956574|        95.3|11750000|
|        1|       2|       99707361|        95.1|11750000|
|        2|       2|       99455919|        94.8|11750000|
|        3|       2|       99295831|        94.7|11750000|
+---------+--------+---------------+------------+--------+

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?