はじめに
dbt + Athena でデータ処理フローを作成する時、データの実態はデータベース上ではなくS3上に保存されます。なのでフロー再実行時にS3上のデータを上書するのに一癖あったのですが、うまく設定して新しいテーブルを作れるようにしました。
本記事ではその概要や設定ファイルについて示します。
この記事で紹介すること
- dbt+athebaの設定ファイルと簡単な説明
- 実装時の注意点
紹介しないこと
- dbtの基本的な概念、記述方法
- メタデータ関連
- AWSの認証、Athena, Glue等の説明
dbt+Athenaの特殊性(読み飛ばしてOK)
dbtはSQLでパイプラインを形成できるツールです。いわゆるDAGというやつですね。
その内部では、データベースへのクエリとしてCREATE TABLE
であったりDROP TABLE
を実行しています。
基本的に裏側にデータベースが用意されているのが前提となっている訳です。
ただ、Amazon Athena というサービスはストレージに対するクエリサービスです。Athena内ではETLが自己完結しており、
ストレージ ⇒ 処理領域にダウンロード ⇒ クエリ実行 ⇒ ストレージにアップロード
という手順を内部で勝手に行ってくれています。
何が言いたいかというと、裏側にデータベースが存在せず、ストレージ上の実ファイルがそのまま存在するわけです。
そんなわけで特に「上書き」という処理と相性が悪いようでした。
ただ、dbtもAthenaに対応しているということで、試行錯誤の結果dbt+Athenaを他のデータベースと同じように扱うことができました。
Athenaでdbtが使えれば、つまりデータベースが無くてもdbtの恩恵にあずかれるので、とても素晴らしいことではないでしょうか?
本記事では、Athenaに対してdbtを適用してテーブルを作成、ひいては上書きする方法を残します。
Iceberg形式が推奨されているような気もしますが、今回はHive形式で実装しています。
実装内容
必要なライブラリ
以下が必要でした。
pip install dbt-core dbt-athena
各種コード
さっそくコードの内容を紹介していきます。
my_profile:
outputs:
prd:
type: athena
s3_staging_dir: s3://athena-result-my_project-20250509/dbt/staging/
s3_mart_dir: s3://athena-result-my_project-20250509/dbt/marts/
s3_data_dir: s3://athena-result-my_project-20250509/dbt/data/
table_type: hive
region_name: ap-northeast-1
# data_catalog: AwsDataCatalog
database: AwsDataCatalog
schema: my_schema
threads: 1
num_retries: 0
target: prd
type: athenaと入力することで、Ahtneaを処理するモードになるようです。
s3_***_dirは記述しろとドキュメントに書いてありましたが、書いたところで反映されてない気がしてます(笑)
name: 'my_project'
version: '1.0.0'
# This setting configures which "profile" dbt uses for this project.
profile: 'my_profile'
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
clean-targets: # directories to be removed by `dbt clean`
- "prd"
- "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
my_project:
# Config indicated by + and applies to all files under models/example/
+materialized: view
+format: JSON
+materialized: table
と設定したはずなのですが、なぜかデフォルトではViewになってしまいます……。ちょっとこの辺りの挙動は謎です。
次はクエリのサンプルを掲載します。
{{
config(
tags=["first_step"],
pre_hook="{{ msck_repair_table('my_schema', 'my_table') }}"
)
}}
SELECT *
FROM {{ source('my_schema', 'my_table') }}
pre_hook
には自作定義マクロを使用しました。Athenaの場合、テーブルの読み込み前にパーティション情報を更新しないとそのパーティションが無いものとして扱われるので、その対策です。
Glueカタログ側で定義したHIVEパーティションを使って区切っていない場合は必要ないかもしれません。
{% macro msck_repair_table(database, table) %}
{% set sql %}
MSCK REPAIR TABLE {{ database }}.{{ table }}
{% endset %}
{% do log("Running MSCK REPAIR TABLE: " ~ sql, info=True) %}
{% do run_query(sql) %}
{% endmacro %}
次は中間Viewです。
下記のように定義したところ、Athena上ではView扱いになりました。
{{
config(
tags=["first_step"],
)
}}
SELECT
item_code,
site
FROM
{{ ref('stg_sample') }}
最後に、Martで上書き可能な実テーブル定義します。
{{
config(
materialized='incremental',
partitioned_by=['yyyymm'],
incremental_strategy='insert_overwrite',
external_location='s3://my-bucket/path/to/resource/',
format='json',
table_type='hive',
tags=['second_step'],
)
}}
SELECT *
FROM {{ ref('int_sample') }}
configにて、
- materialized='incremental'
- incremental_strategy='insert_overwrite'
と指定し、 - external_location
を設定すれば、実行のたびにデータが上書きされるAthenaテーブルを作成できました。
いろいろ試行錯誤したのですが、このconfig内でうまく設定するのが大事なようですね。
結果
全体をお見せできなくて恐縮ですが、私の環境では以下のようになってくれました。
monthly_itemmasterというテーブルが、上書き可能なテーブル、int~~~はビューとして作成されました。
実ファイルの前後比較も行ってみます。
monthly_itemmasterの実態ファイルがこちらで、
見ての通り、古いファイルは削除されて、新しいファイルに置き換わっています!
所感
他のシステムでdbtを使ったことが無いのですが、dbtでSQLベースのDAGを作るのにデータベースシステムを立ち上げなくていいのはとても楽な気がしています。
もちろん dbt test
も実行できるので、データ品質も担保できそうです。
今後も積極的に使っていきたいですね。
注意点
経験則ではありますが、以下のような状況ではうまくいかないことが多かったです。
- アウトプットをGlue定義のテーブルにする
- 後からカラム定義を変更する
- 処理が失敗したとき、もしかすると古いファイルが残るかもしれない ⇒ dbt test でのユニーク制約チェックはしておいた方が良さそうです
あと、よくエラーメッセージで「dbtはS3上のファイルを削除できないので、あなた自身で削除してから再実行してくださいね」と出てきたのですが、実のところあまり関係が無いように感じました。