はじめに
前回の続きです。
Airflowを実際に触ってみようと思いますが、環境の構築パターンが色々あるみたいです。
なのでまずは環境構築パターンとその手順を整理することにしました。
今回の構築パターンは下記のとおりです。
- パターン1:直接インストール方式
- パターン2:直接インストール + 別インスタンス実行
- パターン3:DockerComposeで起動
- パターン4:DockerCompose起動 + 別インスタンス実行
Airflowの利用方法とバージョン
Airflowの公式ドキュメントによるとAirflowの利用方法は下記の6通り。
- ソースをビルドしてインスタンスにインストール
- PyPIからインスタンスにインストール
- Dockerイメージを利用する
- Helmを利用してKubernetesクラスタ上にインストール
- マネージドサービスを利用する(GCP,AWS,Azure)
- 3rdPartyのDockerイメージやマニフェスト、Helmを利用する
今回は利用方法の2~3を対象とした環境構築手順を整理します。
また、AirflowとHelm、その他ソフトウェアのバージョン対応は下表の通りです。
Airflow リリース日付 | Airflow | Helm Chart | その他 |
---|---|---|---|
May 15, 2023 | 2.5.2 | - | Requirements |
Jan 21, 2023 | 2.5.1 | 1.8.0 | Requirements |
Dec 3, 2022 | 2.5.0 | - | Requirements |
Oct 1, 2022 | 2.4.1 | 1.7.0 | Requirements |
Sep 19, 2022 | 2.4.0 | - | Requirements |
2.4.3
、2.4.2
はHelm側に対応するバージョンが存在しません。
Helm版はマイナーバージョンごとにリリースされる傾向があるらしく、2.5.0
をHelmで利用したい場合独自にHelmファイルを作成するか3rd PartyのHelmファイルを利用する必要があります。
「その他」はPythonやPostgreSQL、Kubernetesのバージョンで、リンク先に対応バージョンがまとめられています。
インストールするAirflowのバージョンが推奨するバージョンを利用します。
Airflow環境構築例
パターン1:直接インストール方式
概要
インスタンスに直接Airflowをインストールして利用するパターンです。
またアプリケーションジョブもAirflowと同じ環境で実行します。
webServer
やscheduler
を動かすインスタンスでアプリケーションを実行するため、LocalExecutor
を利用します。
LocalExecutor
を利用する場合、worker
は不要です(worker
を起動コマンドを実行するととエラーとなります)。
構築手順
Pythonはこちらの方法でインストールします。
Installation and upgrade of Airflow coreの手順を参考にpip
を利用してインストールします。
# pythonのバージョン確認
$ python -V
Python 3.10.6
# AIRFLOW_HOMEの設定
$ mkdir -p ~/airflow
$ echo "export AIRFLOW_HOME=~/airflow" >> ~/.profile
$ source ~/.profile
$ echo $AIRFLOW_HOME
/home/xxxx/airflo
# Airflowインストール
$ AIRFLOW_VERSION=2.5.2
$ PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ python -m pip install --upgrade pip
$ python -m pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# airflowのバージョン&HOMEディレクトリの確認
$ airflow version
2.5.2
$ ls ~/airflow/
airflow.cfg logs webserver_config.py
また、Airflowインストールと同時に[extra-package]をインストールする場合は"apache-airflow[postgres,google]==${AIRFLOW_VERSION}"
のようにextra-packageを指定します。
インストール方法の詳細はUpgrading Airflow with providersを参照してください。
また利用可能なextra-packageはReference for package extrasに一覧化されています。
初期化手順
Airflowの初期化手順です。
デフォルトではSQLiteを利用しますが、公式ドキュメントでは検証目的にのみ利用すべきであり、本番環境ではMySQLまたはPostgreSQLの利用を推奨しています。
今回の構成ではDBにPostgreSQLを利用するのでPostgreSQLにAirflow用のDBやユーザを作成します。
CREATE DATABASE airflow;
CREATE USER airflow WITH PASSWORD 'airflow';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
ALTER DATABASE airflow OWNER TO airflow;
Airflowインストール時にairflow.cfg
が環境変数AIRFLOW_HOME
に作成されるので設定を変更します。
airflow.cfg
のDB接続先設定を書き換えます。
またExecutorをLocalExecutor
に、サンプルのロードをFalse
に変更します。
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
[core]
executor = LocalExecutor
load_examples = False
AirflowのDBを初期化します。
psycopg2
のインストールにはpg_config
が必要となるので事前にインストールし、初期化コマンドを実行します。
$ sudo apt install libpq-dev
# pythonのPostgreSQL接続ライブラリのインストール(PostgreSQLを利用する場合は必須)
$ python -m pip install psycopg2
$ airflow config get-value database sql_alchemy_conn
postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
# DB初期化コマンド
$ airflow db init
以上で設定は完了です。
動作確認
Airflowを起動する前にDAGファイルと実行するShellScriptを作成します。
# DAGファイル格納ディレクトリの作成(デフォルトのDAG参照先)
$ mkdir -p ~/airflow/dags
# 実行するShellScript
$ echo "echo Hello Airflow" > ~/workspace/sample.sh
$ chmod +x sample.sh
次に、Tutorialを参考にサンプルのDAGファイルを作成し、~/airflow/dags
に格納します。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
"sample",
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5)
},
description="A simple DAG",
schedule=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["sample"],
) as dag:
t1 = BashOperator(
task_id="hello",
bash_command="echo hello",
)
# 注意事項:bash_commandの最後にスペースが必要
t2 = BashOperator(
task_id="hello_airflow",
bash_command="~/workspace/sample.sh ",
)
t3 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1 >> t2 >> t3
準備ができたのでAirflowを起動します。
起動方法はstandalone
でwebserver
とscheduler
をまとめて起動する方法がありますが、今回はそれぞれ分けて起動する方法で試します。
下記コマンドでwebserver
を起動させた後、user create
でログインするユーザを作成します。
$ airflow webserver --port 8080
$ airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@sample.com
次に新しいコンソールを開いてscheduler
を起動します。
$ airflow scheduler
webserver
とscheduler
が起動したらブラウザからWebUIにアクセスし、作成したユーザでログインします。
sample
を選択しTrigger DAG
で実行するとDAGが正常に実行されました。
パターン2:直接インストール + 別インスタンス実行
概要
パターン1の構成に別のインスタンスを追加したパターンとなります。
ジョブ実行は追加したインスタンス(Server02)に任せ、Server01にはwebserver
やscheduler
のみを動かします。
Airflowでは複数のノード(Worker)で分散してジョブを実行するためにCeleryExecutor
を利用します。
CeleryはPythonにおける非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークです。
公式ドキュメントによるとQueue BokerにはRabbitMQ、Redis、DBなどが利用できるようです。
今回の環境構築例ではRedisを利用する方針とします。
構築手順
パターン1で構築した環境にジョブ実行用のインスタンス(Server02)を追加します。
追加したインスタンスにはパターン1の構築手順の内容に従ってAirflowをインストールします。
Server02へのAirflowインストール完了後、Server01とServer02の両方に下記の手順でcelery
のインストールします。
また、redis
のライブラリも一緒にインストールします。
# celeryインストール
$ python -m pip install "apache-airflow[celery]"
$ python -m pip install redis
Server01とServer02のairflow.cfg
に下記の設定を追加します。
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://<IP>:6379/0
# result_backendを設定しない場合は「sql_alchemy_conn」と同じ接続先になります。
result_backend = db+postgresql+psycopg2://airflow:airflow@<IP>:5432/airflow
動作確認
同じDAGファイルをServer01とServer02の両方に配置します。
また、実行するShellScriptをServer02に配置します。
その後、Server01インスタンスでwebserver
とscheduler
を起動します。
# Webサーバを起動
$ airflow webserver --port 8080
# スケジューラを起動
$ airflow scheduler
Server02インスタンスではworker
を起動します。
# ワーカーを起動
$ airflow celery worker
あとはパターン1同じ手順でsample
を実行します。
WebUIからJobsを確認すると、CeleryExecutor
かつ別ホストで実行されていることがわかります。
pythonのバージョンについて
scheduler
とworker
で実行するpythonのバージョンが異なると、scheduler
が動かなくなる可能性があります。
異なるインスタンスでworker
を動かす場合は、必ずscheduler
やwebserver
側インスタンスと同じバージョンのpythonを利用する必要があります。
パターン3:DockerComposeで起動
概要
パターン1の構成をDockerComposeで起動する方式です。
wevserver
、scheduler
、worker
に加えてDB初期化用のinit
のコンテナを立ち上げます。
webserver
、scheduler
、worker
の起動条件としてinit
のコンテナ実行完了を前提とします。
これで、DBの初期化完了前に他のコンテナが起動するのを防ぎます。
また、各コンテナには図に示すとおりにホストのフォルダをマウントします。
マウントしたフォルダにDAGファイルや実行するスクリプトを配置します。
(フォルダAにはDAGファイルとairflow.cfg
を、フォルダBには実行するShellScript等を格納します。)
構築手順
公式のdocker-compose.yamlを参考に独自のdocker-compose.yaml
を作成します。
公式の実行手順はRunning Airflow in Dockerに記載されています。
今回の構成ではDBやRedisは既存の環境を利用するため不要です。
また、flower
やairflow-cli
も使いません。
下記はdocker-compose.yaml
のディレクトリ構成です。
mnt
にマウントするファイルを格納しています。
airflow-docker
├ docker-compose.yaml
└ mnt
├ dags
| └ sample.py
├ logs
├ plugins
└ script
└ sample.sh
また、docker-compose.yaml
の内容は下記のとおりです(一部省略)。
※省略となっている部分は公式のdocker-compose.yamlの設定と同じとなります。
version: '3.8'
x-airflow-common:
&airflow-common
image: apache/airflow:2.5.2-python3.10
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@192.168.10.3:5432/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql+psycopg2://airflow:airflow@192.168.10.3:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://192.168.10.3:6379/0
# 省略
volumes:
- ./mnt/dags:/opt/airflow/dags
- ./mnt/logs:/opt/airflow/logs
- ./mnt/plugins:/opt/airflow/plugins
# 省略
services:
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
environment:
<<: *airflow-common-env
DUMB_INIT_SETSID: "0"
restart: always
volumes:
- ./mnt/dags:/opt/airflow/dags
- ./mnt/logs:/opt/airflow/logs
- ./mnt/plugins:/opt/airflow/plugins
- ./mnt/script:/opt/airflow/script
depends_on:
airflow-init:
condition: service_completed_successfully
airflow-init:
# 省略
また、パターン1のDAGファイルもbash_command
をworker
コンテナ内部のパスに変更します。
volumes:
でマウントした- ./mnt/script:/opt/airflow/script
と設定を合わせます。
# 省略
t2 = BashOperator(
task_id="hello_airflow",
bash_command="/opt/airflow/script/sample.sh ",
)
# 省略
準備ができたらdocker compose
でコンテナを起動します。
$ docker compose up -d
[+] Running 5/5
- Network docker-compose_default Created 0.9s
- Container docker-compose-airflow-init-1 Exited 33.2s
- Container docker-compose-airflow-webserver-1 Started 35.9s
- Container docker-compose-airflow-scheduler-1 Started 35.6s
- Container docker-compose-airflow-worker-1 Started 35.8s
$ docker compose ps -a
NAME COMMAND SERVICE STATUS PORTS
docker-compose-airflow-init-1 "/bin/bash -c 'funct…" airflow-init exited (0)
docker-compose-airflow-scheduler-1 "/usr/bin/dumb-init …" airflow-scheduler running (healthy) 8080/tcp
docker-compose-airflow-webserver-1 "/usr/bin/dumb-init …" airflow-webserver running (healthy) 0.0.0.0:8080->8080/tcp
docker-compose-airflow-worker-1 "/usr/bin/dumb-init …" airflow-worker running (healthy) 8080/tcp
動作確認
WebUIにログインしてDAGを実行します。
コンテナから起動したAirflowのユーザIDとパスワードは同じで「airflow
」となります。
動作確認は他のパターンと同じなので割愛します。
パターン4:DockerCompose起動 + 別インスタンス実行
概要
パターン2の構成をコンテナ化したパターンです。
パターン3で構築した環境にジョブ実行用のサーバを追加するだけです。
基本的にはServer02に下記設定を行うだけでOKです。
- パターン3の
docker-compose.yaml
からworker
の設定を削除 - パターン2の手順でAirflowをインストールする。
-
$AIRFLOW_HOME/dags
にServer01のDAGファイルを配置する。 -
airflow.cfg
の接続先設定をServer01と同じDBとRedisに設定する。
その後Server02でworker
を起動するだけです。
worker
の起動手順と動作確認手順は他のパターンと同じなため割愛します。
さいごに
これ以上は長くなるので今回はここまでにします。
「~その2~」ではKubernetes環境での環境構築についてまとめたいと思います。