はじめに
airflow を使うにあたり、airflow とは独立したコンテナ上でタスク実行する方法を調査・検証しました。
つまり、airflow 上にインストールするライブラリを最小限にして、すでにあるコンテナ環境(image)を再利用するということです。
TL;DR
結果的に、ざっくりと以下のような構成にすることで、airflow と独立したコンテナ上でタスクを実行することができました。
コード
- コードは、こちら
動作環境
対象 | バージョン |
---|---|
ホストOS | Ubuntu 22.04 |
実行用コンテナ | custom based on nvidia/cuda:11.8.0-devel-ubuntu22.04
|
docker | Docker version 23.0.1 |
compose ファイル
Running Airflow in Docker に記載がある通り、以下のようにして、compose ファイルをダウンロードしておきます。
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
mv docker-compose.yaml compose.yml
変更
compose.yml
既定のDAGのExamplesをロードしないように、AIRFLOW__CORE__LOAD_EXAMPLES の設定を変更し、_PIP_ADDITIONAL_REQUIREMENTS に、docker
を追加します。UDSへプロキシするために、docker-proxy
サービスを追加します。自分のローカル環境の都合上、airflow-webserver の port もホスト側で 8080 を使っているので、8088 に変更しておきます。最後に、趣味レベルの変更ですが、postgresql の volume を、カレントディレクトリ配下をマウントするように変更しておきます。
(lab) take@aurora:~/pj/lab/airflow$ diff docker-compose.yaml compose.yml
50a51,53
> # build:
> # context: ./
> # dockerfile: Dockerfile
61c64
< AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
---
> AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
63c66,67
< _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
---
> _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-docker airflow-code-editor black}
> EXPERIMENT_DIR: ${EXPERIMENT_DIR}
67a72
> - ${AIRFLOW_PROJ_DIR:-.}/airflow.cfg:/opt/airflow/airflow.cfg
76a82,89
> docker-proxy:
> image: bobrik/socat
> command: "TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock"
> ports:
> - "2376:2375"
> volumes:
> - /var/run/docker.sock:/var/run/docker.sock
> restart: always
84c97
< - postgres-db-volume:/var/lib/postgresql/data
---
> - ./data/postgresql:/var/lib/postgresql/data
106c119
< - 8080:8080
---
> - 8088:8080
108c121
< test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
---
> test: ["CMD", "curl", "--fail", "http://localhost:8088/health"]
279,280d291
< volumes:
< postgres-db-volume:
(lab) take@aurora:~/pj/lab/airflow$
airflow.cfg
以下のように、タイムゾーンと既定のexamplesをロードしないように変更し(結果、load_examples の設定はなくてもよいとわかったが・・)、airflow code editor の設定を追加しておきます。
$ diff airflow.cfg.org airflow.cfg
18c18,19
< default_timezone = utc
---
> # default_timezone = utc
> default_timezone = Asia/Tokyo
60c61
< load_examples = True
---
> load_examples = False
544c545,546
< default_ui_timezone = UTC
---
> # default_ui_timezone = UTC
> default_ui_timezone = Asia/Tokyo
1238c1240,1255
< default_timeout = 604800
\ ファイル末尾に改行がありません
---
> default_timeout = 604800
>
>
> [code_editor]
> enabled = True
> git_enabled = True
> git_cmd = /usr/bin/git
> git_default_args = -c color.ui=true
> git_init_repo = False
> root_directory = /opt/airflow/dags
> line_length = 88
> string_normalization = False
> mount = name=data,path=/opt/airflow/data
> mount1 = name=logs,path=/opt/airflow/logs
> mount2 = name=data,path=s3://example
>
DAG ファイル
まずは、簡易な動作確認をするための gpu 用、別コンテナ上でPython実行用に分けて解説します。
DAGコードの詳細は、こちら(github)参照
gpu 用
まずは、簡易な動作確認をするため、nvidia-smi を実行するDAGを追加して、動作確認をします。(動作確認は、http://localhost:8088 などにアクセスしてブラウザ上で実行&確認します。)
import docker
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.docker.operators.docker import DockerOperator
with DAG(
"job_gpu_docker",
schedule_interval="@daily",
start_date=datetime(2023, 3, 1),
catchup=False,
tags=["example"],
) as dag:
docker_task = DockerOperator(
docker_url="tcp://docker-proxy:2375",
command="/usr/bin/nvidia-smi",
image="nvidia/cuda:11.8.0-devel-ubuntu22.04",
auto_remove=True,
mount_tmp_dir=False,
task_id="task_nvidia-smi",
device_requests=[docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])],
)
# Empty operations
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
# Create a simple workflow
start >> docker_task >> end
UI
以下のような画面で追加・作成したDAGの一覧を確認できます。
右側に画面をスクロールし、実行できそうなボタンを押して「Trigger DAG」を選択することで、即時実行できます。
別コンテナ上でPython実行用
では、実際に別コンテナ(例:experiment.app
)上で、Python スクリプトを実行するDAG を作成します。
なお、別コンテナは、こちら(github の別リポジトリ)のコードで作成したものを使いました。
import os
import docker
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
with DAG(
"job_docker",
schedule_interval="@daily",
start_date=datetime(2022, 1, 1),
catchup=False,
tags=["example"],
) as dag:
docker_task = DockerOperator(
docker_url="tcp://docker-proxy:2375",
image="experiment.app",
mounts=[
Mount(
# source="/home/user_name/pj/experiment",
source=os.environ["EXPERIMENT_DIR"],
target="/home/dsuser/workspace",
type="bind",
),
],
working_dir="/home/dsuser/workspace/backend",
command="bash -c 'PYTHONPATH=. python app/executable/train_topicmodel.py --n-limit=1024 --pipe-file=data/pipe_topic-test.gz'",
# container_name=None,
auto_remove=True,
mount_tmp_dir=False,
task_id="task_train",
device_requests=[docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])],
)
# Empty operations
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
# Create a simple workflow
start >> docker_task >> end
おまけ
airflow CLI
CLI 上から実行したい時(例:DAG job_demo
を実行)は、以下のようにします。
docker compose exec airflow-worker airflow dags trigger job_demo
ここで、pause されていると、queued
のまま実行されないので、以下のように unpause します。
docker compose exec airflow-worker airflow dags unpause job_demo
UI 上で画面を更新すると、実行されるのがわかるはずです!
まとめ、というか感想
- DockerOperator でハマりました
- 結論は、Device や、Mount の指定が必要だったという点
- CLI から実行するには、unpause を忘れずに
- pause と queue の関係がわからず、CLI からは直接実行できないかと思った・・
- load_examples = False でもハマりました
- airflow.cfg を変更しても全く変わらないという現象
- compose.yml で環境変数を設定しているため、そちらが優先されていた・・
- なんとか、これでリモートコンテナ上で開発しつつ、airflow でタスク実行ができるようになりました!
参考URL
- airflow
- docker-compose
- demo code
- 既定の dags examples をロードしない (compose.yml の 環境変数を変更する)
- github
- docker image
- アーキテクチャ
- Mounts for DockerOperator
- DockerOperator
- Trouble shooting for docker operator
- airflow code editor
- docker compose