はじめに
タイミーの石井です。
弊社のデータ基盤のTransformationの部分ではdbtを使っていますが、モデルの数もそれなりに多く(検証用含めると1000を超えるくらいあります)実際にデータがどんな状態にあるのかの俯瞰がかなりしにくい状態で、問題が起こったときの原因追求もしにくいという課題がありました。
そういった状態のところに先日elementaryを導入しまして、その結果について説明しようと思います。
構成
弊チームでは技術的負債の解消やテーマを決めた自由研究などを平常時にはある程度の時間取り組んでいるのですが、その枠を借りてelementaryを試してみることにしました。
本格導入前にどのくらいやれそうか、実装コストがどの程度かの判断を行ったのですが
- テストの状況の可視化がしやすく、しっかりテストを導入していけばデータの問題把握に使っていけそう
- elementaryテストでアノマリ系のテストの実装もしやすく、可視化も付いているの使い勝手良さそう
- oss版であれば追加コストはほぼないし、実装のエンジニアコストは軽そう
という結論になりまして、導入を行っていくことになりました。
導入に関しても開始時はこちらにあるように
- dbt packageの導入 + それに付帯する作業
- ローカル環境用のpython package情報(poetry)の用意
- 全てのJobから情報を収集するために昔のまま
dbt run
になっていたJobをdbt build
に書き換えた
程度で済んでいます。
この状態でローカルから edr report
コマンドを打つことできれいなレポートが生成できます
(導入したのは2ヶ月ほど前だと思うのですが、かなりリッチになったので開発も早いなぁ...と感じます)
細かいところは公式ページを見て頂ければと思いますが、このようなレポートが簡単に作れるのは大変便利だなと思ってます。
導入してよかったところと行った改善
よかったこと: テストのカバレッジ把握が行えたこと
それなりに気をつけてテストは導入してきたつもりだったのですが、そのカバレッジに少しがっかりしました。
(もちろん、テスト用のモデルもたくさんあるのですが50%くらいはあるかなという肌感でした)
ただ、これもあり事実に基づいた認識がチームで持てたのは良かったと思います。
よかったこと: warningの活用が進んだ
パイプラインを止めるほどではないけど、データの中身によっては通知を出しておきたいようなケースは多いように思います。
そういうケースではdbtテストの設定の中で severity を warningにすれば良いとは思うのですが、そうすると 「誰の目にも止まらず流れる」 という致命的な弱点を孕んでいると思います。
実際、我々もリアルタイム系のデータ品質の確認のため、とあるサービスを使ってニアリアルタイムな転送を行ったLakeの各テーブルのrelationshipがどのくらい壊れているのかをmonitoringしたいと思ったときにwarningの設定を行ったのですが、個々のタイミングでは見ても実際どのくらいの頻度かわかりにくいし、最初のうちはちゃんと見ていてもすぐに見るのを忘れる(これは個人の問題かもしれません)という状態に陥りました。
実際、elementary導入後はあくまでJobが流れたあとではあるものの「思ったより壊れがちである」ということが可視化できるようになったと思います。
こういうことが可視化できることにより、利用に当たっての注意喚起にも繋げていけるので非常に有用だと思います。
よかったこと: elementaryのアノマリテストが便利
テスト自体はよくあるデータ量やスキーマの型の変化等なのですが、それと可視化がセットになっているのはわかりやすさ的に非常に良かったと思います。
ちょうどとあるイベントテーブルで 私がやらかして 全件転送できずにLimitがかかってしまった事件がありそのテーブルにある程度それっぽい件数でなかったらアラートが鳴るように設定を入れたのですが、実際の件数(緑線)に対して、上限と下限もあわせて表示してくれるので、アラートが鳴ったあとにいちいちクエリを回したりせずともぱっと見で状態がわかりやすいかなと思います。
よかったこと: テスト結果の通知が何かと便利
elementaryではテストの結果によって通知を送ることができるのですが、例えばデータを利用したアプリケーションで15分以内に更新がなかった場合に特定の人の確認が必要なケースであれば、以下の様にテストを設定して
models:
- name: hogehoge
meta:
owner: ["@slackの名前"]
slack_group_alerts_by: "alert"
description: "hogehoge"
tests:
- dbt_expectations.expect_row_values_to_have_recent_data:
column_name: TIMESTAMP_MILLIS(timestamp)
datepart: minute
interval: 15
tags: ["TARGET_TAG"]
meta:
owner: ["@slackの名前"]
description: "data is not up-to-date."
次のようなコマンドを打つことで特定のテストで問題があった場合のみ通知を行うようなこともできます。
edr monitor --slack-token $SLACK_BOT_TOKEN --slack-channel-name $SLACK_CHANNEL_ID --select tag:TARGET_TAG
ただ、実装のタイミングだと少しバギーなところがあり、modelのmeta.descriptionが書かれていないと tests.meta.descriptionが反映されないなどいくらかの問題があったことは添えておきます。
改善したこと: レポートの生成を毎日自動化した
導入時点では上述の通り、ローカルで edr report
していたのですが、一回実行するにも数分かかりなかなか面倒でした。
そこでGithub pagesへの出力を検討したのですが、現在のプランだとprivateに扱えなかったためGHAで edr report
したものをSlackに転送するようにしました。
本当は edr send-report
コマンドを使えばよいのですが、私が試したタイミングでは何度試しても外部へ送信できず諦めたという背景があるため、実際はもっとスマートに書けると思うのですが、以下のようにしています。
(導入時の動作検証では動いていたので、バグか環境起因の何らかの問題があるのかと思われます)
name: generate edr report
on:
workflow_dispatch:
schedule:
- cron: "0 0 * * *" # JST 9:00
jobs:
main:
env:
PYTHON_VERSION: 3.11
runs-on: ubuntu-latest
permissions:
contents: "read"
id-token: "write"
steps:
- uses: actions/checkout@v3
- name: setup python
uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Set Up Auth
uses: "google-github-actions/auth@v1"
with:
token_format: "access_token"
workload_identity_provider: "projects/hogehoge/locations/global/workloadIdentityPools/hogehoge/providers/hogehoge"
service_account: "hogehoge@hogehoge.iam.gserviceaccount.com"
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v1
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install dbt-core dbt-bigquery elementary-data
- name: generate report
run: |
# 本来以下コマンドでslackへの連携ができるはずだが、ローカルでもGHA上でも動かないため
# 一旦reportコマンドで生成し、そのファイルを送信することにする
# edr send-report --slack-token ${ELEMENTARY_SLACK_BOT_TOKEN} --slack-channel-name ${ELEMENTARY_NOTIFICATION_CHANNEL}
edr report
- name: Upload report file to Slack
env:
SLACK_API_TOKEN: ${{ secrets.ELEMENTARY_SLACK_BOT_TOKEN }}
run: |
curl -F file=@./edr_target/elementary_report.html -F channels=${{ vars.ELEMENTARY_NOTIFICATION_CHANNEL }} -F token=$SLACK_API_TOKEN https://slack.com/api/files.upload
- name: Notify Failure
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
fields: repo,message,commit,author,action,job,took,eventName,ref,workflow
mention: "subteam^SXXXXXXXXX"
if_mention: failure
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_GITHUB_FAILURE_WEBHOOK }}
if: failure()
これにより以下のような形で毎日レポートが配信されています。
今後の展望
(以下、個人の考えです)
まずは必要なテストをちゃんと実装することで障害対応を早める、というところをやっていきたいなというふうに思ってます。
出さなかったですが、テストの結果をリネージュで追える画面が存在しており、適切なテストを各層で実装していくとどこからエラーが起きているのか把握しやすい状態にすることができると思っています。
そのためにはそもそもチームとして何に対してテストを書いてという基準であったり、そもそもデータに関するテスト文化を根付かせていくみたいなことも必要かなと思っています。
また、我々のチームではdbt外のテストで例えば更新頻度監視、完全性監視なども実施しているのですが、そのあたりも可能な範囲この中に載せていけないかなぁと思っています。
できるだけ1つの仕組みの中に収めていくことにより、データオブザーバビリティの高い状態にしていく、ということができていければ良いかなと思います。
終わりに
我々のチームはデータ基盤にまつわる色々なことをやっていまして、気づくとやる領域が増えていくような大変エキサイティングな組織になっています。
本当に面白いのですが、やりたいこと・やるべきことに対して全く人が足りないのでご興味ある方はぜひお話できると嬉しいです!