9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Airflowの公式Dockerイメージを使ったサーバー構築 on GCP

Last updated at Posted at 2021-12-20

本記事ではGCPのVMインスタンスでAirflowサーバーを構築・運用している事例について紹介します。

Airflowを使うようになった背景

弊社(株式会社unerry)では、他社様とデータの受け渡しをしています。
こういった連携が始まった当初は個別の案件ごとにmicroインスタンスのサーバーを建てたり、Cloud Functions+Cloud Schedulerを使ったりしていました。
しかし案件が増えてきた時に問題になったのは、障害などでデータが遅れて提供できなかった時、バッチの再実行を個別の実装に応じてあっちこっちマニュアルで行うのに手間がかかるということでした。
Airflowはexecution_dateという概念があり、過去のジョブの再実行が容易です。1
また、多くのジョブをDAGの一覧で管理できることが嬉しく、それらが導入のきっかけでした。

こんな感じです。2
airflow-dag-list.png

GCPにはマネージドのCloud Composerがあるよ

あるんですけど、これ高いです。東京リージョンだと小さめの構成でも$500/月くらい。
弊社ではデータ連携以外でも定期的に重い分析処理3をすることがありますが、R言語やシェルを駆使したりと必ずしもPythonで処理する訳ではありません。ジョブの実行管理が出来れば良いので、GCEでAirflowを建てて使っています。4
基本的にPythonで処理かつAirflowでゴリゴリに並列処理してCPU使うよ。といったケースであればCloud Composerも選択肢に挙がってくるかと思います。5

GCPで使うリソース

  • VMインスタンス
    • Container-Optimized OS
  • ロードバランサ
    • HTTPS用です
  • Container Registry
    • 後述するAirflowのカスタムイメージを置きます
  • Cloud SQL
    • MySQLを例にします
  • Monitoringのアラート
    • マネージドではないので監視は必須です。弊社では以下を監視しています。
      • Uptime Check (HTTPSの生死確認)
      • ディスク使用量
      • メモリ使用量
  • IAM
    • Airflow用のサービスアカウントを作りCloud Functions起動元の権限などを個別に付与出来るようにします

以降、本記事で解説する範囲

AirflowのDockerイメージのカスタマイズ、Container-Optimized OSのcloud-init、データベースの初期化に限定します。その他について特別なことは無いので割愛します。

AirflowのDockerイメージのカスタマイズ

公式イメージのままでも動かすことは出来るのですが、実用上、以下を追加しています。

  • Cloud SDK
    • BashOperatorからgcloudコマンドを使うためです
  • 自前のログ削除シェル
    • 公式のDockerイメージにclean-logs.shが含まれているのですが、これは空になったディレクトリを削除してくれません 6

Dockerfile

FROM apache/airflow:2.2.2

USER root

RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" > /etc/apt/sources.list.d/google-cloud-sdk.list && \
    curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && \
    apt-get update && \
    apt-get install -y google-cloud-sdk

COPY --chown=airflow:root custom-clean-logs.sh /custom-clean-logs
RUN chmod a+x /custom-clean-logs

USER airflow

custom-clean-logs.sh

#!/usr/bin/env bash

set -euo pipefail

readonly DIRECTORY="${AIRFLOW_HOME:-/usr/local/airflow}"
readonly RETENTION="${AIRFLOW__LOG_RETENTION_DAYS:-15}"

trap "exit" INT TERM

readonly EVERY=$((15*60))

echo "Cleaning logs every $EVERY seconds"

while true; do
  echo "Trimming airflow logs to ${RETENTION} days."
  find "${DIRECTORY}"/logs \
    -type d -name 'lost+found' -prune -o \
    -type f -mtime +"${RETENTION}" -name '*.log' -print0 | \
    xargs -0 rm -f

  # 公式のclean-logs.shにこれを足しています
  echo "Delete empty directories."
  find "${DIRECTORY}"/logs -type d -empty -delete

  seconds=$(( $(date -u +%s) % EVERY))
  (( seconds < 1 )) || sleep $((EVERY - seconds))
done

Container-Optimized OSのcloud-init

cloud-init.ymlの内容はVMインスタンス作成時のカスタムメタデータにキーuser-dataの値として貼り付けます。以下、弊社のデータ連携用Airflowの設定内容です。7

#cloud-config

# AirflowのDockerイメージがairflowユーザーで動作するためホスト側もairflowユーザーを作成します
users:
- name: airflow                               

write_files:
# Airflow起動時の環境変数ファイル
- path: /var/airflow/env
  content: |
    AIRFLOW_HOME=/var/airflow
    AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://dbuser:dbpassword@cloud_sql_proxy:3306/dbname
    AIRFLOW__CORE__EXECUTOR=LocalExecutor
    AIRFLOW__CORE__PARALLELISM=4
    AIRFLOW__WEBSERVER__BASE_URL=
    AIRFLOW__WEBSERVER__WORKERS=2
    AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX=True
  owner: airflow:airflow

  # AIRFLOW_HOME:                         DAGやログなどの配置場所になります。お好みで。
  # AIRFLOW__CORE__SQL_ALCHEMY_CONN:      データベースの接続設定です。
  # AIRFLOW__CORE__EXECUTOR:              一台構成かつ並列実行したいのでLocalExecutorを指定します。
  # AIRFLOW__CORE__PARALLELISM:           並列数はマシンスペックと相談で。デフォルトの32で低スペックマシンだとメモリ不足で死にます。
  # AIRFLOW__WEBSERVER__BASE_URL:         デフォルトはhttp://localhost:8080なのですが、空にしておかないとタスクのログが見れません。
  # AIRFLOW__WEBSERVER__WORKERS:          デフォルトは4ですが、そんなに必要ないと思うのでケチってます。
  # AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX: ロードバランサ用の設定です。

# systemd のユニットファイル - Cloud SQL Proxy
- path: /etc/systemd/system/cloud_sql_proxy.service
  content: |
    [Unit]
    Description=Cloud SQL Proxy daemon
    After=network.target

    [Service]
    ExecStart=/usr/bin/docker run --name=cloud_sql_proxy --rm --net=db gcr.io/cloudsql-docker/gce-proxy:latest /cloud_sql_proxy -instances=abechan-no-project-id:asia-northeast1:airflow=tcp:0.0.0.0:3306
    ExecStop=/usr/bin/docker stop cloud_sql_proxy
    Restart=on-failure

# systemd のユニットファイル - Airflow WebServer
- path: /etc/systemd/system/airflow-webserver.service
  content: |
    [Unit]
    Description=Airflow webserver daemon
    After=network.target

    [Service]
    Environment="HOME=/home/airflow"
    ExecStartPre=/usr/bin/docker-credential-gcr configure-docker
    ExecStart=/usr/bin/docker run --name=airflow-server --rm --net=db -p 80:8080 --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 airflow webserver
    ExecStop=/usr/bin/docker stop airflow-server
    Restart=on-failure

# systemd のユニットファイル - Airflow Scheduler
- path: /etc/systemd/system/airflow-scheduler.service
  content: |
    [Unit]
    Description=Airflow scheduler daemon
    After=network.target

    [Service]
    Environment="HOME=/home/airflow"
    ExecStartPre=/usr/bin/docker-credential-gcr configure-docker
    ExecStart=/usr/bin/docker run --name=airflow-scheduler --rm --net=db --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 airflow scheduler
    ExecStop=/usr/bin/docker stop airflow-scheduler
    Restart=on-failure

# systemd のユニットファイル - Airflow のログ削除シェル
- path: /etc/systemd/system/airflow-clean-logs.service
  content: |
    [Unit]
    Description=Airflow clean-logs daemon
    After=network.target

    [Service]
    Environment="HOME=/home/airflow"
    ExecStartPre=/usr/bin/docker-credential-gcr configure-docker
    ExecStart=/usr/bin/docker run --name=airflow-clean-logs --rm --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 bash /custom-clean-logs
    ExecStop=/usr/bin/docker stop airflow-clean-logs
    Restart=on-failure

runcmd:
- mkdir -p /var/airflow/scripts              # BashOperatorから叩きたいシェルの置き場所
- mkdir -p /var/airflow/dags                 # DAGのディレクトリ
- mkdir -p /var/airflow/logs                 # ログのディレクトリ

- usermod -u 50000 airflow                   # コンテナからホスト側ボリュームを見た時にパーミッションエラーになるので、AirflowのDockerイメージのユーザーIDにあわせています
- groupmod -g 50000 airflow                  # ↑と同じ理由でグループID
- chown -R airflow:airflow /var/airflow/

- docker images -aq | xargs docker rmi       # インスタンスを再起動した時に最新のイメージ(キャッシュされたものではなく)を取得したいのでインスタンス起動時に削除しています
                                             #   => こうしておくとDockerイメージのタグをlatestなどで固定していてもインスタンスの再起動で更新を反映できます

- docker network create -d bridge db         # AirflowのコンテナからCloud SQL Proxyのコンテナに接続するためのブリッジ

- systemctl daemon-reload                    # 以下、systemd関連
- systemctl start cloud_sql_proxy.service
- systemctl start airflow-webserver.service
- systemctl start airflow-scheduler.service
- systemctl start airflow-clean-logs.service

データベースの初期化

  • Container-Optimized OSのVMインスタンスにログイン後、Airflowイメージをbashシェルで起動します

    docker run -it --rm --net=db --env-file=/var/airflow/env -v /var/airflow:/var/airflow gcr.io/abechan-no-project-id/airflow-custom:2.2.2 bash
    
  • Airflowのコンテナからデータベースの初期化と管理者ユーザーの作成を行います

    airflow db init
    airflow users create -u abechan -p password -r Admin -e mail@address -f firstname-is-secret -l lastname-is-abechan
    

最後に

Airflowは最初はとっつきにくくクセも強いなぁと思ったものの、その辺は慣れの問題で今では欠かすことの出来ないツールとして重宝しています。ちょっとしたバッチがいっぱいあって管理が大変、といったお悩みを抱えている方は一度導入を検討してみてはいかがでしょうか?
PythonでLinuxに直にインストールすることも出来ますが、Dockerイメージでも環境変数で色々制御出来て構築が楽なのでおすすめです。

  1. クセは強いですが

  2. これはデータ連携用のAirflowなのですが、これとは別に分析バッチ用のAirflowがもう一台あります

  3. こういった分析処理ではAirflowから一時的にスペック高めのVMインスタンスを立ち上げて処理が終わったらインスタンスを削除、といったことをしています

  4. Airflowから実行される側のバッチは主にCloud Functions/Cloud Run/常時稼働のVMインスタンス+SSHリモート実行 などで処理しています

  5. 実は弊社でもCloud Composerを使っているところもあります(詳細は内緒)

  6. 運用していてディスク使用量がじわじわ増えていくといった現象に遭遇しました

  7. 記事用に一部情報のマスクや古い部分の最新化をしています

9
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?