はじめに
BigQueryにデータを取り込む際に取得時刻を"レコードの中"にカラムとして入れておき、さらに中間テーブルにもそれを引き継がせておくことで、**「これはいつ時点のデータ(が反映されてるの)?」**という問い答えやすくなるというお話です。
データ分析で頻出する 「データの鮮度」 問題
BigQueryに集めた色々なデータを組み合わせて分析しているとき、よくぶつかるのが「これはいつ時点のデータが反映されてるの?」という疑問です。
また「あれ、もしかして更新されてないんじゃない?」だったり「なんか過去のデータ変わってない?」みたいなケースもあります。
BigQueryのWebコンソールからも最終更新時刻は確認できますが、それはあくまでそのテーブルの最終更新時刻であって**「データの中身の鮮度」**とは微妙にニュアンスが異なります。
また、パーティション化された日次取り込みのログテーブルでは、知りたいのはテーブル全体の最終更新時刻ではなく、パーティションごとの更新情報です。
これらの課題を解決するために、BigQueryに取り込むデータについては、必ずそのデータの取得時刻(または送信時刻)をカラムとして追加してから転送してもらうようにしました。
データの鮮度情報をレコード内に入れておくメリット
BigQueryのメタデータに依存しないため、送信元となる外部システムの中で意味のある情報を挿入することができます。何らかの処理が完了した時刻がほしければ、それを入れてもらうこともできます。
また、SQLクエリで確認できるので以下のようなメリットもあります。
- SQLクエリで分析作業をしている環境を離れずに確認できる
- APIやgcloudコマンドでテーブルのメタ情報を一覧で抜き出すための構文を調べたり出力結果を見やすく加工したりする手間がかからない
- なんならBIツールのレポートのひとつとして可視化も簡単にできる
実装例
日付パーティション化されているテーブルに「dwh_imported_at」という名前で取得時刻カラムを追加した場合、以下のようなクエリで簡単に状況を確認することができます。
SELECT
DATE(_PARTITIONTIME) AS p,
DATETIME(dwh_imported_at, 'Asia/Tokyo') AS dwh_imported_at, --BigQueryのtimestamp列はUTCなので見やすいようにJSTにしておく
COUNT(*) AS cnt
FROM
schema_name.table_name
GROUP BY
1, 2
ORDER BY
p, dwh_imported_at
結果例
Case 1: 特定の日だけ取込時刻がずれている(日次取得のパーティションテーブル)
+------------+---------------------+-------+
| p | dwh_imported_at | cnt |
+------------+---------------------+-------+
| 2019-01-01 | 2019-01-02T01:30:11 | 13335 |
| 2019-01-02 | 2019-01-03T01:31:05 | 12387 |
| 2019-01-03 | 2019-01-04T04:53:49 | 12826 | <--!?
+------------+---------------------+-------+
いつもは翌日のAM1:30前後に取り込まれているのに、1月3日だけAM4:53になっている。
Case 2: パーティションごとに取込時刻は一意なはずなのに、複数存在している
+------------+---------------------+-------+
| p | dwh_imported_at | cnt |
+------------+---------------------+-------+
| 2019-01-01 | 2019-01-02T01:30:11 | 13335 |
| 2019-01-02 | 2019-01-03T01:31:05 | 12387 |
| 2019-01-02 | 2019-01-03T01:43:11 | 7635 | <--!?
| 2019-01-03 | 2019-01-04T01:29:49 | 12826 |
+------------+---------------------+-------+
2019-01-02のパーティションに dwh_imported_at
が2パターン存在している。2回に分けて取り込まれた?それにしては合計の件数が明らかにおかしい。。。
Case 3: 過去日のデータを取り込み直した
+------------+---------------------+-------+
| p | dwh_imported_at | cnt |
+------------+---------------------+-------+
| 2019-01-01 | 2019-01-02T01:30:11 | 13335 |
| 2019-01-02 | 2019-02-12T13:03:11 | 11631 | <--!?
| 2019-01-03 | 2019-01-04T01:29:49 | 12826 |
+------------+---------------------+-------+
2019-01-02のパーティションのデータが2月12日に上書きされてしまった模様。時間も昼過ぎなので夜間バッチではなく誰かが作業した?
このようにカラムの名前(この場合dwh_imported_at
)さえ統一して覚えておけば、分析用のSQLを書いているそのままの流れでデータの鮮度をすぐに確認できて便利ですね!
注意点
「中間テーブルでも元の値を引き継ぐ」
- 知りたいことは各テーブルが更新された時刻ではなく、その中に入っているデータの鮮度情報なので、中間テーブルを更新するたびに値を書き換えないように注意します。
- もちろん、中間テーブルの更新時刻に意味がある場合は、そのためのカラムを別途追加してもよさそうですね。
「1回の取り込みごとに時刻を一意にする」
- データの送信や時刻の挿入をアプリケーション側で実装してもらう場合、各レコードごとに時刻を生成してしまわないように注意します。(処理の開始時点で1つの時刻を生成しておき、それを全てのレコードに付与してもらいます)
- 1回の取り込みデータの中でレコードごとに微妙に異なるタイムスタンプが付与されているのは、かえって使いにくくなります。
- 他のチームに実装をお願いする場合は、そういった意図は明示的に伝える必要があるので注意が必要です。
embulkで取り込んでいる場合
カラムを追加することができるembulk-filter-columnという便利なプラグインがあるのでそれを使うと簡単です。
# Gemfile
gem 'embulk-filter-column'
embulkの定義ファイルは使い回しがしやすいよう、別ファイルに定義しておき、個々の定義ファイルからincludeするようにします。
# common/filter_column.yml.liquid
filters:
- type: column
default_timezone: "Asia/Tokyo"
add_columns:
- name: dwh_imported_at
type: timestamp
default: '{{ "now" | date: "%Y-%m-%d %H:%M:%S" }}'
format: "%Y-%m-%d %H:%M:%S"
# config.yml.liquid
{% include 'common/filter_common' %}
さいごに
DWHは様々な外部システムからあらゆるデータを受け入れる必要があり、想定外のデータが想定外の形やタイミングで送られてくることも珍しくありません。
「本来変わるはずのない過去のデータが変わっている気がする」などの状況でも、今回のような仕組みがあれば、少なくとも何か想定外のことが起こったか起こってないかくらいは、一定の自信を持って素早く確認することができます。
まだまだ細かな部分は改善の余地がありそうなので、よいアイデアをお持ちの方は教えていただけるとうれしいです。