LoginSignup
8
5

More than 3 years have passed since last update.

airflowのチュートリアルをしてみた

Last updated at Posted at 2019-05-19

背景

業務でairflowを触るかもしれないので、事前にいろいろ調べておこうと思ったのがキッカケです。
ただドキュメントを読むだけだとあまりイメージわかないと思うので実際に手を動かしながらやってみます。
参考にしたのはこちらのサイトです。この場を借りてお礼申し上げます。
https://ohke.hateblo.jp/entry/2018/04/21/230000

準備

普通にローカルにairflowをインストールできますが、せっかくなのでdockerでやってみます。
こちらのリポジトリを使います。
https://github.com/puckel/docker-airflow
さっそくcloneしていきます。
コマンドはこれで。

git clone git@github.com:puckel/docker-airflow.git

ついでにairflowの追加パッケージやpythonの依存関係を解決するために、
cloneしたリポジトリ配下に行き、以下のコマンドを実行しておきます。

docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .

ちょっと時間がかかるので、この間にコーヒーとかお茶を用意して待ちましょう。
ソシャゲのAPを消費しても良いかもしれません。

終わったら以下のコマンドを実行してairflowを立ち上げます。

docker run -d -p 8080:8080 puckel/docker-airflow webserver

うまく行けばこのような画面が表示されるかと思います。こうなればひとまず準備完了です。
スクリーンショット 2019-05-19 13.47.37.png
ここまで大体10分かかっていないです。楽に用意できて良いですね!

DAGの準備

環境も整ったので、実際にDAGを設定していきましょう。
DAGとは「有効非巡回グラフ(Directed acyclic graph)」の略で、
airflowでは複数集まったタスクのまとまりのことを言います。(詳しくはwikipediaからどうぞ)

とりあえずDAGに関しては元々入っていたtuto.pyを使ってみます。
DAGの定義はpythonで行い、作成したDAGは/dagsに配置するとDAGとして認識し実行できるようになります。
(この設定はairflow.cfgで変えられます。)

しかし...

ここで躓きました。。。
docker-airflowのREADMEと背景で紹介したサイトを頼りに動かそうとしていたのですが、airflowを実行する時にSequentialExecutorで動かしていました。
これでDAGが表示されるかと思ったのですが、全く表示されず。。。
いろいろ調べていたら以下の記事が。。。
https://qiita.com/kysnm/items/feda7b8cca44bb7389ac
どうもSequentialExecutorではうまく動かないっぽく手を加える必要があります。
さらにこれとは別で紹介されているLocalExecutorにはジョブを実行するためのschedulerが入っていないという・・・
直すのも手間なので、すでに動いている実績があるCeleryExecutorを使うことになりました。

そのため、元々立ち上がっていたコンテナをkillして以下のコマンドでもう一度立ち上げます。
docker-compose -f docker-compose-CeleryExecutor.yml up -d
すると以下のような画面になるかと思われます。
スクリーンショット 2019-05-19 14.47.58.png
ここまでくれば一安心です。一旦休憩しましょう。

実際にDAGを作ってみる

ちょっと休憩したので実際にDAGを作ってみます。
といってもそんな高度なものではなく簡単なものです。
作成する場所は/dags以下に作ります。

Hello.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    "owner": "raqwel",
    "depends_on_past": False, # 上流タスクが失敗した時に、タスクを実行するかどうか
    "start_date": datetime(2019, 5, 19),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="dag_test",
    default_args=default_args,
    catchup=False, # タスク実行に期間が空いてしまった場合、その期間の分のタスクを実行するかどうか
    schedule_interval=timedelta(1)
)

t1 = BashOperator(task_id="print_hello", bash_command="echo Hello World", dag=dag)

t2 = BashOperator(task_id="wait", bash_command="sleep 5", dag=dag)


template_command = """
    echo "hello airflow"
"""

t3 = BashOperator(task_id="template_hello", bash_command=template_command, dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)

はい、tuto.pyを写経して少し変えてみただけのファイルです。

確認してみる

画面に戻ると先程作成したDAGが登録されているのがわかります。
スクリーンショット 2019-05-19 15.37.42.png
何もしてないけど読み込んだ・・・すごい・・・
ということで左のOffになっているスイッチをOnにします。
Graph_viewを見てみると自分が設定した順にtaskを実行するようになっています。
スクリーンショット 2019-05-19 15.39.43.png
とりあえず設定は完了した模様です。これで時間が来たら設定したとおりに実行してくれます。
ですが、今回は実行結果を早く見たいので、自分で実行してみます。
Linksの左にある再生ボタンをクリックしたら「本当に実行しますか」と画面に出てくるのでOKを押します。
スクリーンショット 2019-05-19 15.44.46.png
これで実行されます。そしてログを見に行きます。
Graph Viewタグから適当にtaskを一つクリックし、
出てくるポップアップ内にあるView Logをクリックします。
今回はprint_helloを選択します。
すると以下のようなログがある画面が出てきます
スクリーンショット 2019-05-19 15.48.57.png
見てみるとRunning commandのあとに実行された結果が載っています。うまく実行できているみたいです。

まとめ

今回はairflowを少し触ってみました。
使ってみて良いなと思ったのは、やはりGUI部分でしょうか。
設定したタスクが成功しているか失敗しているか、どのタスクがどのタスクに依存していて、どれだけの時間実行されていたかがぱっと見てわかるのは強みだと感じました。
またいろいろ上流タスクの成否、空き期間時による実行制御等細かく設定できるのも役に立ちそうです。
今回はBashのオペレーターを使いましたが、他にもPythonなど色々な言語で処理を定義できるので、色々試していきたいと思います。
本当はここからECSも使いたかったのですが、疲れたのでここまでにします。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5