3
5

More than 1 year has passed since last update.

SparkにおけるpartitionByによるディスク上のパーティショニング

Posted at

Partitioning on Disk with partitionBy - MungingDataの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Sparkライターを用いることで、partitionByによってディスク上のデータをパーティショニングすることができます。パーティション分けされたデータレイクにおいては、いくつかのクエリーが50から100倍高速になるので、パーティショニングは特定のクエリーにおいては重要となります。

パーティション分けされたデータレイクの作成、維持は困難なものです。

このブログ記事では、partitionByの使い方を議論し、ディスク上のプロダクション規模のデータセットをパーティショニングすることにおける課題を説明します。partitionByをより効率的に行えるようにする別のメモリーのパーティショニング戦術も議論します。

特に高いカーディナリティ、あるいは高い偏りを持つパーティションキーを取り扱う際、大規模なデータセットに対してパーティション分けされたデータレイクを作成するためには、この記事でカバーされている概念をマスターする必要があります。

プロダクションレベルのパーティション分けされたレイクをどのように作成するのかに関する詳細な外観については、Writing Beautiful Spark Codeを読むようにしてください。

メモリーのパーティショニング vs. ディスクのパーティショニング

coalesce()repartition()はデータフレームに対するメモリーのパーティションを変更します。

partitionBy()はディスクのフォルダー内にデータを書き込む必要があるかどうかを指定するDataFrameWriterのメソッドです。デフォルトでは、ディスク上のネストされたフォルダーにデータを書き込みません。

メモリーのパーティショニングは、ディスクのパーティショニングとは独立して重要になることが度々あります。適切にディスクにデータを書き込むためには、まず間違いなく最初にメモリー上のデータを再パーティションする必要があります。

シンプルな例

first_namelast_namecountryカラムを持つ以下のCSVファイルがあるものとします。

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

countryをパーティションキーとして、ディスク上のこのデータをパーティショニングしてみましょう。パーティションごとに1ファイルを作成します。

Scala
val path = new java.io.File("./src/main/resources/ss_europe/").getCanonicalPath
val df = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)
val outputPath = new java.io.File("./tmp/partitioned_lake1/").getCanonicalPath
df
  .repartition(col("country"))
  .write
  .partitionBy("country")
  .parquet(outputPath)

ディスク上のデータは以下のようになります。

partitioned_lake1/
  country=Argentina/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=China/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=Russia/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet

ディスクのパーティションごとに1つのファイルを作成しても、プロダクション規模のサイズのデータセットではうまくいかないでしょう。Chinaパーティションに100GBのデータが含まれるとしたら、単一ファイルにこのようなデータ全てを書き込むことはできないでしょう。

repartition(5)とpartitionBy

partitionByを実行する前に、データのそれぞれの行を別々のメモリーパーティションにするために、repartition(5)を実行して、ディスクへのファイル書き込みにどのようなインパクトがあるのかを見てみましょう。

Scala
val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df
  .repartition(5)
  .write
  .partitionBy("country")
  .parquet(outputPath)

ディスク上のファイルは以下のようになります。

partitioned_lake2/
  country=Argentina/
    part-00003-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
  country=China/
    part-00000-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00004-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
  country=Russia/
    part-00001-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00002-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet

partitionByライターは、それぞれのメモリーパーティションごとにディスク上のファイルとして書き出します。書き出されるファイルの最大数は、ユニークな国の数 x メモリーパーティションの数となります。

この例では、3つのユニークな国 x 5つのメモリーパーティションとなり、最大15ファイルが書き出されます(それぞれのメモリーパーティションにアルゼンチン人、中国人、ロシア人が一人づつ含まれていた場合)。この例では、5行のデータのみが含まれているので、5ファイルのみが書き出されます。

repartition(1)とpartitionBy

partitionByによるディスク上のパーティショニングの前に、データを1つのメモリーパーティションに再パーティショニングんした場合、最大3ファイルを書き出すことになります。numMemoryPartitions * numUniqueCountries = max-Numfilesとなります。1 * 3 = 3です。

コードを見てみましょう。

Scala
val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df
  .repartition(1)
  .write
  .partitionBy("country")
  .parquet(outputPath)

ディスク上のファイルは以下のようになります。

partitioned_lake3/
  country=Argentina/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
  country=China/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
  country=Russia/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet

パーティションごとの最大ファイル数を用いたデータセットのパーティショニング

Chinaから80人のデータセット、Franceから15人、Cubaから5人分のデータセットを使ってみましょう。データへのリンクはこちらです。

データは以下のようになります。

person_name,person_country
a,China
b,China
c,China
...77 more China rows
a,France
b,France
c,France
...12 more France rows
a,Cuba
b,Cuba
c,Cuba
...2 more Cuba rows

8つのメモリーパーティションを作成し、メモリーパーティションに対してランダムにデータを分散させます(データをディスクに書き出すので、メモリーパーティションの中身を調査することができます)。

Scala
val outputPath = new java.io.File("./tmp/repartition_for_lake4/").getCanonicalPath
df
  .repartition(8, col("person_country"), rand)
  .write
  .csv(outputPath)

出力されたCSVファイルの一つを見てみましょう。

p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba

このテクニックは、パーティション分けされたレイクを作成する際に、パーティションごとの最大ファイル数を設定する助けになります。ディスクにデータを書き出して、出力を見てみましょう。

val outputPath = new java.io.File("./tmp/partitioned_lake4/").getCanonicalPath
df
  .repartition(8, col("person_country"), rand)
  .write
  .partitionBy("person_country")
  .csv(outputPath)

ディスク上のファイルは以下のようになります。

partitioned_lake4/
  person_country=China/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 6 more files
  person_country=Cuba/
    part-00002-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00003-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 2 more files
  person_country=France/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 5 more files

それぞれのディスクパーティションは最大8ファイルとなります。データは8つの8つのメモリーパーティションにランダムに分割されます。特定の国に対するデータがメモリーパーティションに存在しない場合には、その国のディスクパーティションに対する出力は存在しません。

これは良いものではありますが、まだ理想的なものではありません。Cubaに対して4ファイル、Franceに対して7ファイルがあり、小さなファイルがたくさん作成されてしまっています。

上述したメモリーパーティションの中身を確認してみましょう。

p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba

partitionByはこの特定のメモリーパーティションを3つのファイルに分割しています:7行のデータを持つChinaファイル、1行のデータを持つFranceファイル、そして、1行データを持つCubaファイルです。

ファイルごとの最大行数を用いたデータセットのパーティショニング

ファイルごとに10行のデータを持つパーティションを作成するコードを書いてみましょう。Chinaに対して8ファイル、Cubaに1ファイル、Franceに2ファイルとして、データを格納したいと考えています。

10行でファイルに出力するようにmaxRecordsPerFileオプションを使うことができます。

Scala
val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
  .repartition(col("person_country"))
  .write
  .option("maxRecordsPerFile", 10)
  .partitionBy("person_country")
  .csv(outputPath)

このテクニックは、パーティションキーに大きな偏りがある場合に特に重要となります。国ごとの住人の数は、大きな偏りを持つパーティションキーの良い例となります。例えば、ジャマイカは300万人ですが、中国は14億人であり、ジャマイカのパーティションよりも中国のパーティションにおいては467倍のファイルがあって欲しいと考えます。

Spark 2.2より前のSparkでのファイルごとの最大行数を用いたデータセットのパーティショニング

maxRecordsPerFileオプションはSpark 2.2で追加されたので、それより以前のバージョンのSparkを使っている場合には、自身でカスタムソリューションを書く必要があります。

Scala
val countDF = df.groupBy("person_country").count()
val desiredRowsPerPartition = 10
val joinedDF = df
  .join(countDF, Seq("person_country"))
  .withColumn(
    "my_secret_partition_key",
    (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
  )
val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
  .repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write
  .partitionBy("person_country")
  .csv(outputPath)

パーティションキーごとの合計レコード数を計算し、固定数のパーティションに頼るのではなく、my_secret_partition_keyカラムを作成しています。

desiredRowsPerPartitionは1GBファイル程度を提供する数に基づいて選択する必要があります。7.5億レコードを持つ500GBデータセットを持っているとしたら、desiredRowsPerPartitionを1,500,000に設定します。

小さなファイル問題

パーティション分けされたデータレイクは、インクリメンタルに更新を行うと、すぐに小さなファイル問題を引き起こします。パーティション分けされたデータレイクをコンパクトにするのは困難です。ここまでで見てきたように、パーティション分けされたデータレイクを作ることも困難です!

ご自身でパーティション分けされたデータレイクを構築するためにこの記事で説明された先述を使い、小さなファイル問題を引き起こすことなしにスタートしてみてください!

結論

パーティション分けされたデータレイクは大量のデータをスキップできるので(パーティションキーでフィルタリングを行った際に)クエリーが高速になります。

パーティション分けされたデータレイクの作成、維持は困難なものですが、得られるパフォーマンス改善は労力を費やす価値があるものです。

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5