この記事はRetty Inc. Advent Calendar 2017 2日目です。
昨日は @bokeneko さんによる
「うわっ…私のリコメンド、ビール多すぎ…?」doc2vecの拡張とリコメンドシステムへの応用 でした。
ということで、みなさん こんにちは。
RettyでSoftware Engineerをやってます@tkngueです。
Retty においては日々の分析業務〜データサービス開発の範囲で
BigQueryが利用されています
僕は使い始めてから、およそ半年というところでBigQueryの知見も溜まってきたところで
せっかくなのでベストプラクティスを本稿においてまとめてみたいと思います。
Disclaimer:
- BigQueryの基本的な構文の説明はしません ( クエリリファレンス を参照ください)
- Standard SQL を利用した BigQueryの説明です。Legacy SQLは解説の対象としません
クエリのパフォーマンスチューニングを行う
BigQueryは性質上、大規模なデータ処理を非常に高速でお手軽に終わらせられます。
一方で、ちょっと複雑なクエリ書けば5分以上になることも珍しくないため
時間節約の観点から最適なクエリを書けることが望ましいです。
この点から純粋に起動時間を短くするチューニングに関するBest Practicesは
公式(英語)でまとめられていますので、そちらを参考するのが一番です。
クエリのスキャン量をチューニングする
BigQueryはデータのスキャン量に応じた課金システムとなっているため
BigQueryを利用するSQLは高速に動くだけでなく、利用するデータ量も
最低限に抑えるような書き方が必要になってきます。
BigQuery の Web UI では max billing bytes により
スキャンするデータ量の上限をあらかじめ設定しておくことができるため
クエリを書くときにはこれを設定しておきましょう。
データ量を抑えるための基本方針は、だいたい次の通りになります。
- 利用する列数を少なくする
- _PARTITIONTIME を利用したフィルタリング
- キャッシュを再利用する(別テーブルに出力し、それを利用する)
- Partitioned tableを活用する
特に大事なのが、2 の _PARTITIONTIME による最適化になります.。
Legacy SQLにおいては, テーブルを日付ごとまたは用途に応じて
テーブルに分けてサイズを小さくすることで, スキャン量を抑えていました。
しかし、複数テーブルをのちにワイルドカードを利用して結合する方式は、
擬似列を利用する場合と比べてパフォーマンスが低下するため使えるならば
日付分割テーブル を使いましょう
日付分割テーブルは 擬似列(_PARTITIONTIME) に対して評価式を書くことで スキャン量を抑えます。
この評価式は 例えば奇数月、曜日ごとなどのといったデータの取り方ができるため
より柔軟にスキャン量を抑えたクエリを書くことができます。
ビュー に 擬似列 (_PARTITIONTIME
) を つくる
擬似列はクエリの高速化においても有効なため 積極的に使っていくのですが
便利な分析のプリセットとしてビューを作るのですが_PARTITIONTIME
という
列名がカラム名として使えません。
じゃあビューで使えないの?といわれると、そうではなく
_PARTITIONTIMEを別名で保存してやることで 列名に確保ができ
このカラムに条件を課すことで、スキャン量の最適化が可能です。
#standard SQL
WITH
BASE AS (
SELECT
_PARTITIONTIME as __PARTITIONTIME
, #...
FROM
mytable
)
SELECT
FROM
BASE
WHERE
__PARTITIONTIME = TIMESTAMP("2017-10-01") -- 擬似列からスキャン量が抑えられる
_PARTITIONTIME
によるデータのフィルタリングは、クエリの解析後かつクエリの
実行前に行われるため
GROUP BY などの集計処理をした後においてもスキャン量の押さえ込みが可能です。
これを利用することで, __PARTITIONTIME をまとめて記述したり、後置することができます。
ただし、_PARTITIONTIME自体に集計関数を
適用してしまった場合(i.e. MAX(_PARTITIONTIME) as __PARTITIONTIME
) には
フルスキャンになってしまいますし、分析関数を利用する前には擬似列によるフィルタリングが必要
などがあり、必ずスキャン量を確認しながらクエリは作りましょう。
#standard SQL
WITH
BASE AS (
SELECT
_PARTITIONTIME as __PARTITIONTIME
, #...
FROM
mytable
)
, AGG AS (
SELECT
__PARTITIONTIME
, # ...
FROM
BASE
GROUP BY
__PARTITIONTIME, # ...
)
SELECT
*
FROM
AGG
WHERE
__PARTITIONTIME = TIMESTAMP("2017-10-01") -- 擬似列を利用しスキャン量が抑えられる
注意ですが_PARTITIONTIME は 基本的に UTCで保存するため、
JST(GMT+9)のタイムゾーンを考慮して _PARTITIONTIME を多めにとって絞り込む必要があります。
BigQueryに入れる際にはテーブルはなるべく細かい単位に分割する
時系列による分割は擬似列で済ませますが、その他の分割は
テーブル自体を細分化し、別テーブルとして管理しましょう。
スキャン量自体も抑えられるというのもありますが、オールインワンなテーブルは
テーブル構造の把握コストが非常に高いです。
小さいクエリでも起動時間に数秒かかってしまうBigQueryでは致命的と言えます。
ということで命名規則と構造だけ定めて、ワイルドカードで必要であればまとめましょう。
(ワイルドカードを利用した場合、パフォーマンスが少し落ちます)
擬似列も2017/12/01現在で適用できるので、両方を組み合わせて利用するのがオススメです。
#standard SQL
SELECT
column, # ...
FROM
`dataset.table_prefix_*` -- ワイルドカードでまとめる
WHERE
-- 2017/12/01 より partitioned tableにおいても擬似列が有効
_PARTITIONTIME = TIMESTAMP("2017-12-30")
クエリのキャッシュを利用する
BigQueryで一度利用されたクエリの結果はキャッシュされるようになっています。
キャッシュされた場合、その対応するクエリの実行はスキャン量ゼロで結果を得ることができます。
キャッシュはユーザ単位とプロジェクト単位で保存されるため、
うまく使う使うことでスキャン量の節約ができます。
ただ一時テーブルに結果がキャッシュされるためには 特定の条件が必要になります (参考)
大きく留意すべきことは
-
CURRENT_USER()
,CURRENT_TIMESTAMP()
,NOW()
といった日時関数などの変動関数を利用したクエリ -
_PARTITIONTIME
による分割テーブル/ビューにデータの挿入や更新などによって変動した場合
です。
キャッシュの結果を利用しやすくするために、
CURRENT_USER()
, CURRENT_TIMESTAMP()
, NOW()
は使わないのが無難です
高頻度で利用するするようなデータに関しては,
一時テーブルではなくテーブルに出力して利用するのが良いでしょう。
CLI や Python から BigQueryを利用する
BigQueryを利用には、Cloud SDKのbqコマンドや
Pythonであればpandas
の利用(read_gbq)を合わせて考えておくと良さそうです。
これらの経路から利用することで
- クエリのパラメタライズ (? による再利用ができる)
- Google SpreadSheetやCSVにレコードが多すぎて書き出せない場合も対応 (Python/pandas)
することが可能です。
クエリのパラメタライズ
echo "SELECT @hoge" | bq query --parameter "hoge:string:bigquery"
結果:
+----------+
| f0_ |
+----------+
| bigquery |
+----------+
クエリのダウンロード
query = "SELECT 1";
project_id = "project_name"
df = pd.read_gbq(query, project_id, dialect="standard")
df.to_csv(dest)
BigQueryの標準関数の活用
BigQueryには様々な便利な関数が用意されているので
それらにまつわるtipsをここで紹介しておきたいと思います。
DAUからMAUを算出する: HYPERLOGLOG++
粒度の細かい統計量からより粒度の大きい統計量の計算が
できるようにしておくことでスキャン量の低減とビューの再利用性が高まります。
同様に時間毎の集計数から日毎、週毎、月ごとのの集計値が導けると
各レコードが必要とする最小の集計単位のテーブルを作っておくだけで済むので
スキャン量や保持するデータ量の劇的な節約につなげることができます。
例えばPV数ですが
ページ毎のPV数からページの種類毎のPV数を集計をすることは
種類毎にページ毎のPV数を足し合わせるだけで簡単に算出することができますね。
中央値または90%ileなどの値であれば、精度に応じたビンを用意して
値を求める時にそれらから集計することが考えられます。
一方で少し大変ななのが, データのCardinalityの計測 (i.e. HAU, DAU, WAU, MAU) です.
APPROX_COUNT_DISTINCT という誤差を許容しつつ省メモリかつ高速にかぞえあげる方法が
BigQueryに用意されていますが
この関数で計算してしまった場合、DAUの計算結果はWAUの計算に利用できないため
DAU、WAUごとにクエリを個別に書いてやる必要がでてきます。
これでは少々面倒ですね。
そこで利用するのが HLL_COUNT関数群 です
HYPERLOGLOG++ ( 名前のインフレしすぎ )と呼ばれる確率的アルゴリズムの実実装で
APPROX_COUNT_DISTINCT の 内部実装もこの関数です。
HYPERLOGLOG++の理論的な詳細は割愛しますが, cardinalityが低い状態と高い状態に
おいても精度高く求められるようあ工夫してあります。
APPROX_COUNT_DISTINCT と違うのは 集計の中間状態を保持 が可能です。
使い方は簡単で、以下のような形で使います。
#standard SQL
WITH
BASE AS (
SELECT
SUBSTR(MD5(CAST(seed AS STRING)), 0, 1) as head
, SUBSTR(MD5(CAST(seed AS STRING)), 0, 2) as id
FROM
UNNEST(GENERATE_ARRAY(1, 10000)) as seed
),
HLL_BASE AS (
SELECT
head,
-- cardinary を記録する中間状態(sketch)を作成
HLL_COUNT.INIT(id) as sketch
FROM
BASE
GROUP BY
head
)
SELECT
-- 複数のsketchを集計し, cardinarlityを算出する
HLL_COUNT.MERGE(sketch)
FROM HLL_BASE
DAU から WAU を算出する時には
HLL_COUNT.INIT(id)
で、DAUの中間状態を作成しておき
HLL_COUNT.MERGE(id)
で、WAUのcardinalityを計算するという手順です。
日時の関数群の落とし穴
タイムゾーンの変換のために
TIMESTAMP_xxxや DATETIME_xxxx の関数には 引数にタイムゾーンの
指定を行えるものがあります。
正直色々と罠が多いので, 手元で挙動を確認しておくことオススメします。
#standard SQL
WITH
base AS (
SELECT CURRENT_TIMESTAMP() as ts
UNION ALL SELECT TIMESTAMP("2017-11-12 00:00:00")
UNION ALL SELECT TIMESTAMP("2017-11-12 00:00:00", "Asia/Tokyo")
),
SELECT
ts
, EXTRACT (HOUR FROM ts AT TIME ZONE "Asia/Tokyo")
, TIMESTAMP_TRUNC(ts, HOUR, "Asia/Tokyo")
, TIMESTAMP_TRUNC(ts, DAY, "Asia/Tokyo")
, FORMAT_TIMESTAMP("%F %X", ts, "Asia/Tokyo")
, FORMAT_TIMESTAMP("%F", ts, "Asia/Tokyo")
FROM
base
結果:
ts | f0_ | f1_ | f2_ | f3_ |
---|---|---|---|---|
2017-11-12 18:55:03.416043 | UTC,3,2017-11-12 18:00:00.000000 UTC | 2017-11-12 15:00:00.000000 UTC | 2017-11-13 03:55:03 | 2017-11-13 |
2017-11-12 00:00:00.000000 | UTC,9,2017-11-12 00:00:00.000000 UTC | 2017-11-11 15:00:00.000000 UTC | 2017-11-12 09:00:00 | 2017-11-12 |
2017-11-11 15:00:00.000000 | UTC,0,2017-11-11 15:00:00.000000 UTC | 2017-11-11 15:00:00.000000 UTC | 2017-11-12 00:00:00 | 2017-11-12 |
まずBigQueryでは時間の扱い方は原則UTCになります。
CURRENT_TIMESTAMP()等で返される値は UTCなので気をつけましょう。
文字列からTIMESTAMPを生成する時にも忘れず,タイムゾーンを明示的につけましょう。
そして TIMESTAMP_TRUNC に落とし穴が多いです。
タイムスタンプ型にタイムゾーンの情報を持たせてない場合(2行目)
TIMESTAMP_TRUNC(ts, HOUR, "Asia/Tokyo")
, TIMESTAMP_TRUNC(ts, DAY, "Asia/Tokyo")
の日付がまず一致しません。
TIMESTAMP_TRUNC(ts, HOUR, "Asia/Tokyo")
の 返り値がJST時間で帰ってくると思いきや、
TIMESTAMP型はUTCで原則表示されるため、結局UTCになります。
かと思いきや
TIMESTAMP_TRUNC(ts, DAY, "Asia/Tokyo")
の場合には、時間が切り詰められた後タイムゾーンの処理が入り
タイムゾーンの文だけ時間が巻き戻されます。
正直直感的ではないので FORMAT_TIMESTAMP 等を活用するのが一番オススメです。
地理情報をBigQueryで扱う
BigQueryには地形情報を取り扱うための型や関数が存在しません。
そのためレコードをBigQueryのテーブルで用意する時には
-
POINT
型はSTRUCT(lat FLAOT64, lng FLOAT64)
へ変換,POLYGON
型はARRAY<STRUCT(lat FLAOT64, lng FLOAT64)>
へ変換する - 上記を取り扱う関数をUDF関数で用意する
のがおおよそのベストプラクティスかと思います。
#standard SQL
CREATE TEMP FUNCTION
degree2rad ( degree FLOAT64) AS ( ATAN(1.0) *4 * degree / 180 );
CREATE TEMP FUNCTION
distance_from_rad (
rlat1 FLOAT64
, rlng1 FLOAT64
, rlat2 FLOAT64,
, rlng2 FLOAT64
) AS (
6378.137 * ACOS(SIN(rlat1) * SIN(rlat2) + COS(rlat1) * COS(rlat2) * COS(rlng1 - rlng2))
);
CREATE TEMP FUNCTION
distance(
p1 STRUCT<lat FLOAT64, lng FLOAT64>,
p2 STRUCT<lat FLOAT64, lng FLOAT64>
) AS (
distance_from_rad(
degree2rad(p1.lat),
degree2rad(p1.lng),
degree2rad(p2.lat),
degree2rad(p2.lng)
)
);
WITH
base AS (
SELECT [
STRUCT<lat FLOAT64,lng FLOAT64>(32.345,178.3)
, STRUCT<lat FLOAT64, lng FLOAT64>(32.345, 178.2)
] AS p
)
SELECT
distance(p[OFFSET(0)], p[OFFSET(1)])
FROM
base
最後に
BigQueryのベストプラクティス集を書き出してみました。
一方で、UDF関数の外部保存がjsにしか対応していない、UDFはビューに保存できない
まだ痒いところに手が届かない、というところもありますが
定期的に様々な機能がリリースされています。
BigQueryは力押しできる分散処理環境を低価格で提供してくれるサービスのため
数十GB程度のデータを処理しないと...!ってなった時には
とりあえず BigQuery にあげてしまって処理しよう、ということもしばしばです。
本記事が何かの参考になり、良いBigQueryライフをお過ごしいただければ幸いです。