本記事ではGCPのVMインスタンスでAirflowサーバーを構築・運用している事例について紹介します。
Airflowを使うようになった背景
弊社(株式会社unerry)では、他社様とデータの受け渡しをしています。
こういった連携が始まった当初は個別の案件ごとにmicroインスタンスのサーバーを建てたり、Cloud Functions+Cloud Schedulerを使ったりしていました。
しかし案件が増えてきた時に問題になったのは、障害などでデータが遅れて提供できなかった時、バッチの再実行を個別の実装に応じてあっちこっちマニュアルで行うのに手間がかかるということでした。
Airflowはexecution_dateという概念があり、過去のジョブの再実行が容易です。1
また、多くのジョブをDAGの一覧で管理できることが嬉しく、それらが導入のきっかけでした。
こんな感じです。2
GCPにはマネージドのCloud Composerがあるよ
あるんですけど、これ高いです。東京リージョンだと小さめの構成でも$500/月くらい。
弊社ではデータ連携以外でも定期的に重い分析処理3をすることがありますが、R言語やシェルを駆使したりと必ずしもPythonで処理する訳ではありません。ジョブの実行管理が出来れば良いので、GCEでAirflowを建てて使っています。4
基本的にPythonで処理かつAirflowでゴリゴリに並列処理してCPU使うよ。といったケースであればCloud Composerも選択肢に挙がってくるかと思います。5
GCPで使うリソース
- VMインスタンス
- Container-Optimized OS
- ロードバランサ
- HTTPS用です
- Container Registry
- 後述するAirflowのカスタムイメージを置きます
- Cloud SQL
- MySQLを例にします
- Monitoringのアラート
- マネージドではないので監視は必須です。弊社では以下を監視しています。
- Uptime Check (HTTPSの生死確認)
- ディスク使用量
- メモリ使用量
- マネージドではないので監視は必須です。弊社では以下を監視しています。
- IAM
- Airflow用のサービスアカウントを作りCloud Functions起動元の権限などを個別に付与出来るようにします
以降、本記事で解説する範囲
AirflowのDockerイメージのカスタマイズ、Container-Optimized OSのcloud-init、データベースの初期化に限定します。その他について特別なことは無いので割愛します。
AirflowのDockerイメージのカスタマイズ
公式イメージのままでも動かすことは出来るのですが、実用上、以下を追加しています。
- Cloud SDK
- BashOperatorからgcloudコマンドを使うためです
- 自前のログ削除シェル
- 公式のDockerイメージにclean-logs.shが含まれているのですが、これは空になったディレクトリを削除してくれません 6
Dockerfile
FROM apache/airflow:2.2.2
USER root
RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" > /etc/apt/sources.list.d/google-cloud-sdk.list && \
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && \
apt-get update && \
apt-get install -y google-cloud-sdk
COPY --chown=airflow:root custom-clean-logs.sh /custom-clean-logs
RUN chmod a+x /custom-clean-logs
USER airflow
custom-clean-logs.sh
#!/usr/bin/env bash
set -euo pipefail
readonly DIRECTORY="${AIRFLOW_HOME:-/usr/local/airflow}"
readonly RETENTION="${AIRFLOW__LOG_RETENTION_DAYS:-15}"
trap "exit" INT TERM
readonly EVERY=$((15*60))
echo "Cleaning logs every $EVERY seconds"
while true; do
echo "Trimming airflow logs to ${RETENTION} days."
find "${DIRECTORY}"/logs \
-type d -name 'lost+found' -prune -o \
-type f -mtime +"${RETENTION}" -name '*.log' -print0 | \
xargs -0 rm -f
# 公式のclean-logs.shにこれを足しています
echo "Delete empty directories."
find "${DIRECTORY}"/logs -type d -empty -delete
seconds=$(( $(date -u +%s) % EVERY))
(( seconds < 1 )) || sleep $((EVERY - seconds))
done
Container-Optimized OSのcloud-init
cloud-init.ymlの内容はVMインスタンス作成時のカスタムメタデータにキーuser-data
の値として貼り付けます。以下、弊社のデータ連携用Airflowの設定内容です。7
#cloud-config
# AirflowのDockerイメージがairflowユーザーで動作するためホスト側もairflowユーザーを作成します
users:
- name: airflow
write_files:
# Airflow起動時の環境変数ファイル
- path: /var/airflow/env
content: |
AIRFLOW_HOME=/var/airflow
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://dbuser:dbpassword@cloud_sql_proxy:3306/dbname
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__PARALLELISM=4
AIRFLOW__WEBSERVER__BASE_URL=
AIRFLOW__WEBSERVER__WORKERS=2
AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX=True
owner: airflow:airflow
# AIRFLOW_HOME: DAGやログなどの配置場所になります。お好みで。
# AIRFLOW__CORE__SQL_ALCHEMY_CONN: データベースの接続設定です。
# AIRFLOW__CORE__EXECUTOR: 一台構成かつ並列実行したいのでLocalExecutorを指定します。
# AIRFLOW__CORE__PARALLELISM: 並列数はマシンスペックと相談で。デフォルトの32で低スペックマシンだとメモリ不足で死にます。
# AIRFLOW__WEBSERVER__BASE_URL: デフォルトはhttp://localhost:8080なのですが、空にしておかないとタスクのログが見れません。
# AIRFLOW__WEBSERVER__WORKERS: デフォルトは4ですが、そんなに必要ないと思うのでケチってます。
# AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX: ロードバランサ用の設定です。
# systemd のユニットファイル - Cloud SQL Proxy
- path: /etc/systemd/system/cloud_sql_proxy.service
content: |
[Unit]
Description=Cloud SQL Proxy daemon
After=network.target
[Service]
ExecStart=/usr/bin/docker run --name=cloud_sql_proxy --rm --net=db gcr.io/cloudsql-docker/gce-proxy:latest /cloud_sql_proxy -instances=abechan-no-project-id:asia-northeast1:airflow=tcp:0.0.0.0:3306
ExecStop=/usr/bin/docker stop cloud_sql_proxy
Restart=on-failure
# systemd のユニットファイル - Airflow WebServer
- path: /etc/systemd/system/airflow-webserver.service
content: |
[Unit]
Description=Airflow webserver daemon
After=network.target
[Service]
Environment="HOME=/home/airflow"
ExecStartPre=/usr/bin/docker-credential-gcr configure-docker
ExecStart=/usr/bin/docker run --name=airflow-server --rm --net=db -p 80:8080 --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 airflow webserver
ExecStop=/usr/bin/docker stop airflow-server
Restart=on-failure
# systemd のユニットファイル - Airflow Scheduler
- path: /etc/systemd/system/airflow-scheduler.service
content: |
[Unit]
Description=Airflow scheduler daemon
After=network.target
[Service]
Environment="HOME=/home/airflow"
ExecStartPre=/usr/bin/docker-credential-gcr configure-docker
ExecStart=/usr/bin/docker run --name=airflow-scheduler --rm --net=db --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 airflow scheduler
ExecStop=/usr/bin/docker stop airflow-scheduler
Restart=on-failure
# systemd のユニットファイル - Airflow のログ削除シェル
- path: /etc/systemd/system/airflow-clean-logs.service
content: |
[Unit]
Description=Airflow clean-logs daemon
After=network.target
[Service]
Environment="HOME=/home/airflow"
ExecStartPre=/usr/bin/docker-credential-gcr configure-docker
ExecStart=/usr/bin/docker run --name=airflow-clean-logs --rm --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 bash /custom-clean-logs
ExecStop=/usr/bin/docker stop airflow-clean-logs
Restart=on-failure
runcmd:
- mkdir -p /var/airflow/scripts # BashOperatorから叩きたいシェルの置き場所
- mkdir -p /var/airflow/dags # DAGのディレクトリ
- mkdir -p /var/airflow/logs # ログのディレクトリ
- usermod -u 50000 airflow # コンテナからホスト側ボリュームを見た時にパーミッションエラーになるので、AirflowのDockerイメージのユーザーIDにあわせています
- groupmod -g 50000 airflow # ↑と同じ理由でグループID
- chown -R airflow:airflow /var/airflow/
- docker images -aq | xargs docker rmi # インスタンスを再起動した時に最新のイメージ(キャッシュされたものではなく)を取得したいのでインスタンス起動時に削除しています
# => こうしておくとDockerイメージのタグをlatestなどで固定していてもインスタンスの再起動で更新を反映できます
- docker network create -d bridge db # AirflowのコンテナからCloud SQL Proxyのコンテナに接続するためのブリッジ
- systemctl daemon-reload # 以下、systemd関連
- systemctl start cloud_sql_proxy.service
- systemctl start airflow-webserver.service
- systemctl start airflow-scheduler.service
- systemctl start airflow-clean-logs.service
データベースの初期化
-
Container-Optimized OSのVMインスタンスにログイン後、Airflowイメージをbashシェルで起動します
docker run -it --rm --net=db --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 bash
-
Airflowのコンテナからデータベースの初期化と管理者ユーザーの作成を行います
airflow db init airflow users create -u abechan -p password -r Admin -e mail@address -f firstname-is-secret -l lastname-is-abechan
最後に
Airflowは最初はとっつきにくくクセも強いなぁと思ったものの、その辺は慣れの問題で今では欠かすことの出来ないツールとして重宝しています。ちょっとしたバッチがいっぱいあって管理が大変、といったお悩みを抱えている方は一度導入を検討してみてはいかがでしょうか?
PythonでLinuxに直にインストールすることも出来ますが、Dockerイメージでも環境変数で色々制御出来て構築が楽なのでおすすめです。
-
クセは強いですが ↩
-
これはデータ連携用のAirflowなのですが、これとは別に分析バッチ用のAirflowがもう一台あります ↩
-
こういった分析処理ではAirflowから一時的にスペック高めのVMインスタンスを立ち上げて処理が終わったらインスタンスを削除、といったことをしています ↩
-
Airflowから実行される側のバッチは主にCloud Functions/Cloud Run/常時稼働のVMインスタンス+SSHリモート実行 などで処理しています ↩
-
実は弊社でもCloud Composerを使っているところもあります(詳細は内緒) ↩
-
運用していてディスク使用量がじわじわ増えていくといった現象に遭遇しました ↩
-
記事用に一部情報のマスクや古い部分の最新化をしています ↩