25
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

限られたリソース内で行う超巨大データの高速なテーブル結合方法

Last updated at Posted at 2018-12-18

この記事はSupership Advent Calendar 2018 19日目です。

はじめに

こんにちは。
Supership 株式会社 データビジネス事業部 データサイエンス&エンジニアリング室所属の @hkak03key です!

普段の業務としては、
・分析基盤をより良くするためのPOC
・部署内のシスアド業務
風紀委員ジャッジメント(と言う名のサーバ負荷監視 and コードレビュー/リファクタリング)
を行っております。
好きなキャラはさくらちゃんカードキャプターなのに黒子とあるxxなのかっていうツッコミはさておき。

さて、弊チームではビッグデータを扱う部署ということで、数億レコード(1日分)以上あるユーザログを処理するSparkのクエリが走ったりします。

そして、対象期間が1日だけではなく1ヶ月、3ヶ月と長期間となる場合が多々あり、その場合は100億行を超えるレコード数になってしまい、処理に時間がかかってしまいます。

ここでは、そのような超巨大データを限られた台数のSparkクラスタで捌く方法を説明します。

起きていた事象

問題のケースでは、
・ユーザログ180億行
・フィルタ用テーブル80万行
を、結合しようとしていました。

pyspark}
df_l = spark.table(table_l)
df_r = spark.table(table_r)

df_joined = df_l.join(df_r, ["user_id"], "inner")

で、その結果がこちら。
スクリーンショット 2018-12-18 18.02.34.png
r4.2xlarge 4workersで1日回しても終わらないという有様です。なんてこった。

sparkの実行計画をみたところ、「すべてのデータを読み込んでからsortしてmerge joinする」という感じでした。そりゃ重たい。

更に、実行状況についても見てみました。
(本当はsparkのCPU使用率も載せたかったんですが、古いログで見つからず。。)
ただ、一つ言えるのは「1台だけが頑張って処理している状態」でした。
これは、結合する列の値の出現頻度に偏りがある場合に発生し、"skewing"なんて呼んだりします。

これら複合的な要因により、いつまで経っても終わらない処理になってしまっていたと考えられます。

対応策

この時点で、「もっと短い期間であれば完走する」ということはわかっていました。

そこで、 「1日ごとのデータをjoinして結合したテーブルを適当なストレージに保存し、保存したデータを再度読み込むことで後続処理を続行させる」 ことにしました。

その結果がこちら。
スクリーンショット 2018-12-18 19.07.34.png

やってることとしては何のひねりもない感じですが、これを行ったところ 24時間経っても終わらない処理が2時間以内で完了しました
これは、クラスタスペックに対して必要十分なデータサイズで処理ができたからであると考えられます。
正直、1日分の結合処理でもskewingは発生するのですが、高々1日分のデータということで許容範囲な時間で処理ができました。

また、アルゴリズムについて、コードで示します:

pyspark}
#-#-#- split join on pyspark -#-#-#
## read DataFrame ##
# 今回は、df_l.count() >>> df_r.count() を仮定している
df_l = spark.table(table_l) # df_lは、未処理(=読み込んだばかり)であることが望ましい
df_r = spark.table(table_r) # df_rは、cacheされていることが望ましい

## params (example) ##
hive_partition_column_name = "date" # 注意: hive partitionとして扱われている列を指定しないと意味がない
hive_partition_column_vals = [20180901, 20180902] # ここは頑張って作る
on = ["user_id"]
how = "inner"
tmp_location = "s3a://temp_backet/temp_path/{}".format(uuid.uuid4()) 
# tempfileは、処理が終わったら消すこと

## join and write by partition ##
for val in hive_partition_column_vals: 
    df_temp = df_l.filter( col(hive_partition_column_name) == val ).join(df_r, on, how)
    df_temp.write.mode("append").partitionBy(hive_partition_column_name).parquet(tmp_location)

## read joined DataFrame ##
df_joined = spark.read.parquet(tmp_location)

右側のテーブルも巨大な場合 〜両方とも分割したい〜

右側のテーブルも巨大な場合、同じ理屈で分割したいと思うはずです。

「で、それって可能なの?」という話ですが、INNER JOIN であれば、両方のテーブルを分割しながら結合できます。
一方、 OUTER JOIN は分割できない場合が多いです。

なぜこのようなことが発生するのでしょうか?
LEFT OUTER JOIN で考えてみましょう。
OUTER JOIN の特性から、right_table に同一キーが存在しないレコードについては left_table 側の値はそのまま保持され、 right_table 側の値には NULL がはいります。
この特性より、right_table が分割されてしまうと、分割された right_table_a では結合したのに right_table_b では結合しないというパターンが発生し、不要な NULL 行を生じさせてしまいます。

よって、OUTER JOINでは分割パターンが制限されるということになります。

結合方法と分割可否について、以下の表にまとめました。

結合方法 left_table right_table
INNER JOIN 分割可 分割可
LEFT OUTER JOIN 分割可 分割不可
FULL OUTER JOIN 分割不可 分割不可

終わりに

巨大なテーブルを分割しながら処理することで、高速なテーブル結合を実現可能であることを説明しました。

しかし、この方法がベストであるとは思ってなく、
「更にクエリを工夫することでまだ速くできるのではないか?」とも思ってますし、
「そもそもsparkですべての処理を行うことが正しいのか?」とも思っております。

そして、特に後者の問いに対して、様々なmiddlewareを検証し、より良い分析環境を求めPoCを回すのが僕の任務となっております。

もし、分析環境の高速化等、エンジニアリングに興味をお持ちの方は一緒に働きましょう!

25
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?