AirflowでDAGを定義するためには、通常はPythonを使って以下のように書きます。
Pythonの機能を使うことができ柔軟性が高いですが、一方簡単なDAGを書くだけであればもう少しシンプルに書くことができると嬉しいです。
# taskの定義
t1 = BashOperator(
task_id='task_1',
bash_command='echo 1',
)
t2 = BashOperator(
task_id='task_2',
bash_command='echo 2',
)
t3 = BashOperator(
task_id='task_3',
bash_command='echo 3',
)
t1 >> t2
t2 >> t3
dag-factoryというライブラリを利用すると、YAMLを使ってシンプルにDAGを記述することができます。
上でpythonを使って書いたDAGをdag-factoryを使って書くと以下のようになります。
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 1'
task_2:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 2'
dependencies: [task_1]
task_3:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 3'
dependencies: [task_1]
DAGのプロパティもYAMLで設定できます。
example_dag1:
default_args:
owner: 'example_owner'
start_date: 2018-01-01
end_date: 2018-01-05
retries: 1
retry_delay_sec: 300
schedule_interval: '0 3 * * *'
最後に以下のPythonファイルをdagsディレクトリに配置すれば、YAMLからDAGが生成されます。
from airflow import DAG
import dagfactory
dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())