3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Apache Airflowのチュートリアルをローカル環境で試す

Last updated at Posted at 2021-12-31

Apache Airflow 2.2.3

Airflowとは

Apache AirflowはAirbnbのMaxime Beauchemin氏によって開始されたPython実装によるJob管理ツールで、ワークフローをプログラムで作成/スケジューリング/監視するためのプラットフォーム。
AirflowではJobの実行順だったり、依存関係のワークフローはDAG(Directed Acyclic Graph)へ定義する。

アーキテクチャ

Airflowは下記のコンポーネントで構成される。

  • スケジュールされたワークフローのトリガーと、実行タスクをExecuterの送信するためのScheduler
  • 実行中のタスクを処理するExecuter。デフォルトではスケジューラ内ですべてを実行するが、本番環境に適したExecuterはタスクの実行をWorkersにプッシュする。
  • DAGおよびタスクの動作を検査、トリガー、およびデバッグするための便利なユーザーインターフェイスを提供するWebサーバー
  • Scheduler/ Executorに読み取られるDAGファイルのフォルダ
  • 状態保持のために Scheduler/ Executor/ Webサーバー に使用されるMeta DB

実行準備

Airflowのパスを設定。

$ export AIRFLOW_HOME=~/airflow

Airflowインストールコマンド。

$ pip install "apache-airflow[celery]==2.2.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt"

ここでは推奨されているPostgreSQLでDBを準備

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

Airflowでは初期DBがSQLiteになっているらしいので、
airflow/airflow.cfgを編集して

sql_alchemy_conn = sqlite:////home/user/airflow/airflow.db
sql_alchemy_conn = postgresql://airflow_user:airflow_pass@localhost/airflow_dbへ変更

DBの初期化やユーザ作成を実行。

$ airflow standalone

コードの準備

airflow/dagsのフォルダを作成してその下にtest-tutorial.pyを作成して中身は
チュートリアルページのtutorial.pyをパクってDAGを作成。

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'test-tutorial',
    default_args=default_args,
    description='A simple test-tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

コードを紐解く

パラメータ 説明
dag_id str DAG単位で与える一意な識別子。ここでは'test-tutorial'が該当。
default_args dict オペレータに渡す引数。引数の中には優先順位があり、優先順から
1. 明示した引数
2. default_args
3. オペレータのデフォルト値(存在する場合)
となる。

executorについて

作成したDAGがなぜか読み込まれなかったが、
airflow/airflow.cfg内のexecutorをSequentialExecutorからCeleryExecutor変更することで読み込まれた(SequentialExecutorではタスクを直列にしか実行できないから?)。

executerについて下記にまとめる

SequentialExecuter CeleryExecutor LocalExecutor
デフォルトのExecutorで、直列にしかタスクを実行できない。お試し版的存在 キューを利用したExecutorでスケジューラとworkerが分離される。 schedulerとworkerを同じノードで構成する場合に使用するExecutor
スケールは一切しない workerを並列実行でき、スケールアップおよびスケールアウト でスケール可能になるため本番環境向き。 マルチプロセスで動作し、スケールアップ によってスケール可能

実行

利用するコマンド

$ airflow webserver

実行すると下記のような画面がlocalhost:8080で開く

image.png

admin/adminで入れなければログインするためのユーザを作成してログイン

$ airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

DAGsからtest-tutorialを選択。
image.png

Trigger DAGで手動実行
image.png

実行ステータスが視覚的に確認できる
image.png

ログも出ていることを確認
image.png

用語

タスク

タスクは、オペレータオブジェクトをインスタンス化する際に生成される。演算子からインスタンス化されたオブジェクトをタスクと呼んでいる。ソースで言うと以下。

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

第1引数のtask_idは、タスクの一意な識別子として機能する。

参考

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?