この記事はアドベントカレンダー6日目の記事です。
はじめに
こんにちは、STORES株式会社データ本部所属のKです。
データ分析やビジネスインテリジェンスの領域では、データの品質と正確性は非常に重要ですよね。特にデータモデリングの過程で不要になったテーブルの削除が漏れてしまうと、鮮度の古いデータを用いることによる間違った意思決定を引き起こしてしまう危険性があります。
弊社では BigQuery をデータウェアハウスとして採用し、データモデリングにおいては dbt を積極的に活用しています。本記事では、シェルスクリプトを用いてデータモデリングの中で発生しうる削除漏れテーブル(dbt プロジェクト内では既に削除済みだが、BigQuery にはまだ残っているテーブル)を検知し、結果を Slack に通知する方法について紹介したいと思います。
最終的に出来上がったシェルスクリプト
# Google Cloud プロジェクトと BigQuery データセットのプレフィックスを引数から取得
PROJECT_NAME=$1
DATASET_PREFIX_NAME=$2 # "warehouse"や"mart"を想定
# dbt lsコマンドを用いて dbt のモデル一覧を取得
dbt_models=$(dbt ls --quit --output json --resource-type model --output-keys original_file_path alias --select models/${DATASET_PREFIX_NAME} | jq -c .)
# dbt のモデルを dataset_name.table_name の形式に変換して配列に格納
dbt_array=$(for dbt_model in $dbt_models; do
dataset=$(echo $dbt_model | jq -r '.original_file_path' | sed -E "s/models\/(.*)\/.+/\1/" | sed -e "s/\//_/g")
table=$(echo $dbt_model | jq -r '.alias')
echo "$dataset.$table"
done)
# bq ls コマンドを用いて BigQuery のデータセット一覧を取得
bq_datasets=$(bq ls -d=true --project_id=${PROJECT_NAME} | awk -v dataset_name=${DATASET_PREFIX_NAME} '$1 ~ "^" dataset_name {print $1}')
# BigQuery データセットごとに dataset_name.table_name 形式でテーブルを配列に格納
bq_array=$(for bq_dataset in $bq_datasets; do
bq ls --format=json ${PROJECT_NAME}:$bq_dataset | jq -r '.[].tableReference | .datasetId + "." + .tableId'
done)
IFS=$'\n'
# dbt にしかないモデルをリストアップ
both_models=(`{ echo "${dbt_array[*]}"; echo "${bq_array[*]}"; } | sort | uniq -d`)
only_models=(`{ echo "${bq_array[*]}"; echo "${both_models[*]}"; } | sort | uniq -u`)
if (( ${#only_models[@]} == 0 )); then
slack_text=$(cat <<EOF
*dbt では削除済みだが BigQuery にはまだ残っているテーブルはありませんでした*
検索条件:\`${PROJECT_NAME}.${DATASET_PREFIX_NAME}*\`
EOF
)
else
slack_text=$(cat <<EOF
*dbt では削除済みだが BigQuery にはまだ残っているテーブルを検知しました*
検索条件:\`${PROJECT_NAME}.${DATASET_PREFIX_NAME}*\`
\`\`\`
${only_models[*]}
\`\`\`
INFORMATION_SCHEMA 等を参考に対象のテーブルが参照されているかを調べ、参照されていない場合は削除してください。
参照されている場合は、利用者を特定し、データの再更新が必要かを確認してください。
EOF
)
fi
webhook_url={{ 通知したいSlackチャンネルのWebhook URL }}
curl -X POST -H 'Content-type: application/json' --data '{"text":"'"${slack_text}"'"}' ${webhook_url}
一部処理の説明
dbt ls
コマンドを用いて dbt のモデル一覧を取得
dbt_models=$(dbt ls --quiet --output json --resource-type model --output-keys original_file_path alias --select models/${DATASET_PREFIX_NAME} | jq -c .)
下記オプションを用いて json 形式で検知対象のモデルの一覧を取得し、jq
コマンドで整形した上でdbt_models
変数に格納しています。
オプション | 説明 |
---|---|
--quiet | dbt のバージョンやモデル数といった情報の出力せず、モデルの一覧のみを出力することができる。 |
--output | json や yaml など、出力形式を指定することができる。 |
--resource-type | model や snapshot など、特定のリソースに絞って出力することができる。 |
--output-keys | --output オプションで json を選んだ場合のみ、出力するキーを絞り込むことができる。 |
--select | 出力範囲を指定することができる。 |
弊社の dbt プロジェクトは下記のような階層構造になっているため、--select models/${DATASET_NAME}
としています。
ここは実行する dbt プロジェクトの階層構造に合わせて適宜変更してください。
models
├── warehouse
└── mart
└──ec
└── mrt_ec__stores.sql
dbt ls
コマンドの詳細については、公式リファレンスをご参照ください。
dbt のモデルをdataset_name.table_name
の形式に変換して配列に格納
dbt_array=$(for dbt_model in $dbt_models; do
dataset=$(echo $dbt_model | jq -r '.original_file_path' | sed -E "s/models\/(.*)\/.+/\1/" | sed -e "s/\//_/g")
table=$(echo $dbt_model | jq -r '.alias')
echo "$dataset.$table"
done)
弊社ではgenerate_schema_name
マクロをオーバーライドすることで、models
ディレクトリ配下のディレクトリ名をもとにテーブル生成先のデータセットを指定するようにしています。
先程の階層構造の例でいくと、models/mart/ec
ディレクトリ配下のモデルは、mart_ec
というデータセットに生成されます。
また、dbt の制約上models
ディレクトリ配下のモデル名はユニークである必要があるため、それを回避するためにmrt_ec__stores.sql
のように__
の前にデータセット名の略称、後ろにテーブル名を入れるようにしています。
テーブルに関しても、generate_alias_name
マクロをオーバーライドすることで、モデル名の__
の後ろの文字列をテーブル名として BigQuery にテーブルを生成するようにしています。
上記仕様に合わせて、original_file_path
からデータセット名、alias
からテーブル名を抽出し、dataset_name.table_name
の形式でモデルの一覧をdbt_array
変数に格納しています。
こちらも dbt プロジェクト内でのモデルの管理方法に依存するため、必要に応じて適宜変更してください。
bq ls
コマンドを用いてBigQueryのデータセット一覧を取得
bq_datasets=$(bq ls -d=true --project_id=${PROJECT_NAME} | awk -v dataset_name=${DATASET_PREFIX_NAME} '$1 ~ "^" dataset_name {print $1}')
下記オプションを用いて、指定したプレフィックスを含むデータセットの一覧を抽出し、bq_datasets
変数に格納しています。
オプション | 説明 |
---|---|
-d | データセットのみを一覧で出力することができる。 |
--project_id | 出力元の Google Cloud プロジェクトを指定することができる。 |
なお、bq ls
コマンドはデフォルトでは最大で50個しかデータセットを出力することができません。もし BigQuery 上に51個以上データセットが存在する場合は、-n
オプションを指定することで最大1000個まで上限を引き上げることも可能です。
bq ls
コマンドの詳細については、公式リファレンスをご参照ください。
データセットごとにdataset_name.table_name
形式でテーブルを配列に格納
bq_array=$(for bq_dataset in $bq_datasets; do
bq ls --format=json ${PROJECT_NAME}:$bq_dataset | jq -r '.[].tableReference | .datasetId + "." + .tableId'
done)
先程抽出した検知対象のデータセット内に含まれるテーブルの一覧をbq ls
コマンドを用いて json 形式で抽出し、jq
コマンドで整形した上で、dataset_name.table_name
の形式でbq_array
変数に格納しています。
なお、こちらもデフォルトは最大50個となっているため、テーブルの数に応じて-n
オプションで上限を上げると良いでしょう。
ここまでの処理で、dbt プロジェクトにあるモデルと BigQuery にあるテーブルの一覧をそれぞれ抽出することができました。
残りの処理としては、配列の比較をして差分を計算し、その結果を Slack に通知させているだけなので割愛させていただきます。
実行結果
今回実装したシェルスクリプトを動かすと、下記のような結果が Slack に通知されます。
工夫した点としては、こういった通知は結果を見て終わってしまう傾向が強いため、結果と一緒にネクストアクションを促すようなコメントを記載するようにしました。
おわりに
今回の記事では、dbt ls
コマンドを用いて削除漏れテーブルを検知するためのシェルスクリプトを紹介させていただきました。
元々は「削除漏れテーブルを調べたい」という動機のもと実装を進めていたのですが、結果的に「まだ dbt でモデル化できていないテーブル」を調べることもでき、今後も汎用性高く利用できそうなイメージを持てました。
データ品質の維持は継続的な課題です。まずは紹介した処理を実行して現状を把握しつつ、GitHub Actions や Argo Workflows といったワークフローに乗せて定期実行していくと、データマネジメントの観点でもデータ品質の維持に繋がりとても良さそうですね。
本記事の内容が少しでも役に立てば幸いです。ここまで読んでくださりありがとうございました!