Partitioning on Disk with partitionBy - MungingDataの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Sparkライターを用いることで、partitionBy
によってディスク上のデータをパーティショニングすることができます。パーティション分けされたデータレイクにおいては、いくつかのクエリーが50から100倍高速になるので、パーティショニングは特定のクエリーにおいては重要となります。
パーティション分けされたデータレイクの作成、維持は困難なものです。
このブログ記事では、partitionBy
の使い方を議論し、ディスク上のプロダクション規模のデータセットをパーティショニングすることにおける課題を説明します。partitionBy
をより効率的に行えるようにする別のメモリーのパーティショニング戦術も議論します。
特に高いカーディナリティ、あるいは高い偏りを持つパーティションキーを取り扱う際、大規模なデータセットに対してパーティション分けされたデータレイクを作成するためには、この記事でカバーされている概念をマスターする必要があります。
プロダクションレベルのパーティション分けされたレイクをどのように作成するのかに関する詳細な外観については、Writing Beautiful Spark Codeを読むようにしてください。
メモリーのパーティショニング vs. ディスクのパーティショニング
coalesce()
とrepartition()
はデータフレームに対するメモリーのパーティションを変更します。
partitionBy()
はディスクのフォルダー内にデータを書き込む必要があるかどうかを指定するDataFrameWriter
のメソッドです。デフォルトでは、ディスク上のネストされたフォルダーにデータを書き込みません。
メモリーのパーティショニングは、ディスクのパーティショニングとは独立して重要になることが度々あります。適切にディスクにデータを書き込むためには、まず間違いなく最初にメモリー上のデータを再パーティションする必要があります。
シンプルな例
first_name
、last_name
、country
カラムを持つ以下のCSVファイルがあるものとします。
first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China
country
をパーティションキーとして、ディスク上のこのデータをパーティショニングしてみましょう。パーティションごとに1ファイルを作成します。
val path = new java.io.File("./src/main/resources/ss_europe/").getCanonicalPath
val df = spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
val outputPath = new java.io.File("./tmp/partitioned_lake1/").getCanonicalPath
df
.repartition(col("country"))
.write
.partitionBy("country")
.parquet(outputPath)
ディスク上のデータは以下のようになります。
partitioned_lake1/
country=Argentina/
part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
country=China/
part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
country=Russia/
part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
ディスクのパーティションごとに1つのファイルを作成しても、プロダクション規模のサイズのデータセットではうまくいかないでしょう。Chinaパーティションに100GBのデータが含まれるとしたら、単一ファイルにこのようなデータ全てを書き込むことはできないでしょう。
repartition(5)とpartitionBy
partitionBy
を実行する前に、データのそれぞれの行を別々のメモリーパーティションにするために、repartition(5)
を実行して、ディスクへのファイル書き込みにどのようなインパクトがあるのかを見てみましょう。
val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df
.repartition(5)
.write
.partitionBy("country")
.parquet(outputPath)
ディスク上のファイルは以下のようになります。
partitioned_lake2/
country=Argentina/
part-00003-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
country=China/
part-00000-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
part-00004-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
country=Russia/
part-00001-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
part-00002-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
partitionBy
ライターは、それぞれのメモリーパーティションごとにディスク上のファイルとして書き出します。書き出されるファイルの最大数は、ユニークな国の数 x メモリーパーティションの数となります。
この例では、3つのユニークな国 x 5つのメモリーパーティションとなり、最大15ファイルが書き出されます(それぞれのメモリーパーティションにアルゼンチン人、中国人、ロシア人が一人づつ含まれていた場合)。この例では、5行のデータのみが含まれているので、5ファイルのみが書き出されます。
repartition(1)とpartitionBy
partitionBy
によるディスク上のパーティショニングの前に、データを1つのメモリーパーティションに再パーティショニングんした場合、最大3ファイルを書き出すことになります。numMemoryPartitions * numUniqueCountries = max-Numfiles
となります。1 * 3 = 3です。
コードを見てみましょう。
val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df
.repartition(1)
.write
.partitionBy("country")
.parquet(outputPath)
ディスク上のファイルは以下のようになります。
partitioned_lake3/
country=Argentina/
part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
country=China/
part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
country=Russia/
part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
パーティションごとの最大ファイル数を用いたデータセットのパーティショニング
Chinaから80人のデータセット、Franceから15人、Cubaから5人分のデータセットを使ってみましょう。データへのリンクはこちらです。
データは以下のようになります。
person_name,person_country
a,China
b,China
c,China
...77 more China rows
a,France
b,France
c,France
...12 more France rows
a,Cuba
b,Cuba
c,Cuba
...2 more Cuba rows
8つのメモリーパーティションを作成し、メモリーパーティションに対してランダムにデータを分散させます(データをディスクに書き出すので、メモリーパーティションの中身を調査することができます)。
val outputPath = new java.io.File("./tmp/repartition_for_lake4/").getCanonicalPath
df
.repartition(8, col("person_country"), rand)
.write
.csv(outputPath)
出力されたCSVファイルの一つを見てみましょう。
p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba
このテクニックは、パーティション分けされたレイクを作成する際に、パーティションごとの最大ファイル数を設定する助けになります。ディスクにデータを書き出して、出力を見てみましょう。
val outputPath = new java.io.File("./tmp/partitioned_lake4/").getCanonicalPath
df
.repartition(8, col("person_country"), rand)
.write
.partitionBy("person_country")
.csv(outputPath)
ディスク上のファイルは以下のようになります。
partitioned_lake4/
person_country=China/
part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
... 6 more files
person_country=Cuba/
part-00002-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
part-00003-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
... 2 more files
person_country=France/
part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
... 5 more files
それぞれのディスクパーティションは最大8ファイルとなります。データは8つの8つのメモリーパーティションにランダムに分割されます。特定の国に対するデータがメモリーパーティションに存在しない場合には、その国のディスクパーティションに対する出力は存在しません。
これは良いものではありますが、まだ理想的なものではありません。Cubaに対して4ファイル、Franceに対して7ファイルがあり、小さなファイルがたくさん作成されてしまっています。
上述したメモリーパーティションの中身を確認してみましょう。
p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba
partitionBy
はこの特定のメモリーパーティションを3つのファイルに分割しています:7行のデータを持つChinaファイル、1行のデータを持つFranceファイル、そして、1行データを持つCubaファイルです。
ファイルごとの最大行数を用いたデータセットのパーティショニング
ファイルごとに10行のデータを持つパーティションを作成するコードを書いてみましょう。Chinaに対して8ファイル、Cubaに1ファイル、Franceに2ファイルとして、データを格納したいと考えています。
10行でファイルに出力するようにmaxRecordsPerFile
オプションを使うことができます。
val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
.repartition(col("person_country"))
.write
.option("maxRecordsPerFile", 10)
.partitionBy("person_country")
.csv(outputPath)
このテクニックは、パーティションキーに大きな偏りがある場合に特に重要となります。国ごとの住人の数は、大きな偏りを持つパーティションキーの良い例となります。例えば、ジャマイカは300万人ですが、中国は14億人であり、ジャマイカのパーティションよりも中国のパーティションにおいては467倍のファイルがあって欲しいと考えます。
Spark 2.2より前のSparkでのファイルごとの最大行数を用いたデータセットのパーティショニング
maxRecordsPerFileオプションはSpark 2.2で追加されたので、それより以前のバージョンのSparkを使っている場合には、自身でカスタムソリューションを書く必要があります。
val countDF = df.groupBy("person_country").count()
val desiredRowsPerPartition = 10
val joinedDF = df
.join(countDF, Seq("person_country"))
.withColumn(
"my_secret_partition_key",
(rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
)
val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
.repartition(col("person_country"), col("my_secret_partition_key"))
.drop("count", "my_secret_partition_key")
.write
.partitionBy("person_country")
.csv(outputPath)
パーティションキーごとの合計レコード数を計算し、固定数のパーティションに頼るのではなく、my_secret_partition_key
カラムを作成しています。
desiredRowsPerPartition
は1GBファイル程度を提供する数に基づいて選択する必要があります。7.5億レコードを持つ500GBデータセットを持っているとしたら、desiredRowsPerPartition
を1,500,000に設定します。
小さなファイル問題
パーティション分けされたデータレイクは、インクリメンタルに更新を行うと、すぐに小さなファイル問題を引き起こします。パーティション分けされたデータレイクをコンパクトにするのは困難です。ここまでで見てきたように、パーティション分けされたデータレイクを作ることも困難です!
ご自身でパーティション分けされたデータレイクを構築するためにこの記事で説明された先述を使い、小さなファイル問題を引き起こすことなしにスタートしてみてください!
結論
パーティション分けされたデータレイクは大量のデータをスキップできるので(パーティションキーでフィルタリングを行った際に)クエリーが高速になります。
パーティション分けされたデータレイクの作成、維持は困難なものですが、得られるパフォーマンス改善は労力を費やす価値があるものです。