はじめに
本記事はBigQuery Advent Calendar 2021の18日目の記事です。
IoTやWebアプリのログ分析など、時系列データのETL処理周りで役立つスニペットを集めたものを記載しています。
時系列データ分析で手を焼いている方の一助になれば幸いです。
BigQuery Materialized View (略:MV
)
時系列データは単一の時系列であればそこまで難しくはないのですが、センサーの種類や商品などの項目を持つ複数時系列のテーブルを扱う際はどうしてもデータ容量(≒スキャン量)が多くなりがちなので、比較的重ためなETL処理を行うことがしばしばあります。
しかし、ETLパイプラインのメンテナンス性やリアルタイム性を考えると、あまり乱立させたくないと思われる方は多いのではないでしょうか。
そこで役に立つのがBigQueryの機能の1つ、Materialized Viewです。(以下、MV
と略します。)
超ざっくり&誤解を恐れずいえば、リアルタイムなETL処理をViewという形で実現した、実データを持つViewです。
例えば、
- 商品の売上を商品別・月別で集計する
- IoTのデータを予め分単位で丸めておく
- 不要なデータをフィルタしておく
などの比較的シンプルなGROUP BY
やWHERE
によって、レコード数を圧縮するようなETLをリアルタイムで実現できます。
ベースとなるテーブルのデータが更新された場合は、その差分だけが集計されるのでクエリ課金額は最小限に抑えられます。
これまでこれらの処理はViewで用意しておくことが一般的でしたが、そうすると毎回ベーステーブルのスキャンが入りますので、スキャン量が大きくなる傾向にありました。
MV
を用いると、既に圧縮されたテーブルに対してクエリを発行できるので、スキャン量の削減が可能になります。
コストやクエリ時間が見違えるほど改善されるので、積極的に使っていきたいですね。
時系列データ分析に役立つスニペット集
本題に入りまして、時系列データでよく使う処理をMV
で実現するためのスニペット集を紹介します。
前提となるテーブル
まずは前提となる時系列データのサンプルを以下に記します。
item_id
がAとBのときでそれぞれ10分粒度でデータが入っています。
timestamp | item_id | value | |
---|---|---|---|
0 | 2021-11-30 15:00:00+00:00 | A | 1 |
1 | 2021-11-30 15:10:00+00:00 | A | 2 |
2 | 2021-11-30 15:20:00+00:00 | A | 3 |
3 | 2021-11-30 15:30:00+00:00 | A | 4 |
4 | 2021-11-30 15:40:00+00:00 | A | 5 |
5 | 2021-11-30 15:50:00+00:00 | A | 6 |
6 | 2021-11-30 15:00:00+00:00 | B | 11 |
7 | 2021-11-30 15:10:00+00:00 | B | 12 |
8 | 2021-11-30 15:20:00+00:00 | B | 13 |
9 | 2021-11-30 15:30:00+00:00 | B | 14 |
10 | 2021-11-30 15:40:00+00:00 | B | 15 |
11 | 2021-11-30 15:50:00+00:00 | B | 16 |
timestamp
は時間列で、このテーブルのパーティション列として指定されています。
item_id
は時系列の名前に相当します。例えばセンサー名や商品名などです。これによって時系列が区別されています。
value
は集計対象の値です。
このテーブルがBigQueryのqiita_sample.timeseries
という場所にあるとします。
各時系列ごとに最新値を取得するMV
ダッシュボードなどを作る際に最新値を取得したいことがあると思います。
それぞれの時系列で最新値を取得するMV
を作成するSQLは以下のとおりです。
CREATE MATERIALIZED VIEW qiita_sample.mv_latest_items
AS
SELECT
item_id
, ARRAY_AGG(value IGNORE NULLS ORDER BY timestamp DESC LIMIT 1) as latest_value
, MAX(timestamp) as latest_timestamp
FROM qiita_sample.timeseries
GROUP BY item_id
上記のqiita_sample
は任意のデータセット名、mv_latest_items
は任意のMV
名を使用してください。
ポイント
最新値を取得するSQLは一般的には、
RANK() OVER(PARTITION BY item_id ORDER BY timestamp DESC) as row_no
などのようなランク関数を用いて、後にWHERE row_no=1
でフィルタをするという具合で実現しますが、
残念ながらMV
には分析関数が使用できないので、集計関数を用いた最新値取得を行います。
それが、上記のARRAY_AGG(value IGNORE NULLS ORDER BY timestamp DESC LIMIT 1)
部分です。
このMV
へのクエリ例
ARRAY_AGG
を用いているため、このMV
をクエリしてデータを取得する際はOFFSET
が必要です。
SELECT
item_id
, latest_value[OFFSET(0)] as latest_value
, latest_timestamp
FROM qiita_sample.mv_latest_items
上記のクエリは以下のような結果を返します。
item_id | latest_value | latest_timestamp | |
---|---|---|---|
0 | A | 6 | 2021-11-30 15:50:00+00:00 |
1 | B | 16 | 2021-11-30 15:50:00+00:00 |
ベーステーブルが各時系列ごとに5,6レコードありましたが、この最新値のみを残すMV
ではサイズが1/5前後となり、クエリ課金額や計算時間の大幅な圧縮が期待できます。
任意の時間粒度で時間を丸めるMV
(ダウンサンプリング)
時間別や日別などで予め時系列をダウンサンプリングしておきたいことがあると思います。
その際のMV
は以下の2パターンが考えられます。
-
TIMESTAMP_TRUNC
でサポートされている粒度で丸めたいとき (1時間単位、1日単位など) - 上記でサポートされていない細かい粒度で丸めたいとき (30分単位など)
TIMESTAMP_TRUNC
でサポートされている粒度で丸めたいとき
こちらは通常のViewで実現する場合と同様の方法で問題ないです。
以下の例は、1時間単位で丸めたいときのMV
を作成するSQLです。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_1h
PARTITION BY DATE(timestamp)
AS
SELECT
item_id
, TIMESTAMP_TRUNC(timestamp,HOUR) as timestamp
, AVG(value) as value_avg
FROM qiita_sample.timeseries
GROUP BY 1,2
上記によって作られたMV
は以下のようなテーブルになります。
item_id | timestamp | value_avg | |
---|---|---|---|
0 | A | 2021-11-30 15:00:00+00:00 | 3.5 |
1 | B | 2021-11-30 15:00:00+00:00 | 13.5 |
ポイント
MV
は通常のViewと異なり実データを持つため、パーティションの設定が可能です。
上記のSQLのように、PARTITION BY DATE(timestamp)
という形でパーティション列を設定しています。
ただし、このパーティション列はベーステーブルと同一のカラムでなければなりません。
TIMESTAMP_TRUNC
でサポートされていない粒度で丸めたいとき
例えば、30分単位で丸めたMV
を作りたいとします。
この場合、TIMESTAMP_TRUNC
は使用できないので別の方法で時間列timestamp
を丸めなければいけません。
まずは失敗例を説明します。
以下のSQLのように、timestmap
を TIMESTAMP_SECONDS(CAST(FLOOR(UNIX_SECONDS({丸めたい時間列})/{丸めたい秒数})*{丸めたい秒数} AS INT64))
によって
30分単位に丸めて、このときのtimestamp
をパーティション列として指定します。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_30m
PARTITION BY DATE(timestamp)
AS
SELECT
item_id
, TIMESTAMP_SECONDS(CAST(FLOOR(UNIX_SECONDS(timestamp)/1800)*1800 AS INT64)) as timestamp
, AVG(value) as value_avg
FROM qiita_sample.timeseries
GROUP BY 1,2
すると以下のようなエラーが出ると思います。
Partitioning column of the materialized view must either match partitioning column or pseudo-column of the base table, or be a TIMESTAMP_TRUNC over it.
どうやら、ベーステーブルで指定されたパーティション列をTIMESTAMP_TRUNC
以外の方法で修正してしまうと、MV
のパーティション列として指定できなくなってしまうようです。
そこで、少し気持ち悪いかもしれませんが以下のように「まず1時間粒度で丸めつつ、分の列を付与する」という方法を試してみます。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_30m
PARTITION BY DATE(timestamp)
AS
SELECT
item_id
, TIMESTAMP_TRUNC(timestamp,HOUR) as timestamp
, FLOOR(EXTRACT(MINUTE FROM timestamp) / 30) * 30 as min -- 分を30分単位に丸める
, AVG(value) as value_avg
FROM qiita_sample.timeseries
GROUP BY 1,2,3
このときのMV
は以下のようなテーブルになります。
item_id | timestamp | min | value_avg | |
---|---|---|---|---|
0 | A | 2021-11-30 15:00:00+00:00 | 0 | 2 |
1 | A | 2021-11-30 15:00:00+00:00 | 30 | 5 |
2 | B | 2021-11-30 15:00:00+00:00 | 0 | 12 |
3 | B | 2021-11-30 15:00:00+00:00 | 30 | 15 |
timestamp
は1時間粒度ですが、min
列を参照することで30分単位での時系列データの取得が可能となります。
クエリ例
手間ですが、あるべきtimestamp
の姿に戻したい場合は、このMV
を叩くクエリにて以下のようなSQLを記述してあげるとよいでしょう。
SELECT
item_id
, TIMESTAMP_ADD(timestamp,INTERVAL CAST(min AS INT64) MINUTE) as timestamp
, value_avg
FROM qiita_sample.mv_timeseries_30m
クエリ結果は以下のようになります。
item_id | timestamp | value_avg | |
---|---|---|---|
0 | A | 2021-11-30 15:00:00+00:00 | 2 |
1 | A | 2021-11-30 15:30:00+00:00 | 5 |
2 | B | 2021-11-30 15:00:00+00:00 | 12 |
3 | B | 2021-11-30 15:30:00+00:00 | 15 |
各時系列の値を各列に持つテーブルを取得するMV
(縦持ち->横持ち)
MV
ではPIVOTを使用できないので、地道に横持ち整形します。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_pivot
PARTITION BY DATE(timestamp)
AS
SELECT
timestamp
, AVG(IF(item_id="A",value,NULL)) as A
, AVG(IF(item_id="B",value,NULL)) as B
FROM qiita_sample.timeseries
GROUP BY timestamp
上記のMV
は以下のようなテーブルになります。
timestamp | A | B | |
---|---|---|---|
0 | 2021-11-30 15:00:00+00:00 | 1 | 11 |
1 | 2021-11-30 15:10:00+00:00 | 2 | 12 |
2 | 2021-11-30 15:20:00+00:00 | 3 | 13 |
3 | 2021-11-30 15:30:00+00:00 | 4 | 14 |
4 | 2021-11-30 15:40:00+00:00 | 5 | 15 |
5 | 2021-11-30 15:50:00+00:00 | 6 | 16 |
おわりに
IoTデータなどの時系列かつ大量のデータは油断するとクエリ量が増大しやすいため、上記のようなMV
の恩恵は大きいです。
やるとやらないとで10倍以上のコストダウン効果が見込めることもあるので、ご参考になれば幸いです。