14
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Job管理ツールAirflowを使ってみる

Posted at

Job管理ツールを色々探してみている。
今回はAirflowを試してみる。

手順

pipでインストール。今回はMySQLを使うので指定してインストール。

$ pip install airflow[mysql]

初期設定

airflowの設定ファイル群はデフォルトだと $HOME/airflow/ に置かれる。
変更したい場合は 環境変数 $AIRFLOW_HOMEを変更する。

DB接続設定は $AIRFLOW_HOME/airflow.cfg
以下を設定

airflow.cfg
sql_alchemy_conn = mysql://[user]:[password]@[host]:[port]/[db]

上記で設定した接続先のDBを事前に作っておきます。今回はairflowというDB名にしています。

CREATE DATABASE IF NOT EXISTS airflow CHARACTER SET utf8;

examplesをロードする必要がないのでFalseに設定。
これを指定する場合は様々なデータソースのサンプルが用意されているので、
合わせてmysql以外もインストールしないといけないことがあります。

airflow.cfg
load_examples = False

上記設定変更が終わったら、DBの初期化処理を走らせます。

$ airflow initdb

初期化が完了したら下記のようにwebserverを起動させます。
デフォルトは以下の通りに8080ポートになっているので必要に応じてcfgファイルで設定を変更します。

$ airflow webserver -D --stdout=/path/to/logs/webserver-stdout.log --stderr=/path/to/log/webserver-stderr.log

上記で設定しているオプションに関しては

-D: daemonize
--stdout: 標準出力先
--stderr: 標準エラー出力先
です。
詳しくは https://airflow.incubator.apache.org/cli.html

jobの定義ファイルは$AIRFLOW_HOME/dags/ 以下に置きます。

サンプルはgithub上にあるので、参考にしてみてください。
https://github.com/apache/incubator-airflow/tree/master/airflow/example_dags

bashタスクだと下記のような形式で記述します。
dag_id, schedule_intervalを設定して、
taskのbash_commandを指定するような形です。

from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime.today(),
}

dag = DAG(
    dag_id='test_dag', default_args=args,
    schedule_interval='0 4 * * *',
    dagrun_timeout=timedelta(minutes=60))


task = BashOperator(
    task_id='task',
    bash_command='echo "do task"',
    dag=dag)

タスクの手動実行テストがCUIからできます。Jobが間違ってないかの確認はこれで。

$ airflow test [dag_id] [task_id] [実行日]

schedulerコマンドを実行することでタスクのスケジューリング設定を有効化できます。

$ airflow scheduler -D --stdout=/path/to/logs/stdout.log --stderr=/path/to/logs/stderr.log

上記で設定しているオプションに関してはwebserverのオプションと同様です。

まとめ

  • pipで入るので導入はしやすい
  • ワークフローの書き方も割とシンプル
  • 実行のみをairflowに渡すのであれば、BashOperatorだけで事足りそう
  • GUIは実行結果が見やすい。設定はCLIでやることが多くなる。
14
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?