Edited at
MobingiDay 12

PySparkで書き込むファイル数をいい感じにする方法


はじめに

この記事はMobingi Advent Calendar 2018の12日目の記事です。


やりたかったこと

csvをparquetに変換する処理をAWS GlueのPython(ほぼPySpark)で実現していたのですが、出力されるファイル数が細かくなりすぎるという問題に悩まされていました。


この時細かすぎるファイルが作られる場合がある

output_df.write.save(path=output_path, format=output_format,

mode='overwrite', compression=output_compression)

細かすぎるファイルはパフォーマンス的によろしく無いですし、Athenaでのクエリ実行時などでtoo many open filesといったエラーが出やすくなってしまいます。

そこで出力するファイルサイズをもとに書き込むファイル数をいい感じにコントロールするオプションを探しました。しかしそんな方法はなさそうでした。。。

代わりに書き込むファイル数(厳密にはデータフレームの並列数?)を指定するrepartitionというメソッドがありました。

このメソッドを使えば出力されるファイル数は指定できるけど、データサイズに応じて数を調整できないと、データ量の増加に応じていちいちジョブをチューニングしなければいけなくなります。


解決策

ということで、以下のようなs3上のデータサイズを取得する関数と適切なファイル数を計算する関数を作って適切なpartition数を事前計算するようにしました。

def fetch_total_object_size(bucket, path):

b = s3.Bucket(bucket)
total_size = 0
objects = b.objects.filter(Prefix=path)
for o in objects:
total_size += o.size
return total_size

def calculate_repartition_num(previous_total_size, tatget_size, default_num=3):
# roundup
num = -(-previous_total_size // tatget_size)
if num == 0:
return default_num
return num


GlueScript抜粋

KB = 1024

MB = KB * 1024
TARGET_FILE_SIZE = 32 * MB

previous_size = fetch_total_object_size(bucket, path)
num = calculate_repartition_num(previous_size, TARGET_FILE_SIZE)
output_df.repartition(num).write.save(path=output_path, format=output_format,
mode='overwrite', compression=output_compression)


これにより、毎日上書き更新する系のデータの場合は書き込み先のs3パスを参考に、時系列データであれば前日のデータなどを参考にサイズを決定できるようになりました。上の例ではs3をストレージとしていますがHDFSでも似たようなことは容易かと思います。