この記事はSupership Advent Calendar 2018 19日目です。
はじめに
こんにちは。
Supership 株式会社 データビジネス事業部 データサイエンス&エンジニアリング室所属の @hkak03key です!
普段の業務としては、
・分析基盤をより良くするためのPOC
・部署内のシスアド業務
・風紀委員(と言う名のサーバ負荷監視 and コードレビュー/リファクタリング)
を行っております。
好きなキャラはさくらちゃんなのに黒子なのかっていうツッコミはさておき。
さて、弊チームではビッグデータを扱う部署ということで、数億レコード(1日分)以上あるユーザログを処理するSparkのクエリが走ったりします。
そして、対象期間が1日だけではなく1ヶ月、3ヶ月と長期間となる場合が多々あり、その場合は100億行を超えるレコード数になってしまい、処理に時間がかかってしまいます。
ここでは、そのような超巨大データを限られた台数のSparkクラスタで捌く方法を説明します。
起きていた事象
問題のケースでは、
・ユーザログ180億行
・フィルタ用テーブル80万行
を、結合しようとしていました。
df_l = spark.table(table_l)
df_r = spark.table(table_r)
df_joined = df_l.join(df_r, ["user_id"], "inner")
で、その結果がこちら。
r4.2xlarge 4workersで1日回しても終わらないという有様です。なんてこった。
sparkの実行計画をみたところ、「すべてのデータを読み込んでからsortしてmerge joinする」という感じでした。そりゃ重たい。
更に、実行状況についても見てみました。
(本当はsparkのCPU使用率も載せたかったんですが、古いログで見つからず。。)
ただ、一つ言えるのは「1台だけが頑張って処理している状態」でした。
これは、結合する列の値の出現頻度に偏りがある場合に発生し、"skewing"なんて呼んだりします。
これら複合的な要因により、いつまで経っても終わらない処理になってしまっていたと考えられます。
対応策
この時点で、「もっと短い期間であれば完走する」ということはわかっていました。
そこで、 「1日ごとのデータをjoinして結合したテーブルを適当なストレージに保存し、保存したデータを再度読み込むことで後続処理を続行させる」 ことにしました。
やってることとしては何のひねりもない感じですが、これを行ったところ 24時間経っても終わらない処理が2時間以内で完了しました。
これは、クラスタスペックに対して必要十分なデータサイズで処理ができたからであると考えられます。
正直、1日分の結合処理でもskewingは発生するのですが、高々1日分のデータということで許容範囲な時間で処理ができました。
また、アルゴリズムについて、コードで示します:
#-#-#- split join on pyspark -#-#-#
## read DataFrame ##
# 今回は、df_l.count() >>> df_r.count() を仮定している
df_l = spark.table(table_l) # df_lは、未処理(=読み込んだばかり)であることが望ましい
df_r = spark.table(table_r) # df_rは、cacheされていることが望ましい
## params (example) ##
hive_partition_column_name = "date" # 注意: hive partitionとして扱われている列を指定しないと意味がない
hive_partition_column_vals = [20180901, 20180902] # ここは頑張って作る
on = ["user_id"]
how = "inner"
tmp_location = "s3a://temp_backet/temp_path/{}".format(uuid.uuid4())
# tempfileは、処理が終わったら消すこと
## join and write by partition ##
for val in hive_partition_column_vals:
df_temp = df_l.filter( col(hive_partition_column_name) == val ).join(df_r, on, how)
df_temp.write.mode("append").partitionBy(hive_partition_column_name).parquet(tmp_location)
## read joined DataFrame ##
df_joined = spark.read.parquet(tmp_location)
右側のテーブルも巨大な場合 〜両方とも分割したい〜
右側のテーブルも巨大な場合、同じ理屈で分割したいと思うはずです。
「で、それって可能なの?」という話ですが、INNER JOIN
であれば、両方のテーブルを分割しながら結合できます。
一方、 OUTER JOIN
は分割できない場合が多いです。
なぜこのようなことが発生するのでしょうか?
LEFT OUTER JOIN
で考えてみましょう。
OUTER JOIN
の特性から、right_table
に同一キーが存在しないレコードについては left_table
側の値はそのまま保持され、 right_table
側の値には NULL
がはいります。
この特性より、right_table
が分割されてしまうと、分割された right_table_a
では結合したのに right_table_b
では結合しないというパターンが発生し、不要な NULL
行を生じさせてしまいます。
よって、OUTER JOIN
では分割パターンが制限されるということになります。
結合方法と分割可否について、以下の表にまとめました。
結合方法 | left_table | right_table |
---|---|---|
INNER JOIN | 分割可 | 分割可 |
LEFT OUTER JOIN | 分割可 | 分割不可 |
FULL OUTER JOIN | 分割不可 | 分割不可 |
終わりに
巨大なテーブルを分割しながら処理することで、高速なテーブル結合を実現可能であることを説明しました。
しかし、この方法がベストであるとは思ってなく、
「更にクエリを工夫することでまだ速くできるのではないか?」とも思ってますし、
「そもそもsparkですべての処理を行うことが正しいのか?」とも思っております。
そして、特に後者の問いに対して、様々なmiddlewareを検証し、より良い分析環境を求めPoCを回すのが僕の任務となっております。
もし、分析環境の高速化等、エンジニアリングに興味をお持ちの方は一緒に働きましょう!