3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

airflow で別コンテナ上でPythonスクリプトを実行する

Last updated at Posted at 2023-03-07

はじめに

airflow を使うにあたり、airflow とは独立したコンテナ上でタスク実行する方法を調査・検証しました。

つまり、airflow 上にインストールするライブラリを最小限にして、すでにあるコンテナ環境(image)を再利用するということです。

TL;DR

結果的に、ざっくりと以下のような構成にすることで、airflow と独立したコンテナ上でタスクを実行することができました。

image.png

コード

動作環境

対象 バージョン
ホストOS Ubuntu 22.04
実行用コンテナ custom based on nvidia/cuda:11.8.0-devel-ubuntu22.04
docker Docker version 23.0.1

compose ファイル

Running Airflow in Docker に記載がある通り、以下のようにして、compose ファイルをダウンロードしておきます。

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
mv docker-compose.yaml compose.yml

変更

compose.yml

既定のDAGのExamplesをロードしないように、AIRFLOW__CORE__LOAD_EXAMPLES の設定を変更し、_PIP_ADDITIONAL_REQUIREMENTS に、docker を追加します。UDSへプロキシするために、docker-proxy サービスを追加します。自分のローカル環境の都合上、airflow-webserver の port もホスト側で 8080 を使っているので、8088 に変更しておきます。最後に、趣味レベルの変更ですが、postgresql の volume を、カレントディレクトリ配下をマウントするように変更しておきます。

(lab) take@aurora:~/pj/lab/airflow$ diff docker-compose.yaml compose.yml 
50a51,53
>   # build:
>   #   context: ./
>   #   dockerfile: Dockerfile
61c64
<     AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
---
>     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
63c66,67
<     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
---
>     _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-docker airflow-code-editor black}
>     EXPERIMENT_DIR: ${EXPERIMENT_DIR}
67a72
>     - ${AIRFLOW_PROJ_DIR:-.}/airflow.cfg:/opt/airflow/airflow.cfg
76a82,89
>   docker-proxy:
>     image: bobrik/socat
>     command: "TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock"
>     ports:
>       - "2376:2375"
>     volumes:
>       - /var/run/docker.sock:/var/run/docker.sock
>     restart: always
84c97
<       - postgres-db-volume:/var/lib/postgresql/data
---
>       - ./data/postgresql:/var/lib/postgresql/data
106c119
<       - 8080:8080
---
>       - 8088:8080
108c121
<       test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
---
>       test: ["CMD", "curl", "--fail", "http://localhost:8088/health"]
279,280d291
< volumes:
<   postgres-db-volume:
(lab) take@aurora:~/pj/lab/airflow$ 

airflow.cfg

以下のように、タイムゾーンと既定のexamplesをロードしないように変更し(結果、load_examples の設定はなくてもよいとわかったが・・)、airflow code editor の設定を追加しておきます。

$ diff airflow.cfg.org airflow.cfg
18c18,19
< default_timezone = utc
---
> # default_timezone = utc
> default_timezone = Asia/Tokyo
60c61
< load_examples = True
---
> load_examples = False
544c545,546
< default_ui_timezone = UTC
---
> # default_ui_timezone = UTC
> default_ui_timezone = Asia/Tokyo
1238c1240,1255
< default_timeout = 604800
\ ファイル末尾に改行がありません
---
> default_timeout = 604800
> 
> 
> [code_editor]
> enabled = True
> git_enabled = True
> git_cmd = /usr/bin/git
> git_default_args = -c color.ui=true
> git_init_repo = False
> root_directory = /opt/airflow/dags
> line_length = 88
> string_normalization = False
> mount = name=data,path=/opt/airflow/data
> mount1 = name=logs,path=/opt/airflow/logs
> mount2 = name=data,path=s3://example
> 

DAG ファイル

まずは、簡易な動作確認をするための gpu 用別コンテナ上でPython実行用に分けて解説します。

DAGコードの詳細は、こちら(github)参照


gpu 用

まずは、簡易な動作確認をするため、nvidia-smi を実行するDAGを追加して、動作確認をします。(動作確認は、http://localhost:8088 などにアクセスしてブラウザ上で実行&確認します。)

import docker

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.docker.operators.docker import DockerOperator


with DAG(
    "job_gpu_docker",
    schedule_interval="@daily",
    start_date=datetime(2023, 3, 1),
    catchup=False,
    tags=["example"],
) as dag:

    docker_task = DockerOperator(
        docker_url="tcp://docker-proxy:2375",
        command="/usr/bin/nvidia-smi",
        image="nvidia/cuda:11.8.0-devel-ubuntu22.04",
        auto_remove=True,
        mount_tmp_dir=False,
        task_id="task_nvidia-smi",
        device_requests=[docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])],
    )

    # Empty operations
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Create a simple workflow
    start >> docker_task >> end


UI

以下のような画面で追加・作成したDAGの一覧を確認できます。

image.png

右側に画面をスクロールし、実行できそうなボタンを押して「Trigger DAG」を選択することで、即時実行できます。

image.png


別コンテナ上でPython実行用

では、実際に別コンテナ(例:experiment.app)上で、Python スクリプトを実行するDAG を作成します。

なお、別コンテナは、こちら(github の別リポジトリ)のコードで作成したものを使いました。

import os
import docker

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount



with DAG(
    "job_docker",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    docker_task = DockerOperator(
        docker_url="tcp://docker-proxy:2375",
        image="experiment.app",
        mounts=[
            Mount(
                # source="/home/user_name/pj/experiment",
                source=os.environ["EXPERIMENT_DIR"],
                target="/home/dsuser/workspace",
                type="bind",
            ),
        ],
        working_dir="/home/dsuser/workspace/backend",
        command="bash -c 'PYTHONPATH=. python app/executable/train_topicmodel.py --n-limit=1024 --pipe-file=data/pipe_topic-test.gz'",
        # container_name=None,
        auto_remove=True,
        mount_tmp_dir=False,
        task_id="task_train",
        device_requests=[docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])],
    )

    # Empty operations
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Create a simple workflow
    start >> docker_task >> end

おまけ

airflow CLI

CLI 上から実行したい時(例:DAG job_demo を実行)は、以下のようにします。

docker compose exec airflow-worker airflow dags trigger job_demo

ここで、pause されていると、queued のまま実行されないので、以下のように unpause します。

docker compose exec airflow-worker airflow dags unpause job_demo

UI 上で画面を更新すると、実行されるのがわかるはずです!

まとめ、というか感想

  • DockerOperator でハマりました
    • 結論は、Device や、Mount の指定が必要だったという点
  • CLI から実行するには、unpause を忘れずに
    • pause と queue の関係がわからず、CLI からは直接実行できないかと思った・・
  • load_examples = False でもハマりました
    • airflow.cfg を変更しても全く変わらないという現象
    • compose.yml で環境変数を設定しているため、そちらが優先されていた・・
  • なんとか、これでリモートコンテナ上で開発しつつ、airflow でタスク実行ができるようになりました!

参考URL

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?