Python
Spark
DataFrame
Pyspark

Sparkの出力で空ファイルができてしまう時の対処方法

本記事について

Sparkで結果を出力する際、データの入っていない空ファイルが出力されることがあります。本記事ではその空ファイルが出力されないようにする方法について記載します。

空ファイルが出力される原因

Sparkはデータを集計する際、1箇所で処理するのではなく、データをいくつかの固まりに分けて、並列分散処理しています。そして、その固まりをパーティションと呼びます。

集計結果を出力する際、各パーティションが集計して、その結果を各パーティションから出力するのですが、集計の結果、出力するデータが無いパーティションが存在することもあります。

出力するデータが無いパーティションは、出力をしないのではなく、レコードの無い空ファイルを出力します。これが空ファイル発生の原因となります。

ですので、全てのパーティションに集計結果が含まれるよう、パーティション数は、集計結果のレコード件数より十分小さい必要があります。

対処方法

集計結果のレコード数を調べ、その件数と比較して、パーティション数が多すぎるようでしたらパーティション数を減らして再配分します。

下記にPySparkのサンプルを記載します。(ここではレコード件数がパーティション数の4倍より少ない場合はパーティションの調整を行っています)

DataFrame.repartition(<N>)を呼ぶと、指定したパーティション数でDataFrameのデータを再配分してくれます。

num_recodes = df.count()
num_partitions = df.rdd.getNumPartitions()

if num_recodes<=4 :
    num_partitions = 1
elif (num_partitions*4)>num_recodes :
    num_partitions = int(num_recodes/4.0)

if num_partitions != df.rdd.getNumPartitions() :
    df = df.repartition( num_partitions )