はじめに
本記事はBigQuery Advent Calendar 2021の18日目の記事です。
IoTやWebアプリのログ分析など、時系列データのETL処理周りで役立つスニペットを集めたものを記載しています。
時系列データ分析で手を焼いている方の一助になれば幸いです。
BigQuery Materialized View (略:MV)
時系列データは単一の時系列であればそこまで難しくはないのですが、センサーの種類や商品などの項目を持つ複数時系列のテーブルを扱う際はどうしてもデータ容量(≒スキャン量)が多くなりがちなので、比較的重ためなETL処理を行うことがしばしばあります。
しかし、ETLパイプラインのメンテナンス性やリアルタイム性を考えると、あまり乱立させたくないと思われる方は多いのではないでしょうか。
そこで役に立つのがBigQueryの機能の1つ、Materialized Viewです。(以下、MVと略します。)
超ざっくり&誤解を恐れずいえば、リアルタイムなETL処理をViewという形で実現した、実データを持つViewです。
例えば、
- 商品の売上を商品別・月別で集計する
- IoTのデータを予め分単位で丸めておく
- 不要なデータをフィルタしておく
などの比較的シンプルなGROUP BYやWHEREによって、レコード数を圧縮するようなETLをリアルタイムで実現できます。
ベースとなるテーブルのデータが更新された場合は、その差分だけが集計されるのでクエリ課金額は最小限に抑えられます。
これまでこれらの処理はViewで用意しておくことが一般的でしたが、そうすると毎回ベーステーブルのスキャンが入りますので、スキャン量が大きくなる傾向にありました。
MVを用いると、既に圧縮されたテーブルに対してクエリを発行できるので、スキャン量の削減が可能になります。
コストやクエリ時間が見違えるほど改善されるので、積極的に使っていきたいですね。
時系列データ分析に役立つスニペット集
本題に入りまして、時系列データでよく使う処理をMVで実現するためのスニペット集を紹介します。
前提となるテーブル
まずは前提となる時系列データのサンプルを以下に記します。
item_idがAとBのときでそれぞれ10分粒度でデータが入っています。
| timestamp | item_id | value | |
|---|---|---|---|
| 0 | 2021-11-30 15:00:00+00:00 | A | 1 |
| 1 | 2021-11-30 15:10:00+00:00 | A | 2 |
| 2 | 2021-11-30 15:20:00+00:00 | A | 3 |
| 3 | 2021-11-30 15:30:00+00:00 | A | 4 |
| 4 | 2021-11-30 15:40:00+00:00 | A | 5 |
| 5 | 2021-11-30 15:50:00+00:00 | A | 6 |
| 6 | 2021-11-30 15:00:00+00:00 | B | 11 |
| 7 | 2021-11-30 15:10:00+00:00 | B | 12 |
| 8 | 2021-11-30 15:20:00+00:00 | B | 13 |
| 9 | 2021-11-30 15:30:00+00:00 | B | 14 |
| 10 | 2021-11-30 15:40:00+00:00 | B | 15 |
| 11 | 2021-11-30 15:50:00+00:00 | B | 16 |
timestampは時間列で、このテーブルのパーティション列として指定されています。
item_idは時系列の名前に相当します。例えばセンサー名や商品名などです。これによって時系列が区別されています。
valueは集計対象の値です。
このテーブルがBigQueryのqiita_sample.timeseriesという場所にあるとします。
各時系列ごとに最新値を取得するMV
ダッシュボードなどを作る際に最新値を取得したいことがあると思います。
それぞれの時系列で最新値を取得するMVを作成するSQLは以下のとおりです。
CREATE MATERIALIZED VIEW qiita_sample.mv_latest_items
AS
SELECT
item_id
, ARRAY_AGG(value IGNORE NULLS ORDER BY timestamp DESC LIMIT 1) as latest_value
, MAX(timestamp) as latest_timestamp
FROM qiita_sample.timeseries
GROUP BY item_id
上記のqiita_sampleは任意のデータセット名、mv_latest_itemsは任意のMV名を使用してください。
ポイント
最新値を取得するSQLは一般的には、
RANK() OVER(PARTITION BY item_id ORDER BY timestamp DESC) as row_no
などのようなランク関数を用いて、後にWHERE row_no=1でフィルタをするという具合で実現しますが、
残念ながらMVには分析関数が使用できないので、集計関数を用いた最新値取得を行います。
それが、上記のARRAY_AGG(value IGNORE NULLS ORDER BY timestamp DESC LIMIT 1)部分です。
このMVへのクエリ例
ARRAY_AGGを用いているため、このMVをクエリしてデータを取得する際はOFFSETが必要です。
SELECT
item_id
, latest_value[OFFSET(0)] as latest_value
, latest_timestamp
FROM qiita_sample.mv_latest_items
上記のクエリは以下のような結果を返します。
| item_id | latest_value | latest_timestamp | |
|---|---|---|---|
| 0 | A | 6 | 2021-11-30 15:50:00+00:00 |
| 1 | B | 16 | 2021-11-30 15:50:00+00:00 |
ベーステーブルが各時系列ごとに5,6レコードありましたが、この最新値のみを残すMVではサイズが1/5前後となり、クエリ課金額や計算時間の大幅な圧縮が期待できます。
任意の時間粒度で時間を丸めるMV (ダウンサンプリング)
時間別や日別などで予め時系列をダウンサンプリングしておきたいことがあると思います。
その際のMVは以下の2パターンが考えられます。
-
TIMESTAMP_TRUNCでサポートされている粒度で丸めたいとき (1時間単位、1日単位など) - 上記でサポートされていない細かい粒度で丸めたいとき (30分単位など)
TIMESTAMP_TRUNCでサポートされている粒度で丸めたいとき
こちらは通常のViewで実現する場合と同様の方法で問題ないです。
以下の例は、1時間単位で丸めたいときのMVを作成するSQLです。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_1h
PARTITION BY DATE(timestamp)
AS
SELECT
item_id
, TIMESTAMP_TRUNC(timestamp,HOUR) as timestamp
, AVG(value) as value_avg
FROM qiita_sample.timeseries
GROUP BY 1,2
上記によって作られたMVは以下のようなテーブルになります。
| item_id | timestamp | value_avg | |
|---|---|---|---|
| 0 | A | 2021-11-30 15:00:00+00:00 | 3.5 |
| 1 | B | 2021-11-30 15:00:00+00:00 | 13.5 |
ポイント
MVは通常のViewと異なり実データを持つため、パーティションの設定が可能です。
上記のSQLのように、PARTITION BY DATE(timestamp)という形でパーティション列を設定しています。
ただし、このパーティション列はベーステーブルと同一のカラムでなければなりません。
TIMESTAMP_TRUNCでサポートされていない粒度で丸めたいとき
例えば、30分単位で丸めたMVを作りたいとします。
この場合、TIMESTAMP_TRUNCは使用できないので別の方法で時間列timestampを丸めなければいけません。
まずは失敗例を説明します。
以下のSQLのように、timestmapを TIMESTAMP_SECONDS(CAST(FLOOR(UNIX_SECONDS({丸めたい時間列})/{丸めたい秒数})*{丸めたい秒数} AS INT64))によって
30分単位に丸めて、このときのtimestampをパーティション列として指定します。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_30m
PARTITION BY DATE(timestamp)
AS
SELECT
item_id
, TIMESTAMP_SECONDS(CAST(FLOOR(UNIX_SECONDS(timestamp)/1800)*1800 AS INT64)) as timestamp
, AVG(value) as value_avg
FROM qiita_sample.timeseries
GROUP BY 1,2
すると以下のようなエラーが出ると思います。
Partitioning column of the materialized view must either match partitioning column or pseudo-column of the base table, or be a TIMESTAMP_TRUNC over it.
どうやら、ベーステーブルで指定されたパーティション列をTIMESTAMP_TRUNC以外の方法で修正してしまうと、MVのパーティション列として指定できなくなってしまうようです。
そこで、少し気持ち悪いかもしれませんが以下のように「まず1時間粒度で丸めつつ、分の列を付与する」という方法を試してみます。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_30m
PARTITION BY DATE(timestamp)
AS
SELECT
item_id
, TIMESTAMP_TRUNC(timestamp,HOUR) as timestamp
, FLOOR(EXTRACT(MINUTE FROM timestamp) / 30) * 30 as min -- 分を30分単位に丸める
, AVG(value) as value_avg
FROM qiita_sample.timeseries
GROUP BY 1,2,3
このときのMVは以下のようなテーブルになります。
| item_id | timestamp | min | value_avg | |
|---|---|---|---|---|
| 0 | A | 2021-11-30 15:00:00+00:00 | 0 | 2 |
| 1 | A | 2021-11-30 15:00:00+00:00 | 30 | 5 |
| 2 | B | 2021-11-30 15:00:00+00:00 | 0 | 12 |
| 3 | B | 2021-11-30 15:00:00+00:00 | 30 | 15 |
timestampは1時間粒度ですが、min列を参照することで30分単位での時系列データの取得が可能となります。
クエリ例
手間ですが、あるべきtimestampの姿に戻したい場合は、このMVを叩くクエリにて以下のようなSQLを記述してあげるとよいでしょう。
SELECT
item_id
, TIMESTAMP_ADD(timestamp,INTERVAL CAST(min AS INT64) MINUTE) as timestamp
, value_avg
FROM qiita_sample.mv_timeseries_30m
クエリ結果は以下のようになります。
| item_id | timestamp | value_avg | |
|---|---|---|---|
| 0 | A | 2021-11-30 15:00:00+00:00 | 2 |
| 1 | A | 2021-11-30 15:30:00+00:00 | 5 |
| 2 | B | 2021-11-30 15:00:00+00:00 | 12 |
| 3 | B | 2021-11-30 15:30:00+00:00 | 15 |
各時系列の値を各列に持つテーブルを取得するMV (縦持ち->横持ち)
MVではPIVOTを使用できないので、地道に横持ち整形します。
CREATE MATERIALIZED VIEW qiita_sample.mv_timeseries_pivot
PARTITION BY DATE(timestamp)
AS
SELECT
timestamp
, AVG(IF(item_id="A",value,NULL)) as A
, AVG(IF(item_id="B",value,NULL)) as B
FROM qiita_sample.timeseries
GROUP BY timestamp
上記のMVは以下のようなテーブルになります。
| timestamp | A | B | |
|---|---|---|---|
| 0 | 2021-11-30 15:00:00+00:00 | 1 | 11 |
| 1 | 2021-11-30 15:10:00+00:00 | 2 | 12 |
| 2 | 2021-11-30 15:20:00+00:00 | 3 | 13 |
| 3 | 2021-11-30 15:30:00+00:00 | 4 | 14 |
| 4 | 2021-11-30 15:40:00+00:00 | 5 | 15 |
| 5 | 2021-11-30 15:50:00+00:00 | 6 | 16 |
おわりに
IoTデータなどの時系列かつ大量のデータは油断するとクエリ量が増大しやすいため、上記のようなMVの恩恵は大きいです。
やるとやらないとで10倍以上のコストダウン効果が見込めることもあるので、ご参考になれば幸いです。