目的
dbt-coreとdbt-athena-communityを使い、Amazon Athenaへデータモデルを構築しています。
運用していくうえで、
- Athenaにあるテーブルがdbt側で管理されていない
- dbt側にある定義のテーブルがAthenaにない
この2つについて、管理し定期的にメンテナンスがしたくなりました。
そこで、dbtとAthenaいずれかで管理されていないテーブル一覧を出力するスクリプトを作りました。
コード
先にコードの全容と、実行結果です。
#!/bin/bash
dbt_content="name,resource_type,schema\n"
glue_content="name,resource_type,schema\n"
dbt_data="dbt_output.csv"
glue_data="glue_output.csv"
# sourcesのテーブル群を出力
# jsonを配列に格納
# source_nameをschemaに変換
# name,resource_type,source_nameだけに絞る→最初の配列のキーを取得→カンマで区切ってそれ以降はname,resource_type,source_nameの値を取得→@csvでcsv出力
dbt_source_data=$(poetry run dbt list --resource-types source --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')
# # modelのテーブル群を出力
dbt_model_data=$(poetry run dbt list --resource-types model --output json --output-keys name resource_type schema --log-level warn --target prod \
| jq -s '.' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')
for data in "$dbt_source_data" "$dbt_model_data"; do
dbt_content+=$(echo "$data" | sed 1d)
dbt_content+="\n"
done
echo -e "$dbt_content" > $dbt_data
# AWS Glueのテーブル群を出力
glue_datababases=(
"database_a"
"database_b"
)
get_glue_tables() {
local database_name=$1
aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name, DatabaseName: DatabaseName}' --profile mds-sts \
| jq '[.[] | .["name"] = .Name | .["schema"] = .DatabaseName | .["resource_type"] = "glue_catalog" | del(.Name, .DatabaseName)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
}
for db in "${glue_datababases[@]}"; do
result=$(get_glue_tables "$db")
glue_content+=$(echo "$result" | sed 1d)
glue_content+="\n"
done
echo -e "$glue_content" > $glue_data
# それぞれUNIONした後にawsで重複を削除
combined_content="${dbt_content}$(echo -e "$glue_content" | sed 1d)"
# 完全に重複がないもののみ出力する
awk -F, 'NR==FNR{a[$1,$3]++; next} a[$1,$3]==1' <(echo -e "$combined_content") <(echo -e "$combined_content") > unmanaged_tables.csv
$ bash export_table_catalogs.sh
上記ファイルを実行すると、以下3つのcsvファイルが出力されます。
name,resource_type,schema
"table_a","source","database_a"
"table_b","source","database_b"
"table_c","model","database_b"
dbt_output.csv
はdbtで管理しているテーブルのリストです。
name,resource_type,schema
"table_a","glue_catalog","database_a"
"table_b","glue_catalog","database_b"
"table_c","glue_catalog","database_b"
"test","glue_catalog","database_a"
glue_output.csv
はglue catalog(Athena)で管理しているテーブルのリストです
name,resource_type,schema
"test","glue_catalog","database_a"
unmanaged_tables.csv
はdbt_output.csv
,glue_output.csv
の両方で存在しなかったテーブルのリストです。
これらのファイルより
- dbtでは3つのテーブルを管理
- Athenaには4つのテーブルがある
- Athenaのdatabase_aにあるtestテーブルが双方で管理されていない
ということが分かります。
コード詳細
dbt 管理テーブル出力
# sourcesのテーブル群を出力
# jsonを配列に格納
# source_nameをschemaに変換
# name,resource_type,source_nameだけに絞る→最初の配列のキーを取得→カンマで区切ってそれ以降はname,resource_type,source_nameの値を取得→@csvでcsv出力
dbt_source_data=$(poetry run dbt list --resource-types source --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')
# # modelのテーブル群を出力
dbt_model_data=$(poetry run dbt list --resource-types model --output json --output-keys name resource_type schema --log-level warn --target prod \
| jq -s '.' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')
for data in "$dbt_source_data" "$dbt_model_data"; do
dbt_content+=$(echo "$data" | sed 1d)
dbt_content+="\n"
done
echo -e "$dbt_content" > $dbt_data
こちらのコードについての説明です。
dbt list
dbt listは、dbtで管理しているリソースの一覧を表示するコマンドです。
--resource-types source --output json --output-keys name resource_type source_name --log-level error
--resource-types source
と--output-keys
,--log-level error
を指定して、jsonを書き出しています。
--log-level error
はこれを外すとコマンド実行時の最初のログがでてしまい、jqでjsonとして正しくパースできないのでつけています。
--resource-types source
はdbt lsで出力するresource-typeを指定しています。
--resource-types, --resource-type [metric|semantic_model|source|analysis|model|test|exposure|snapshot|seed|default|all]
Restricts the types of resources that dbt
will include
$ poetry run dbt list --resource-types source --output json --log-level error --target prod
このコマンドを実行すると
{
"name": "table_a",
"resource_type": "source",
"package_name": "prod",
"original_file_path": "aws/athena/models/source.yml",
"unique_id": "source.prod.database_a.table_a",
"source_name": "database_a",
"tags": [],
"config": {
"enabled": true
}
}
{
"name": "table_b",
"resource_type": "source",
"package_name": "prod",
"original_file_path": "aws/athena/models/source.yml",
"unique_id": "source.prod.database_b.table_b",
"source_name": "database_b",
"tags": [],
"config": {
"enabled": true
}
}
のように出力されます。そこから--output-keys name resource_type source_name
を指定して、必要なカラムだけに絞っています。
$ poetry run dbt list --resource-types source --output json --output-keys name resource_type source_name --log-level error --target prod | jq .
{
"name": "table_a",
"resource_type": "source",
"source_name": "database_a"
}
{
"name": "table_b",
"resource_type": "source",
"source_name": "database_b"
}
jq操作
jqを使い、csvのフォーマットに変換します。
先ほどの出力は複数のjsonが出力結果として出てしまっているので、配列にします。
jq -s '.'
で配列に括ることができます。
$ poetry run dbt list --resource-types source --output json --output-keys name resource_type source_name --log-level error --target prod | jq -s '.'
[
{
"name": "table_a",
"resource_type": "source",
"source_name": "database_a"
},
{
"name": "table_b",
"resource_type": "source",
"source_name": "database_b"
}
]
次にキー名を変更します。これは後から続けて実行するdbt model, glue catalogとカラム名を合わせる為です。
jq '[.[] | .["schema"] = .source_name | del(.source_name)]'
をパイプすることで.source_nameキーの値をもとにschemaキーのデータを作り、その後にsource_nameを削除します。
$ poetry run dbt list --resource-types source --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]'
[
{
"name": "table_a",
"resource_type": "source",
"schema": "database_a"
},
{
"name": "table_b",
"resource_type": "source",
"schema": "database_b"
}
]
最後に、出力するキーを指定し、json配列の1つ目の要素からキーを取り出し、その後name, resource_type, schemaキーの値を取り出し、csv形式の文字列に変換します。
poetry run dbt list --resource-types source --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
"name","resource_type","schema"
"table_a","source","database_a"
"table_b","source","database_b"
これで--resource-types sourceに対するテーブル出力ができました
--resource-types model部分についてもほぼ似たようなことを行っています。
union操作と書き出し
dbt_content="name,resource_type,schema\n"
for data in "$dbt_source_data" "$dbt_model_data"; do
dbt_content+=$(echo "$data" | sed 1d)
dbt_content+="\n"
done
echo -e "$dbt_content" > $dbt_data
最後にこれらのデータについて、sed 1d
で1行目を省いた状態で各データをdbt_content変数に追記し、書き出します。
Athena 管理テーブル出力
# AWS Glueのテーブル群を出力
glue_datababases=(
"database_a"
"database_b"
)
get_glue_tables() {
local database_name=$1
aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name, DatabaseName: DatabaseName}' --profile mds-sts \
| jq '[.[] | .["name"] = .Name | .["schema"] = .DatabaseName | .["resource_type"] = "glue_catalog" | del(.Name, .DatabaseName)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
}
for db in "${glue_datababases[@]}"; do
result=$(get_glue_tables "$db")
glue_content+=$(echo "$result" | sed 1d)
glue_content+="\n"
done
echo -e "$glue_content" > $glue_data
次にAthenaテーブル出力部分です。
glue get-tables
aws cli glue get-tablesコマンドを使います。
このコマンドでは、databaseを単一指定しかできない為、glue_databases
配列に対象のdatabaseを入れています。
そして、--query
オプションで出力されるデータを絞っています。
$ aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name, DatabaseName: DatabaseName}' --profile mds-sts
[
{
"Name": "table_a",
"DatabaseName": "database_a"
},
{
"Name": "test",
"DatabaseName": "database_a"
}
]
こちらについては最初から配列になっています。
jq操作
この部分についてはdbtのjq操作とほとんど同じです。
キー名を変更し、csvへ変換しています。
aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name, DatabaseName: DatabaseName}' --profile mds-sts \
| jq '[.[] | .["name"] = .Name | .["schema"] = .DatabaseName | .["resource_type"] = "glue_catalog" | del(.Name, .DatabaseName)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
"name","resource_type","schema"
"table_a","glue_catalog","database_a"
"test","glue_catalog","database_a"
union操作と書き出し
こちらもdbtのunion操作と書き出しとほぼ同じです。
for db in "${glue_datababases[@]}"; do
result=$(get_glue_tables "$db")
glue_content+=$(echo "$result" | sed 1d)
glue_content+="\n"
done
echo -e "$glue_content" > $glue_data
非管理テーブルの抽出
最後にそれぞれをunionし、awkコマンド(GNU awk)で完全に重複が無いもののみ出力します。
# それぞれUNIONした後にawsで重複を削除
combined_content="${dbt_content}$(echo -e "$glue_content" | sed 1d)"
# 完全に重複がないもののみ出力する
awk -F, 'NR==FNR{a[$1,$3]++; next} a[$1,$3]==1' <(echo -e "$combined_content") <(echo -e "$combined_content") > unmanaged_tables.csv
完全に重複がないもののみ出力
のコマンドですが、combined_content
を2回、入力として渡しています。
NR==FNR{}
の処理はNR=通し番号,FNR=ファイルごとに振られる通し番号となるため、1つ目の入力に対して処理されます。
NF==FNR{}
の処理の中身をa[$1, $3]++;
とすることで、連想配列aのキーとして1列目と3列目の組み合わせを使って、出現回数をカウントします。重複がなければ1、あれば1より大きい値になります。
2つ目の入力に対してはa[$1,$3]==1
の処理を行います。NR==FNR
ではないので、NR==FNR{a[$1,$3]++; next}
の部分はスルーされます。
結果的にa[$1,$3]==1
で1のみの値(要するに重複のないデータ)を取り出すことができ、以下のようなcsvが作成されます。
$ cat unmanaged_tables.csv
name,resource_type,schema
"test","source","database_a"
最後に
ここまで見ていただきありがとうございました。不備やアドバイスがあればコメントいただけますと嬉しいです!
dbtの設定について
--resource-type source
については、sourcesに記載するnameキーがAthenaのdatabase名と一致する必要があります。
参考ページ