Job管理ツールを色々探してみている。
今回はAirflowを試してみる。
手順
pipでインストール。今回はMySQLを使うので指定してインストール。
$ pip install airflow[mysql]
初期設定
airflowの設定ファイル群はデフォルトだと $HOME/airflow/ に置かれる。
変更したい場合は 環境変数 $AIRFLOW_HOMEを変更する。
DB接続設定は $AIRFLOW_HOME/airflow.cfg
以下を設定
sql_alchemy_conn = mysql://[user]:[password]@[host]:[port]/[db]
上記で設定した接続先のDBを事前に作っておきます。今回はairflowというDB名にしています。
CREATE DATABASE IF NOT EXISTS airflow CHARACTER SET utf8;
examplesをロードする必要がないのでFalseに設定。
これを指定する場合は様々なデータソースのサンプルが用意されているので、
合わせてmysql以外もインストールしないといけないことがあります。
load_examples = False
上記設定変更が終わったら、DBの初期化処理を走らせます。
$ airflow initdb
初期化が完了したら下記のようにwebserverを起動させます。
デフォルトは以下の通りに8080ポートになっているので必要に応じてcfgファイルで設定を変更します。
$ airflow webserver -D --stdout=/path/to/logs/webserver-stdout.log --stderr=/path/to/log/webserver-stderr.log
上記で設定しているオプションに関しては
-D: daemonize
--stdout: 標準出力先
--stderr: 標準エラー出力先
です。
詳しくは https://airflow.incubator.apache.org/cli.html
jobの定義ファイルは$AIRFLOW_HOME/dags/ 以下に置きます。
サンプルはgithub上にあるので、参考にしてみてください。
https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags
bashタスクだと下記のような形式で記述します。
dag_id, schedule_intervalを設定して、
taskのbash_commandを指定するような形です。
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
args = {
'owner': 'airflow',
'start_date': datetime.today(),
}
dag = DAG(
dag_id='test_dag', default_args=args,
schedule_interval='0 4 * * *',
dagrun_timeout=timedelta(minutes=60))
task = BashOperator(
task_id='task',
bash_command='echo "do task"',
dag=dag)
タスクの手動実行テストがCUIからできます。Jobが間違ってないかの確認はこれで。
$ airflow test [dag_id] [task_id] [実行日]
schedulerコマンドを実行することでタスクのスケジューリング設定を有効化できます。
$ airflow scheduler -D --stdout=/path/to/logs/stdout.log --stderr=/path/to/logs/stderr.log
上記で設定しているオプションに関してはwebserverのオプションと同様です。
まとめ
- pipで入るので導入はしやすい
- ワークフローの書き方も割とシンプル
- 実行のみをairflowに渡すのであれば、BashOperatorだけで事足りそう
- GUIは実行結果が見やすい。設定はCLIでやることが多くなる。