LoginSignup
5
4

More than 5 years have passed since last update.

PySparkで書き込むファイル数をいい感じにする方法

Last updated at Posted at 2018-12-11

はじめに

この記事はMobingi Advent Calendar 2018の12日目の記事です。

やりたかったこと

csvをparquetに変換する処理をAWS GlueのPython(ほぼPySpark)で実現していたのですが、出力されるファイル数が細かくなりすぎるという問題に悩まされていました。

この時細かすぎるファイルが作られる場合がある
output_df.write.save(path=output_path, format=output_format,
                        mode='overwrite', compression=output_compression)

細かすぎるファイルはパフォーマンス的によろしく無いですし、Athenaでのクエリ実行時などでtoo many open filesといったエラーが出やすくなってしまいます。

そこで出力するファイルサイズをもとに書き込むファイル数をいい感じにコントロールするオプションを探しました。しかしそんな方法はなさそうでした。。。
代わりに書き込むファイル数(厳密にはデータフレームの並列数?)を指定するrepartitionというメソッドがありました。

このメソッドを使えば出力されるファイル数は指定できるけど、データサイズに応じて数を調整できないと、データ量の増加に応じていちいちジョブをチューニングしなければいけなくなります。

解決策

ということで、以下のようなs3上のデータサイズを取得する関数と適切なファイル数を計算する関数を作って適切なpartition数を事前計算するようにしました。

def fetch_total_object_size(bucket, path):
    b = s3.Bucket(bucket)
    total_size = 0
    objects = b.objects.filter(Prefix=path)
    for o in objects:
        total_size += o.size    
    return total_size

def calculate_repartition_num(previous_total_size, tatget_size, default_num=3):
    # roundup
    num = -(-previous_total_size // tatget_size)
    if num == 0:
        return default_num
    return num
GlueScript抜粋
KB = 1024
MB = KB * 1024
TARGET_FILE_SIZE = 32 * MB

previous_size = fetch_total_object_size(bucket, path)
num = calculate_repartition_num(previous_size, TARGET_FILE_SIZE)
output_df.repartition(num).write.save(path=output_path, format=output_format,
                        mode='overwrite', compression=output_compression)

これにより、毎日上書き更新する系のデータの場合は書き込み先のs3パスを参考に、時系列データであれば前日のデータなどを参考にサイズを決定できるようになりました。上の例ではs3をストレージとしていますがHDFSでも似たようなことは容易かと思います。

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4