13
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Airflow環境構築パターン&構築手順メモ ~その1~

Last updated at Posted at 2023-03-31

はじめに

前回の続きです。

Airflowを実際に触ってみようと思いますが、環境の構築パターンが色々あるみたいです。
なのでまずは環境構築パターンとその手順を整理することにしました。

今回の構築パターンは下記のとおりです。

  • パターン1:直接インストール方式
  • パターン2:直接インストール + 別インスタンス実行
  • パターン3:DockerComposeで起動
  • パターン4:DockerCompose起動 + 別インスタンス実行

Airflowの利用方法とバージョン

Airflowの公式ドキュメントによるとAirflowの利用方法は下記の6通り。

  1. ソースをビルドしてインスタンスにインストール
  2. PyPIからインスタンスにインストール
  3. Dockerイメージを利用する
  4. Helmを利用してKubernetesクラスタ上にインストール
  5. マネージドサービスを利用する(GCP,AWS,Azure)
  6. 3rdPartyのDockerイメージやマニフェスト、Helmを利用する

今回は利用方法の2~3を対象とした環境構築手順を整理します。
また、AirflowとHelm、その他ソフトウェアのバージョン対応は下表の通りです。

Airflow リリース日付 Airflow Helm Chart その他
May 15, 2023 2.5.2 - Requirements
Jan 21, 2023 2.5.1 1.8.0 Requirements 
Dec 3, 2022 2.5.0 - Requirements 
Oct 1, 2022 2.4.1 1.7.0 Requirements 
Sep 19, 2022 2.4.0 - Requirements 

2.4.32.4.2はHelm側に対応するバージョンが存在しません。
Helm版はマイナーバージョンごとにリリースされる傾向があるらしく、2.5.0をHelmで利用したい場合独自にHelmファイルを作成するか3rd PartyのHelmファイルを利用する必要があります。

「その他」はPythonやPostgreSQL、Kubernetesのバージョンで、リンク先に対応バージョンがまとめられています。
インストールするAirflowのバージョンが推奨するバージョンを利用します。

Airflow環境構築例

パターン1:直接インストール方式

概要

インスタンスに直接Airflowをインストールして利用するパターンです。
またアプリケーションジョブもAirflowと同じ環境で実行します。

Airflow構成パターン1.png

webServerschedulerを動かすインスタンスでアプリケーションを実行するため、LocalExecutorを利用します。
LocalExecutorを利用する場合、workerは不要です(workerを起動コマンドを実行するととエラーとなります)。

構築手順

Pythonはこちらの方法でインストールします。
Installation and upgrade of Airflow coreの手順を参考にpipを利用してインストールします。

# pythonのバージョン確認
$ python -V
Python 3.10.6

# AIRFLOW_HOMEの設定
$ mkdir -p ~/airflow
$ echo "export AIRFLOW_HOME=~/airflow" >> ~/.profile
$ source ~/.profile
$ echo $AIRFLOW_HOME
/home/xxxx/airflo

# Airflowインストール
$ AIRFLOW_VERSION=2.5.2
$ PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ python -m pip install --upgrade pip
$ python -m pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# airflowのバージョン&HOMEディレクトリの確認
$ airflow version
2.5.2
$ ls ~/airflow/
airflow.cfg  logs  webserver_config.py

また、Airflowインストールと同時に[extra-package]をインストールする場合は"apache-airflow[postgres,google]==${AIRFLOW_VERSION}"のようにextra-packageを指定します。

インストール方法の詳細はUpgrading Airflow with providersを参照してください。
また利用可能なextra-packageはReference for package extrasに一覧化されています。

初期化手順

Airflowの初期化手順です。
デフォルトではSQLiteを利用しますが、公式ドキュメントでは検証目的にのみ利用すべきであり、本番環境ではMySQLまたはPostgreSQLの利用を推奨しています。
今回の構成ではDBにPostgreSQLを利用するのでPostgreSQLにAirflow用のDBやユーザを作成します。

CREATE DATABASE airflow;
CREATE USER airflow WITH PASSWORD 'airflow';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
ALTER DATABASE airflow OWNER TO airflow;

Airflowインストール時にairflow.cfgが環境変数AIRFLOW_HOMEに作成されるので設定を変更します。
airflow.cfgのDB接続先設定を書き換えます。
またExecutorをLocalExecutorに、サンプルのロードをFalseに変更します。

airflow.cfg
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

[core]
executor = LocalExecutor
load_examples = False

AirflowのDBを初期化します。
psycopg2のインストールにはpg_configが必要となるので事前にインストールし、初期化コマンドを実行します。

$ sudo apt install libpq-dev
# pythonのPostgreSQL接続ライブラリのインストール(PostgreSQLを利用する場合は必須)
$ python -m pip install psycopg2
$ airflow config get-value database sql_alchemy_conn
postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
# DB初期化コマンド
$ airflow db init

以上で設定は完了です。

動作確認

Airflowを起動する前にDAGファイルと実行するShellScriptを作成します。

# DAGファイル格納ディレクトリの作成(デフォルトのDAG参照先)
$ mkdir -p ~/airflow/dags
# 実行するShellScript
$ echo "echo Hello Airflow" > ~/workspace/sample.sh
$ chmod +x sample.sh

次に、Tutorialを参考にサンプルのDAGファイルを作成し、~/airflow/dagsに格納します。

sample.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
        "sample",
        default_args={
            "depends_on_past": False,
            "email": ["airflow@example.com"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=5)
        },
        description="A simple DAG",
        schedule=timedelta(days=1),
        start_date=datetime(2023, 1, 1),
        catchup=False,
        tags=["sample"],
) as dag:

    t1 = BashOperator(
        task_id="hello",
        bash_command="echo hello",
    )
    # 注意事項:bash_commandの最後にスペースが必要
    t2 = BashOperator(
        task_id="hello_airflow",
        bash_command="~/workspace/sample.sh ",
    )

    t3 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )

    t1 >> t2 >> t3

準備ができたのでAirflowを起動します。
起動方法はstandalonewebserverschedulerをまとめて起動する方法がありますが、今回はそれぞれ分けて起動する方法で試します。

下記コマンドでwebserverを起動させた後、user createでログインするユーザを作成します。

$ airflow webserver --port 8080
$ airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@sample.com

次に新しいコンソールを開いてschedulerを起動します。

$ airflow scheduler

webserverschedulerが起動したらブラウザからWebUIにアクセスし、作成したユーザでログインします。

image.png

sampleを選択しTrigger DAGで実行するとDAGが正常に実行されました。

image.png

パターン2:直接インストール + 別インスタンス実行

概要

パターン1の構成に別のインスタンスを追加したパターンとなります。

Airflow構成パターン2.png

ジョブ実行は追加したインスタンス(Server02)に任せ、Server01にはwebserverschedulerのみを動かします。
Airflowでは複数のノード(Worker)で分散してジョブを実行するためにCeleryExecutorを利用します。
CeleryはPythonにおける非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークです。

公式ドキュメントによるとQueue BokerにはRabbitMQ、Redis、DBなどが利用できるようです。
今回の環境構築例ではRedisを利用する方針とします。

構築手順

パターン1で構築した環境にジョブ実行用のインスタンス(Server02)を追加します。
追加したインスタンスにはパターン1の構築手順の内容に従ってAirflowをインストールします。
Server02へのAirflowインストール完了後、Server01とServer02の両方に下記の手順でceleryのインストールします。
また、redisのライブラリも一緒にインストールします。

# celeryインストール
$ python -m pip install "apache-airflow[celery]"
$ python -m pip install redis

Server01とServer02のairflow.cfgに下記の設定を追加します。

airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://<IP>:6379/0
# result_backendを設定しない場合は「sql_alchemy_conn」と同じ接続先になります。
result_backend = db+postgresql+psycopg2://airflow:airflow@<IP>:5432/airflow

動作確認

同じDAGファイルをServer01とServer02の両方に配置します。
また、実行するShellScriptをServer02に配置します。
その後、Server01インスタンスでwebserverschedulerを起動します。

server01
# Webサーバを起動
$ airflow webserver --port 8080
# スケジューラを起動
$ airflow scheduler

Server02インスタンスではworkerを起動します。

server02
# ワーカーを起動
$ airflow celery worker

あとはパターン1同じ手順でsampleを実行します。
WebUIからJobsを確認すると、CeleryExecutorかつ別ホストで実行されていることがわかります。

image.png

pythonのバージョンについて

schedulerworkerで実行するpythonのバージョンが異なると、schedulerが動かなくなる可能性があります。
異なるインスタンスでworkerを動かす場合は、必ずschedulerwebserver側インスタンスと同じバージョンのpythonを利用する必要があります。

パターン3:DockerComposeで起動

概要

パターン1の構成をDockerComposeで起動する方式です。

Airflow構成パターン3.png

wevserverschedulerworkerに加えてDB初期化用のinitのコンテナを立ち上げます。
webserverschedulerworkerの起動条件としてinitのコンテナ実行完了を前提とします。
これで、DBの初期化完了前に他のコンテナが起動するのを防ぎます。

また、各コンテナには図に示すとおりにホストのフォルダをマウントします。
マウントしたフォルダにDAGファイルや実行するスクリプトを配置します。
(フォルダAにはDAGファイルとairflow.cfgを、フォルダBには実行するShellScript等を格納します。)

構築手順

公式のdocker-compose.yamlを参考に独自のdocker-compose.yamlを作成します。
公式の実行手順はRunning Airflow in Dockerに記載されています。

今回の構成ではDBやRedisは既存の環境を利用するため不要です。
また、flowerairflow-cliも使いません。

下記はdocker-compose.yamlのディレクトリ構成です。
mntにマウントするファイルを格納しています。

airflow-docker
 ├ docker-compose.yaml
 └ mnt
    ├ dags
    |  └ sample.py
    ├ logs
    ├ plugins
    └ script
       └ sample.sh

また、docker-compose.yamlの内容は下記のとおりです(一部省略)。

docker-compose.yaml
※省略となっている部分は公式のdocker-compose.yamlの設定と同じとなります。

version: '3.8'
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.5.2-python3.10
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@192.168.10.3:5432/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql+psycopg2://airflow:airflow@192.168.10.3:5432/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://192.168.10.3:6379/0
   
    # 省略

  volumes:
    - ./mnt/dags:/opt/airflow/dags
    - ./mnt/logs:/opt/airflow/logs
    - ./mnt/plugins:/opt/airflow/plugins 

    # 省略

services:
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    volumes:
      - ./mnt/dags:/opt/airflow/dags
      - ./mnt/logs:/opt/airflow/logs
      - ./mnt/plugins:/opt/airflow/plugins
      - ./mnt/script:/opt/airflow/script
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    # 省略

また、パターン1のDAGファイルもbash_commandworkerコンテナ内部のパスに変更します。
volumes:でマウントした- ./mnt/script:/opt/airflow/scriptと設定を合わせます。

sample.py
# 省略
    t2 = BashOperator(
        task_id="hello_airflow",
        bash_command="/opt/airflow/script/sample.sh ",
    )
# 省略

準備ができたらdocker composeでコンテナを起動します。

$ docker compose up -d
[+] Running 5/5
 - Network docker-compose_default                Created   0.9s
 - Container docker-compose-airflow-init-1       Exited    33.2s
 - Container docker-compose-airflow-webserver-1  Started   35.9s
 - Container docker-compose-airflow-scheduler-1  Started   35.6s
 - Container docker-compose-airflow-worker-1     Started   35.8s   
$ docker compose ps -a
NAME                                 COMMAND                  SERVICE             STATUS              PORTS
docker-compose-airflow-init-1        "/bin/bash -c 'funct…"   airflow-init        exited (0)
docker-compose-airflow-scheduler-1   "/usr/bin/dumb-init …"   airflow-scheduler   running (healthy)   8080/tcp
docker-compose-airflow-webserver-1   "/usr/bin/dumb-init …"   airflow-webserver   running (healthy)   0.0.0.0:8080->8080/tcp
docker-compose-airflow-worker-1      "/usr/bin/dumb-init …"   airflow-worker      running (healthy)   8080/tcp

動作確認

WebUIにログインしてDAGを実行します。
コンテナから起動したAirflowのユーザIDとパスワードは同じで「airflow」となります。

動作確認は他のパターンと同じなので割愛します。

パターン4:DockerCompose起動 + 別インスタンス実行

概要

パターン2の構成をコンテナ化したパターンです。

Airflow構成パターン4.png

パターン3で構築した環境にジョブ実行用のサーバを追加するだけです。
基本的にはServer02に下記設定を行うだけでOKです。

  • パターン3のdocker-compose.yamlからworkerの設定を削除
  • パターン2の手順でAirflowをインストールする。
  • $AIRFLOW_HOME/dagsにServer01のDAGファイルを配置する。
  • airflow.cfgの接続先設定をServer01と同じDBとRedisに設定する。

その後Server02でworkerを起動するだけです。
workerの起動手順と動作確認手順は他のパターンと同じなため割愛します。

さいごに

これ以上は長くなるので今回はここまでにします。
「~その2~」ではKubernetes環境での環境構築についてまとめたいと思います。

13
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?