やりたいこと
副業先の1社では、ワークフロー管理としてApache AirflowのAWSマネージドサービスであるAmazon Managed Workflows for Apache Airflow(MWAA)を利用しています。
DAG実行時にいくつか環境依存情報や機密情報を取得するため、環境構築後Airflow UI上でAdmin -> Variablesから環境変数を設定していました。
こちらの設定を自動化したく、2つの方法を試してみました。
前提
- MWAAのリソース含め、AWS CDKで環境の初期構築をしている。
- Apache Airflowのバージョンは2.2.2。
- Admin -> Variablesに、環境構築後ではないと取得できない値をいくつか変数として設定している。
- 過去にParameterStoreで管理する方法を採用していたが、ジョブの実行頻度が高く、ThrottlingExceptionが頻発していた。(TPSのデフォルトは40だが、引き上げは可能)
結論
Airflow CLIを利用して cdk deploy
実行後にAWS CodeBuildで作成したビルドプロジェクトをトリガーしてジョブ実行する方法に落ち着きました。
以下に試した方法と記載します。
StartupScriptを試してみる
説明は以下のマニュアルに載っている。
マニュアルにあるサンプルスクリプトをS3に配置して、設定値が取得できそうか試してみる。
#!/bin/sh
export ENVIRONMENT_STAGE="development"
echo "$ENVIRONMENT_STAGE"
MWAAリソースを作成するcdkのコードに必要な設定を追加。
// cdkのコード一部抜粋
const deployment = new s3deploy.BucketDeployment(this, 'UploadRequirement', {
sources: [s3deploy.Source.asset(path.join(__dirname, '../../src/mwaa'))], //この階層以下にstartup.shを配置
destinationBucket: airflowBucket,
retainOnDelete: false
})
const cfnEnvironment = new mwaa.CfnEnvironment(this, 'Mwaa', {
name: `${serviceName}-${envName}-airflow`,
airflowVersion: '2.2.2',
dagS3Path: 'dags',
requirementsS3Path: 'requirements.txt',
startupScriptS3Path: 'startup.sh', // startup.shを指定
sourceBucketArn: deployment.deployedBucket.bucketArn, // bucketを指定
...
環境変数で設定した値を出力することができた。
...
[2023-12-xx, xx:xx:xx UTC] {{logging_mixin.py:109}} INFO - ENVIRONMENT_STAGE: development
...
ただ、StartupScriptを利用するにはMWAAリソース作成時に環境変数の設定内容が固まっている必要があるため、一旦保留。
Airflow CLIを試してみる
Airflowの公式ドキュメントに記載されている airflow variables import
が使えそうだった。
variables.jsonというファイルを作成して試してみたが、エラーが発生して取り込むことができなかった。。。(エラー内容もOops...ぐらいしかでず、不親切でよくわからない。Airflow UI上では取り込むことができるが...)
先にMWAAのマニュアルを確認しておくべきだったが、Apache Airflowでは対応しているものの、どうやらMWAAでは対応していないようだった。
結果的にはvariables import
ではなく、サポートしているvariables set
を利用して、取り込むことにした。
一応今後variables import
が対応された時のためにvariables.jsonというjsonファイルを作成するようにしている。
build:
commands:
- >
CLI_JSON=$(aws mwaa create-cli-token --name foodies-$env-airflow)
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken')
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname')
- |
cat <<EOF >variables.json
{
"web_server_url": "https://${WEB_SERVER_HOSTNAME}",
"hoge": "${HOGE}",
"hoge_api_key": "xxxxxxxxxxxxx",
"hoge_token": "xxxxxxxxxxxxx"
}
EOF
- |
cat variables.json | jq -r 'to_entries[] | .key + " " + .value' | while read key value
do
curl -X POST "https://${WEB_SERVER_HOSTNAME}/aws_mwaa/cli" \
-H "Authorization: Bearer ${CLI_TOKEN}" \
-H "Content-Type: text/plain" \
-d "variables set $key $value" | echo $(jq -r .stdout) | base64 -d
done
post_build:
commands:
- echo "Finish creating airflow variables."
取り込んだ結果は以下。
最後に
おそらく事前に設定した環境変数をDAG実行時に取得する方法はいくつかあると思います。
また、その自動化方法もいくつかあるはずです。
会社や提供しているサービスの前提条件によって、求める内容が変わると思うので、いくつかある方法から最適な方法を見つけて試してみるのが良いと思います。(まあ何でもそうですよね)