16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Airflow を2年間使って分かったこと

Posted at

はじめに

こんにちは。テクノプロで自動車関連の組み込み開発エンジニアをしている神崎 一郎です。
SI(System Integration)のプラットフォームを開発しています。
それに、従来の Jenkins を中心に使ってきた処理を Airflow に移行している中、Airflow のドキュメントに書かれていないか理解しにくいことがあり、今回は、それらを書きたいと思います。

Airflow について

Airflow とは、Apache Airflow という Apache License の OSS です。主な機能:

  • Workflow - DAG(Directed Acyclic Graph、有向非巡回グラフ)、Sub Dag、別 Dag の起動
  • スケジューラー
  • 分散処理 - Worker は TCP/IP ネットワーク上のコンピュータに分散される
  • ウェブ管理画面 - モニター、ログを見る
  • コマンドツール - DAG の再実行等

詳細は、下記の URL に参照して下さい。

参考として、当方の環境をメモします:

  • Ubuntu 16.04
  • Python 2.7.12
  • Airflow v1.7.1.3

前提:Airflow システムの構成

  • PC No.1 (Node A)
    • Airflow Browser
    • Airflow Scheduler
    • Airflow Flower
    • Airflow Worker (Queue 指定なし)
  • PC No.2 (Node B)
    • Airflow Worker (Queue = build)
  • PC No.3 (Node C)
    • Airflow Worker (Queue = tester)

ある Task を指定の PC で実行させたい

よくあることですね、Build の DAG は、Build のツールの設定されたBuild の PC に実行させたい。
手順は下記の通りです。

    1. Worker を Queue を指定し起動する:
(myanaconda2) di@blueci:~$ airflow worker -q build
    1. Operator に 引数 queue を指定する:
def print_hello():
    return 'Hello world!'

dag = DAG('hello_world',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20))

with dag:
    dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
    hello_operator = PythonOperator(
        task_id='hello_task', 
        queue='build', 
        python_callable=print_hello, 
        dag=dag)
    dummy_operator >> hello_operator

DAG を 複数 Folder に分散させたい

Airflow 開発の規模が大きくなると、DAG も多くなり、その時、DAG を機能毎に 別々の Folder に分類したいですね。その時、Airflow の DAGBag を使うことが出来ます。下記の記事を参照して下さい:

DAG から Airflow 外のソースコードを参照したい

業務ロジックの記載されるソースコードは、DAG Folder にある DAG 定義のソースコードに分離させたいものですね。
DAG 定義のソースコードは、どうすれば、外に業務ロジックのソースコードを参照出来ますか。
答えは、Airflow から参照したいソースコードは、Python の Package として作成し、インストールする。
そうすれば、DAG 定義のソースコードは import 出来ます。
尚、よくある 参照されるソースコードがあるのに、DAG の import エラーがでるトラブルは、この方法で回避出来ます。
因みに、開発の時、Python の Package は、下記のコマンドでインストールすれば、その後、ソースコードの変更は自動で反映されます:

$ pip install -e .

Web Browser / Scheduler の見える DAG と Workers の見える DAG ソースコード

Airflow では、DAG や Operator 等ソースコードは、Web Browser(Scheduler)と Worker のノードに配布する必要があります。
開発の規模が大きくなると、Worker は、ツールや環境の異なる複数の PC に分散され、異なるソースコードは、それぞれのノード上にある Worker に実行されますね。それらのソースコードは、全て集まって Web Browser / Scheduler のあるノードに配置することが極めて困難で、そもそも、ツールや Library が足りないため、コンパイルすらできません。
では、どうすれば良いのでしょうか。
答えは以下です。
そもそも、全てのソースコードを集まって、その一式を用意し、全てのノードに配布する発想は、間違っています。
実は、Airflow は概ねこのように動作します、

  • Airflow では、Web Browser / Scheduler は、DAG 及びそれに含まれた Task を検出し、Task ID を Worker に通知する。
  • Worker は、通知された DAG ID と Task ID で、その PC 上のソースコードを 検出し実行する。

要するに、

    1. Web Browser / Scheduler と Worker には、ある DAG ID/Task ID に対し、異なる Version のソースコードで構いません。
    1. 各 Worker に、その Worker に実行させたいソースコードだけ配布すると足りる。

なので、下記のように、ソースコードを管理すると良いでしょう。

  • A) ソースコードを Git の Repository に格納する。
  • B) Web Browser / Scheduler 用 ソースコードは、Git branch(例え)develop とする(Task の中身はダミーで構わない)。
  • C) PC No.2 (Node B) 用ソースコードは、Git branch build とする。
  • D) PC No.3 (Node C) 用ソースコードは、Git branch tester とする。

それぞれのノードで Airflow サービスを起動する前、そのノード用ソースコードを Checkout します。

後書き

いかがでしたか。上記のポイントを押さえたら、Airflow 応用の開発では、過去のソースコードは活用でき、Airflow の深く理解される方とそれまでの理解が必要のない方と手分けて、Airflow のソースコードと実処理のソースコードを分けて作成でき、Airflow に移行はよりスムーズになると思います。
ぜひ、Airflow を活用しましょう。

16
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?