##背景
業務でairflowを触るかもしれないので、事前にいろいろ調べておこうと思ったのがキッカケです。
ただドキュメントを読むだけだとあまりイメージわかないと思うので実際に手を動かしながらやってみます。
参考にしたのはこちらのサイトです。この場を借りてお礼申し上げます。
https://ohke.hateblo.jp/entry/2018/04/21/230000
##準備
普通にローカルにairflowをインストールできますが、せっかくなのでdockerでやってみます。
こちらのリポジトリを使います。
https://github.com/puckel/docker-airflow
さっそくcloneしていきます。
コマンドはこれで。
git clone git@github.com:puckel/docker-airflow.git
ついでにairflowの追加パッケージやpythonの依存関係を解決するために、
cloneしたリポジトリ配下に行き、以下のコマンドを実行しておきます。
docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .
ちょっと時間がかかるので、この間にコーヒーとかお茶を用意して待ちましょう。
ソシャゲのAPを消費しても良いかもしれません。
終わったら以下のコマンドを実行してairflowを立ち上げます。
docker run -d -p 8080:8080 puckel/docker-airflow webserver
うまく行けばこのような画面が表示されるかと思います。こうなればひとまず準備完了です。
ここまで大体10分かかっていないです。楽に用意できて良いですね!
##DAGの準備
環境も整ったので、実際にDAGを設定していきましょう。
DAGとは「有効非巡回グラフ(Directed acyclic graph)」の略で、
airflowでは複数集まったタスクのまとまりのことを言います。(詳しくはwikipediaからどうぞ)
とりあえずDAGに関しては元々入っていたtuto.py
を使ってみます。
DAGの定義はpythonで行い、作成したDAGは/dagsに配置するとDAGとして認識し実行できるようになります。
(この設定はairflow.cfg
で変えられます。)
###しかし...
ここで躓きました。。。
docker-airflowのREADMEと背景で紹介したサイトを頼りに動かそうとしていたのですが、airflowを実行する時にSequentialExecutor
で動かしていました。
これでDAGが表示されるかと思ったのですが、全く表示されず。。。
いろいろ調べていたら以下の記事が。。。
https://qiita.com/kysnm/items/feda7b8cca44bb7389ac
どうもSequentialExecutor
ではうまく動かないっぽく手を加える必要があります。
さらにこれとは別で紹介されているLocalExecutor
にはジョブを実行するためのschedulerが入っていないという・・・
直すのも手間なので、すでに動いている実績があるCeleryExecutor
を使うことになりました。
そのため、元々立ち上がっていたコンテナをkillして以下のコマンドでもう一度立ち上げます。
docker-compose -f docker-compose-CeleryExecutor.yml up -d
すると以下のような画面になるかと思われます。
ここまでくれば一安心です。一旦休憩しましょう。
##実際にDAGを作ってみる
ちょっと休憩したので実際にDAGを作ってみます。
といってもそんな高度なものではなく簡単なものです。
作成する場所は/dags
以下に作ります。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "raqwel",
"depends_on_past": False, # 上流タスクが失敗した時に、タスクを実行するかどうか
"start_date": datetime(2019, 5, 19),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
dag_id="dag_test",
default_args=default_args,
catchup=False, # タスク実行に期間が空いてしまった場合、その期間の分のタスクを実行するかどうか
schedule_interval=timedelta(1)
)
t1 = BashOperator(task_id="print_hello", bash_command="echo Hello World", dag=dag)
t2 = BashOperator(task_id="wait", bash_command="sleep 5", dag=dag)
template_command = """
echo "hello airflow"
"""
t3 = BashOperator(task_id="template_hello", bash_command=template_command, dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t2)
はい、tuto.pyを写経して少し変えてみただけのファイルです。
###確認してみる
画面に戻ると先程作成したDAGが登録されているのがわかります。
何もしてないけど読み込んだ・・・すごい・・・
ということで左のOffになっているスイッチをOnにします。
Graph_viewを見てみると自分が設定した順にtaskを実行するようになっています。
とりあえず設定は完了した模様です。これで時間が来たら設定したとおりに実行してくれます。
ですが、今回は実行結果を早く見たいので、自分で実行してみます。
Linksの左にある再生ボタンをクリックしたら「本当に実行しますか」と画面に出てくるのでOKを押します。
これで実行されます。そしてログを見に行きます。
Graph View
タグから適当にtaskを一つクリックし、
出てくるポップアップ内にあるView Logをクリックします。
今回はprint_hello
を選択します。
すると以下のようなログがある画面が出てきます
見てみるとRunning command
のあとに実行された結果が載っています。うまく実行できているみたいです。
###まとめ
今回はairflowを少し触ってみました。
使ってみて良いなと思ったのは、やはりGUI部分でしょうか。
設定したタスクが成功しているか失敗しているか、どのタスクがどのタスクに依存していて、どれだけの時間実行されていたかがぱっと見てわかるのは強みだと感じました。
またいろいろ上流タスクの成否、空き期間時による実行制御等細かく設定できるのも役に立ちそうです。
今回はBashのオペレーターを使いましたが、他にもPythonなど色々な言語で処理を定義できるので、色々試していきたいと思います。
本当はここからECSも使いたかったのですが、疲れたのでここまでにします。