airflow

Job管理ツールAirflowを使ってみる

More than 1 year has passed since last update.

Job管理ツールを色々探してみている。

今回はAirflowを試してみる。

https://airflow.incubator.apache.org/index.html


手順

pipでインストール。今回はMySQLを使うので指定してインストール。

$ pip install airflow[mysql]


初期設定

airflowの設定ファイル群はデフォルトだと $HOME/airflow/ に置かれる。

変更したい場合は 環境変数 $AIRFLOW_HOMEを変更する。

DB接続設定は $AIRFLOW_HOME/airflow.cfg

以下を設定


airflow.cfg

sql_alchemy_conn = mysql://[user]:[password]@[host]:[port]/[db]


上記で設定した接続先のDBを事前に作っておきます。今回はairflowというDB名にしています。

CREATE DATABASE IF NOT EXISTS airflow CHARACTER SET utf8;

examplesをロードする必要がないのでFalseに設定。

これを指定する場合は様々なデータソースのサンプルが用意されているので、

合わせてmysql以外もインストールしないといけないことがあります。


airflow.cfg

load_examples = False


上記設定変更が終わったら、DBの初期化処理を走らせます。

$ airflow initdb

初期化が完了したら下記のようにwebserverを起動させます。

デフォルトは以下の通りに8080ポートになっているので必要に応じてcfgファイルで設定を変更します。

$ airflow webserver -D --stdout=/path/to/logs/webserver-stdout.log --stderr=/path/to/log/webserver-stderr.log

上記で設定しているオプションに関しては

-D: daemonize

--stdout: 標準出力先

--stderr: 標準エラー出力先

です。

詳しくは https://airflow.incubator.apache.org/cli.html

jobの定義ファイルは$AIRFLOW_HOME/dags/ 以下に置きます。

サンプルはgithub上にあるので、参考にしてみてください。

https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags

bashタスクだと下記のような形式で記述します。

dag_id, schedule_intervalを設定して、

taskのbash_commandを指定するような形です。

from builtins import range

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
'owner': 'airflow',
'start_date': datetime.today(),
}

dag = DAG(
dag_id='test_dag', default_args=args,
schedule_interval='0 4 * * *',
dagrun_timeout=timedelta(minutes=60))

task = BashOperator(
task_id='task',
bash_command='echo "do task"',
dag=dag)

タスクの手動実行テストがCUIからできます。Jobが間違ってないかの確認はこれで。

$ airflow test [dag_id] [task_id] [実行日]

schedulerコマンドを実行することでタスクのスケジューリング設定を有効化できます。

$ airflow scheduler -D --stdout=/path/to/logs/stdout.log --stderr=/path/to/logs/stderr.log

上記で設定しているオプションに関してはwebserverのオプションと同様です。


まとめ


  • pipで入るので導入はしやすい

  • ワークフローの書き方も割とシンプル

  • 実行のみをairflowに渡すのであれば、BashOperatorだけで事足りそう

  • GUIは実行結果が見やすい。設定はCLIでやることが多くなる。