はじめに
MWAAでdbtを使いたい場合、パッケージの依存関係の問題で使うのが難しいという話がありました。
ただ、現時点で(というより、2023年のMWAAの機能追加にて)以前よりは楽に使えるようになりました。
今回はこれを解説していこうと思います。
当時の問題は?
パッケージの依存関係の問題で使うのが難しい
この部分をもう少し解説します。
そもそも、AirflowではTaskの実行時にWorkerを起動して、その上でTaskを実行します。
ただ、このWorkerはAirflow側が自動で起動している環境で、色々なライブラリが不足している可能性があります。
そこで、requirements.txtにWorkerに入れたいライブラリの名前とバージョンを記載してS3に格納しておくと、Worker起動時にインストールして、Taskを実行してくれます。
ただ、この方法だと、Workerに直接ライブラリをインストールしてしまいます。Airflowとして、Workerに入れておきたいライブラリのバージョンと競合してしまい、うまくdbtを使えなかったのです。
今はどうやる?
2023年4月に、スタートアップスクリプトを使える機能が追加されました。
これは、Workerなどが起動したときに、自動で実行するスクリプトを仕込んでおける仕組みです。環境変数の設定や、特定のライブラリのインストールなど、準備ができます。
requirements.txtはライブラリを指定しておくことしかできませんが、スタートアップスクリプトはシェルスクリプトなので自由に処理を記載できます。
このスタートアップスクリプトを使って、以下を実施します。
- virtualenvで仮想環境を作成
- その仮想環境上で、dbtをインストール
virtualenvで仮想環境を使うことで、Workerとdbtの実行環境を分離できます。
こうすることで、以前問題となっていたAirflow側との依存の競合を回避することができます。
本内容は、以下の公式ドキュメントに記載されています。
実装方法
実際にやってみました。
スタートアップスクリプトの作成と適用
まずは、スタートアップスクリプトを作成します。
#!/bin/bash
if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
then
exit 0
fi
echo "------------------------------"
echo "Installing virtual Python env"
echo "------------------------------"
pip3 install --upgrade pip
echo "Current Python version:"
python3 --version
echo "."
sudo pip3 install --user virtualenv
sudo mkdir -p /usr/local/airflow/python3-virtualenv
cd /usr/local/airflow/python3-virtualenv
sudo python3 -m venv dbt-env
sudo chmod -R 777 *
echo "------------------------------"
echo "Activating venv"
echo "------------------------------"
source dbt-env/bin/activate
pip3 list
echo "------------------------------"
echo "Installing libraries."
echo "------------------------------"
pip3 install dbt-athena-community==1.10.0
echo "------------------------------"
echo "Venv libraries."
echo "------------------------------"
pip3 list
dbt --version
echo "------------------------------"
echo "Deactivating venv."
echo "------------------------------"
deactivate
ほとんど先ほどのドキュメントで記載されていた内容と同じです。
ただし、今回私はdbt-athenaを試したかったので、dbt-athena-communityをインストールしています。
スクリプトを作ったら、以下のドキュメントのように、S3に格納して、スクリプトを環境に紐づけます。
(CLIでの環境紐づけのコマンド。)
aws mwaa update-environment \
--name your-mwaa-environment \
--startup-script-s3-path startup.sh \
--startup-script-s3-object-version BbdVMmBRjtestta1EsVnbybZp1Wqh1J4
上記を実行すると、環境のupdateが走るのでしばらく待機。
dbtを呼び出すDAG作成
以下のように、BashOperatorでdbt runします。
その実行する前に、dbtプロジェクトのディレクトリを/tmpにコピーしています。これは、MWAAのdags/ディレクトリが読み取り専用の場合があり、dbtが実行時にtarget/やlogs/を書き込む必要があるためです。
また、dbtのバイナリは仮想環境内のもの(/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt)をフルパスで指定して実行します。
最後は、次のDAG実行の時に備えて、コピーしたファイルを削除します(起動しているWorkerはDAG実行と共に停止するわけではなく、使いまわされる可能性があるため)。
from datetime import datetime
from airflow.sdk import DAG, Asset
from airflow.providers.standard.operators.bash import BashOperator
DWH_A = Asset("s3://test-bucket/dwh/A")
DWH_B = Asset("s3://test-bucket/dwh/B")
DWH_C = Asset("s3://test-bucket/dwh/C")
with DAG(
dag_id="dbt_run",
schedule=(DWH_A & DWH_B & DWH_C),
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["dbt"],
) as dag:
BashOperator(
task_id="run_dbt",
bash_command=(
"cp -R /usr/local/airflow/dags/dbt /tmp/dbt && "
"/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run "
"--profiles-dir /tmp/dbt --project-dir /tmp/dbt && "
"rm -rf /tmp/dbt"
),
)
このDAGでは、3つのAssetsの実行をトリガーとして定義していますが、今回のdbtの件とは特に関係ありません
DAG実行
DAGを実行すると、成功しているのが分かります。
まとめ
今回は、MWAAでは最新のAirflow v3.0.6でdbtの実行を試してみました。
IAMやLake Formationの権限エラーはいくつか出ましたが、それなりにすんなり動いたかなと思います。
MWAA×dbtの構成は、公式ブログでも最近いくつか見るようになってきています。もう少し理解を進めて、データ基盤に導入できるようにしていきたいと思います。
