2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

dbtとAthenaで管理されていないテーブル一覧を出力する

Posted at

目的

dbt-coredbt-athena-communityを使い、Amazon Athenaへデータモデルを構築しています。

運用していくうえで、

  • Athenaにあるテーブルがdbt側で管理されていない
  • dbt側にある定義のテーブルがAthenaにない

この2つについて、管理し定期的にメンテナンスがしたくなりました。

そこで、dbtとAthenaいずれかで管理されていないテーブル一覧を出力するスクリプトを作りました。

コード

先にコードの全容と、実行結果です。

#!/bin/bash

dbt_content="name,resource_type,schema\n"
glue_content="name,resource_type,schema\n"
dbt_data="dbt_output.csv"
glue_data="glue_output.csv"

# sourcesのテーブル群を出力
# jsonを配列に格納
# source_nameをschemaに変換
# name,resource_type,source_nameだけに絞る→最初の配列のキーを取得→カンマで区切ってそれ以降はname,resource_type,source_nameの値を取得→@csvでcsv出力
dbt_source_data=$(poetry run dbt list --resource-types source  --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')

# # modelのテーブル群を出力
dbt_model_data=$(poetry run dbt list --resource-types model --output json --output-keys name resource_type schema --log-level warn --target prod \
| jq -s '.' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')

for data in "$dbt_source_data" "$dbt_model_data"; do
    dbt_content+=$(echo "$data" | sed 1d)
    dbt_content+="\n"
done

echo -e "$dbt_content" > $dbt_data

# AWS Glueのテーブル群を出力
glue_datababases=(
    "database_a"
    "database_b"
)

get_glue_tables() {
    local database_name=$1

    aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name,  DatabaseName: DatabaseName}' --profile mds-sts \
    | jq '[.[] | .["name"] = .Name | .["schema"] = .DatabaseName | .["resource_type"] = "glue_catalog" | del(.Name, .DatabaseName)]' \
    | jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
}

for db in "${glue_datababases[@]}"; do
    result=$(get_glue_tables "$db")

    glue_content+=$(echo "$result" | sed 1d)
    glue_content+="\n"
done

echo -e "$glue_content" > $glue_data

# それぞれUNIONした後にawsで重複を削除
combined_content="${dbt_content}$(echo -e "$glue_content" | sed 1d)"

# 完全に重複がないもののみ出力する 
awk -F, 'NR==FNR{a[$1,$3]++; next} a[$1,$3]==1' <(echo -e "$combined_content") <(echo -e "$combined_content") > unmanaged_tables.csv
$ bash export_table_catalogs.sh

上記ファイルを実行すると、以下3つのcsvファイルが出力されます。

dbt_output.csv
name,resource_type,schema
"table_a","source","database_a"
"table_b","source","database_b"
"table_c","model","database_b"

dbt_output.csvはdbtで管理しているテーブルのリストです。

glue_output.csv
name,resource_type,schema
"table_a","glue_catalog","database_a"
"table_b","glue_catalog","database_b"
"table_c","glue_catalog","database_b"
"test","glue_catalog","database_a"

glue_output.csvはglue catalog(Athena)で管理しているテーブルのリストです

unmanaged_tables.csv
name,resource_type,schema
"test","glue_catalog","database_a"

unmanaged_tables.csvdbt_output.csv,glue_output.csvの両方で存在しなかったテーブルのリストです。

これらのファイルより

  • dbtでは3つのテーブルを管理
  • Athenaには4つのテーブルがある
  • Athenaのdatabase_aにあるtestテーブルが双方で管理されていない

ということが分かります。

コード詳細

dbt 管理テーブル出力

# sourcesのテーブル群を出力
# jsonを配列に格納
# source_nameをschemaに変換
# name,resource_type,source_nameだけに絞る→最初の配列のキーを取得→カンマで区切ってそれ以降はname,resource_type,source_nameの値を取得→@csvでcsv出力
dbt_source_data=$(poetry run dbt list --resource-types source  --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')

# # modelのテーブル群を出力
dbt_model_data=$(poetry run dbt list --resource-types model --output json --output-keys name resource_type schema --log-level warn --target prod \
| jq -s '.' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv')

for data in "$dbt_source_data" "$dbt_model_data"; do
    dbt_content+=$(echo "$data" | sed 1d)
    dbt_content+="\n"
done

echo -e "$dbt_content" > $dbt_data

こちらのコードについての説明です。

dbt list

dbt listは、dbtで管理しているリソースの一覧を表示するコマンドです。

 --resource-types source  --output json --output-keys name resource_type source_name --log-level error

--resource-types source--output-keys,--log-level errorを指定して、jsonを書き出しています。

--log-level errorはこれを外すとコマンド実行時の最初のログがでてしまい、jqでjsonとして正しくパースできないのでつけています。

--resource-types sourceはdbt lsで出力するresource-typeを指定しています。

--resource-types, --resource-type [metric|semantic_model|source|analysis|model|test|exposure|snapshot|seed|default|all]
Restricts the types of resources that dbt
will include

$ poetry run dbt list --resource-types source  --output json  --log-level error --target prod

このコマンドを実行すると

{
  "name": "table_a",
  "resource_type": "source",
  "package_name": "prod",
  "original_file_path": "aws/athena/models/source.yml",
  "unique_id": "source.prod.database_a.table_a",
  "source_name": "database_a",
  "tags": [],
  "config": {
    "enabled": true
  }
}
{
  "name": "table_b",
  "resource_type": "source",
  "package_name": "prod",
  "original_file_path": "aws/athena/models/source.yml",
  "unique_id": "source.prod.database_b.table_b",
  "source_name": "database_b",
  "tags": [],
  "config": {
    "enabled": true
  }
}

のように出力されます。そこから--output-keys name resource_type source_nameを指定して、必要なカラムだけに絞っています。

$ poetry run dbt list --resource-types source  --output json --output-keys name resource_type source_name --log-level error --target prod | jq .
{
  "name": "table_a",
  "resource_type": "source",
  "source_name": "database_a"
}
{
  "name": "table_b",
  "resource_type": "source",
  "source_name": "database_b"
}

jq操作

jqを使い、csvのフォーマットに変換します。

先ほどの出力は複数のjsonが出力結果として出てしまっているので、配列にします。

jq -s '.'で配列に括ることができます。

$ poetry run dbt list --resource-types source  --output json --output-keys name resource_type source_name --log-level error --target prod | jq -s '.'
 [
  {
    "name": "table_a",
    "resource_type": "source",
    "source_name": "database_a"
  },
  {
    "name": "table_b",
    "resource_type": "source",
    "source_name": "database_b"
  }
]

次にキー名を変更します。これは後から続けて実行するdbt model, glue catalogとカラム名を合わせる為です。

jq '[.[] | .["schema"] = .source_name | del(.source_name)]'をパイプすることで.source_nameキーの値をもとにschemaキーのデータを作り、その後にsource_nameを削除します。

$ poetry run dbt list --resource-types source  --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]'
[
  {
    "name": "table_a",
    "resource_type": "source",
    "schema": "database_a"
  },
  {
    "name": "table_b",
    "resource_type": "source",
    "schema": "database_b"
  }
]

最後に、出力するキーを指定し、json配列の1つ目の要素からキーを取り出し、その後name, resource_type, schemaキーの値を取り出し、csv形式の文字列に変換します。

poetry run dbt list --resource-types source  --output json --output-keys name resource_type source_name --log-level error --target prod \
| jq -s '.' \
| jq '[.[] | .["schema"] = .source_name | del(.source_name)]' \
| jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
"name","resource_type","schema"
"table_a","source","database_a"
"table_b","source","database_b"

これで--resource-types sourceに対するテーブル出力ができました

--resource-types model部分についてもほぼ似たようなことを行っています。

union操作と書き出し

dbt_content="name,resource_type,schema\n"

for data in "$dbt_source_data" "$dbt_model_data"; do
    dbt_content+=$(echo "$data" | sed 1d)
    dbt_content+="\n"
done

echo -e "$dbt_content" > $dbt_data

最後にこれらのデータについて、sed 1dで1行目を省いた状態で各データをdbt_content変数に追記し、書き出します。

Athena 管理テーブル出力

# AWS Glueのテーブル群を出力
glue_datababases=(
    "database_a"
    "database_b"
)

get_glue_tables() {
    local database_name=$1

    aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name,  DatabaseName: DatabaseName}' --profile mds-sts \
    | jq '[.[] | .["name"] = .Name | .["schema"] = .DatabaseName | .["resource_type"] = "glue_catalog" | del(.Name, .DatabaseName)]' \
    | jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
}

for db in "${glue_datababases[@]}"; do
    result=$(get_glue_tables "$db")

    glue_content+=$(echo "$result" | sed 1d)
    glue_content+="\n"
done

echo -e "$glue_content" > $glue_data

次にAthenaテーブル出力部分です。

glue get-tables

aws cli glue get-tablesコマンドを使います。

このコマンドでは、databaseを単一指定しかできない為、glue_databases配列に対象のdatabaseを入れています。

そして、--queryオプションで出力されるデータを絞っています。

$ aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name,  DatabaseName: DatabaseName}' --profile mds-sts
[
    {
        "Name": "table_a",
        "DatabaseName": "database_a"
    },
    {
        "Name": "test",
        "DatabaseName": "database_a"
    }
]

こちらについては最初から配列になっています。

jq操作

この部分についてはdbtのjq操作とほとんど同じです。
キー名を変更し、csvへ変換しています。

aws glue get-tables --database-name "$database_name" --output json --query 'TableList[].{Name: Name,  DatabaseName: DatabaseName}' --profile mds-sts \
    | jq '[.[] | .["name"] = .Name | .["schema"] = .DatabaseName | .["resource_type"] = "glue_catalog" | del(.Name, .DatabaseName)]' \
    | jq -r '[.[] | {name, resource_type, schema}] | (first | keys_unsorted), (.[] | [.name, .resource_type, .schema]) | @csv'
"name","resource_type","schema"
"table_a","glue_catalog","database_a"
"test","glue_catalog","database_a"

union操作と書き出し

こちらもdbtのunion操作と書き出しとほぼ同じです。

for db in "${glue_datababases[@]}"; do
    result=$(get_glue_tables "$db")

    glue_content+=$(echo "$result" | sed 1d)
    glue_content+="\n"
done

echo -e "$glue_content" > $glue_data

非管理テーブルの抽出

最後にそれぞれをunionし、awkコマンド(GNU awk)で完全に重複が無いもののみ出力します。

# それぞれUNIONした後にawsで重複を削除
combined_content="${dbt_content}$(echo -e "$glue_content" | sed 1d)"

# 完全に重複がないもののみ出力する 
awk -F, 'NR==FNR{a[$1,$3]++; next} a[$1,$3]==1' <(echo -e "$combined_content") <(echo -e "$combined_content") > unmanaged_tables.csv

完全に重複がないもののみ出力

のコマンドですが、combined_contentを2回、入力として渡しています。

NR==FNR{}の処理はNR=通し番号,FNR=ファイルごとに振られる通し番号となるため、1つ目の入力に対して処理されます。

NF==FNR{}の処理の中身をa[$1, $3]++;とすることで、連想配列aのキーとして1列目と3列目の組み合わせを使って、出現回数をカウントします。重複がなければ1、あれば1より大きい値になります。

2つ目の入力に対してはa[$1,$3]==1の処理を行います。NR==FNRではないので、NR==FNR{a[$1,$3]++; next}の部分はスルーされます。

結果的にa[$1,$3]==1で1のみの値(要するに重複のないデータ)を取り出すことができ、以下のようなcsvが作成されます。

$ cat unmanaged_tables.csv
name,resource_type,schema
"test","source","database_a"

最後に

ここまで見ていただきありがとうございました。不備やアドバイスがあればコメントいただけますと嬉しいです!

dbtの設定について

--resource-type sourceについては、sourcesに記載するnameキーがAthenaのdatabase名と一致する必要があります。

参考ページ

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?