DatabricksでのHyperloglog
Hyperloglogは、webサイトのUU集計などでよく使われる確率的アルゴリズムを用いたデータのカーデナリティを推測するものになります。
細かくは、ブレインパッドさんのブログやフライウィールさんのブログに書いていただいていますのでそちらを参照されるのがいいかと思います。
Hyperloglog関連の関数
Databricksでは、hyperloglog関連の関数として下記の関数が用意されています。
Function | Description |
---|---|
approx_count_distinct(expr[,relativeSD]) | グループ内の expr に含まれる明確な値の推定数を返します。 |
approx_percentile(expr,percentage[,accuracy]) | グループ内の expr のおおよそのパーセンタイルを返します。 |
approx_top_k(expr[,k[,maxItemsTracked]]) | expr の中で最も頻繁に出現する上位 k 個の項目の値を、そのおおよその数とともに返します。 |
approx_count_distinctの動作をみてみる
一番使いそうなapprox_count_distinctはこんな感じです。
テストデータセットから、9104511件ほどのuser idを集計してみます。
まずは、通常のdistinct_countです。
6.4秒ですね。
approx_count_distinctです。rsdはdefault 0.05です。
3.81秒です。件数は9312841なので、20万件ほどの差分がでます。
rsdは0.01で設定します。
4.14秒です。件数は9007831なので、10万件ほどの差分がでます。
rsdは0.001で設定します。
4.95秒です。件数は9113218なので、8700件ほどの差分がでます。
rdsを小さくしていくことで、精度はあがりますが、かかる時間が増えていくので注意が必要です。
また、spark公式Documentでは、0.01以下の場合はcountDistinctを使う方ほが効率的だと言っています。
Sketchは・・・?
DatabricksのDefault関数では、あくまで推定値を返すのみのものになっており、sketchを保存して、さらにreaggregationをかけたい場合などには適しません。
そこで、活用できるのがswoop社が公開してくれているspark-alchemyです。
SparkAI summit Europe2019でも登壇されていたように、hyperloglogでの集計をよりやりやすくするために多くのHLL関数を提供してくれます。(SparkやBigqueryよりも多くの関数を提供してくれています)
spark-alchemyの使い方
Clusterへのinstall
MavenからClusterにinstallします。
library importします
import org.apache.spark.sql.functions._
import com.swoop.alchemy.spark.expressions.hll.functions._
サンプルデータフレーム作成します
hll_init_aggでsketchのbinary生成します
spark.range(100000).createOrReplaceTempView("ids")
val df = (spark.range(100000).select(
// exact distinct count
countDistinct('id).as("cntd"),
// Spark's HLL implementation with default 5% precision
approx_count_distinct('id).as("anctd_spark_default"),
// approximate distinct count with default 5% precision
hll_cardinality(hll_init_agg('id)).as("acntd_default"),
// generate sketch binary
hll_init_agg('id).as("hll_binary"),
// approximate distinct counts with custom precision
map(
Seq(0.005, 0.02, 0.05, 0.1).flatMap { error =>
lit(error) :: hll_cardinality(hll_init_agg('id, error)) :: Nil
}: _*
).as("acntd")
))
データフレームみてみる
こんな感じでsktchが保存されるので、再度集計するときにこのskecthをベース集計することで高速化が期待できます。
また、こちらのドキュメントに再集計のtipsや高速化のためのサンプルまで書いていただいているのでぜひ参考にしてみてください。