14
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

LivesenseAdvent Calendar 2020

Day 6

Apache Airflow 2.0を使ってみる

Last updated at Posted at 2020-12-06

はじめに

Livesense Advent Calendar 6日目を担当します @take-m です。普段はリブセンスのデータ分析基盤の開発、運用を担当しています。

今回はAirflow 2.0がリリース間際ということで、1系とどこが変わったのか、また実際にインストールする手順を紹介したいと思います。

なお、本稿執筆時点での最新リリースは2.0.0b3ですが、予定ではまもなくrc1がリリース予定で、ここで書かれた内容と実際の動作とは異なる場合があることをご了承ください。

Apache Airflowとは

Airflowはデータ処理フローを管理するオープンソースのツールです。2014年にAirbnb社のMaxime Beauchemin(現在はpreset社CEO)が開発し、その後2016年にApacheソフトウェア財団のサポートを受け、現在はApache Airflowという正式名称になっています。

現在ではGCPのCloud ComposerやAWSのAmazon Managed Workflows for Apache Airflow(MWAA)など、クラウドプラットフォームのマネージドサービスとしても提供されており、世界中で広く利用されているワークフローツールです。

AirflowはPythonで書かれていて、実行するタスクやフローの定義もPythonで作成していきます。

作成されたタスクフローはDAG(有向グラフ)で表すことができ、WebUIの管理画面で依存関係や実行状況などがビジュアライズされます。

このタスクの定義に利用できる数多くのオペレータやトリガーが用意されています。PythonやBashの処理を実行するためのオペレータをはじめ、GCPやAWSなどのサービスやKubernetesなどを実行、操作するためのオペレータも数多く用意されています。

Airflow 2.0

そのAirflowのメジャーバージョンアップ作業が現在進んでいます。10月にα版、11月にはβ版がリリースされ、12/6現在でβ3までリリースされています。まもなくRC版がリリース予定で、早ければ年内にも正式リリースとなりそうです。(参考: Airflow 2.0 - Planning)

ここでAirflow 2.0の主な変更点を紹介します。なお採用されたv1からの変更内容については、昨年行われたAirflowユーザーに対するサーベイの結果が参考になっているそうです。

TaskFlow API

Airflowで個人的に不便を感じていたのが、タスク間での情報のやり取りでした。標準ではXComを利用するのですが、ちょっと癖のある仕様であまり使い勝手がいいものではありませんでした。

Airflow 2.0では TaskFlow API, Task Decoratorが導入されます。これにより、自動でPythonOperatorタスクを生成したりXComの記述が抽象化され、DAGの定義を直感的かつ簡素に書くことが出来るようになります。

with DAG(
    'test',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2)
) as dag:
    dag.doc_md = __doc__
    @task
    def test1(message: str) -> str:
        print(message)
        return message

    @task
    def test2(message: str) -> int:
        return len(message)

    @task
    def test3(l: int) -> int:
        return l * 2

    test3(test2(test1('hoge')))

例えば上記のようなコードでは以下のようなDAGが自動的に生成されます。

airflow_dag.png

従来の書き方であれば以下のようなコードになると思います(Airflow 2.0でも動作します)。

with DAG(
    'test_v1',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2)
) as dag:
    dag.doc_md = __doc__

    def test1(**kwargs):
        ti = kwargs['ti']
        message = 'hoge'
        print(message)
        ti.xcom_push('message', message)

    def test2(**kwargs):
        ti = kwargs['ti']
        message = ti.xcom_pull(task_ids='test1', key='message')
        ti.xcom_push('length', len(message))

    def test3(**kwargs):
        ti = kwargs['ti']
        l = ti.xcom_pull(task_ids='test2', key='length')
        print(l)

    task1 = PythonOperator(
        task_id='test1',
        python_callable=test1,
    )

    task2 = PythonOperator(
        task_id='test2',
        python_callable=test2,
    )

    task3 = PythonOperator(
        task_id='test3',
        python_callable=test3,
    )

    task1 >> task2 >> task3

PythonOperator を生成し python_callable に実行するメソッドを指定し、オペレータ間の値の受け渡しはXComを使って、依存関係を >> で定義して… という手順で記述していました。

しかしAirflow 2.0では @task デコレータを用いて関数を定義し、呼び出し側で引数、戻り値を使って値を受け渡す事が出来ます。このように記述すると内部で PythonOperator に変換し、タスクIDを自動的に生成し、関数の戻り値をXComを用いて次のオペレータに受け渡し、依存関係も定義されます。

スケジューラの刷新

スケジューラーのパフォーマンスの見直しを行い、スケーラビリティの向上が図られました。

スケジューラーの負荷が増大した時に、レプリカスケジューラーを起動することが出来るようになり、スループットを向上させることが出来るようになったようです。

またタスクの待ち時間短縮が行われ、高速にタスクスケジューリングが行われるようになりました。

新しいスケジューラについては SPOFとはもう呼ばせない!Airflow 2.0で生まれ変わったHAスケジューラー が分かりやすくまとめられています。

REST API

v1にもAPIはあったのですが、Experimental という位置づけでした。そのため、DAGのトリガーなどは出来ましたが認証などができず、安全に運用するには不十分な仕様でした。

そこでAirflow 2.0では仕様を見直し、新しいREST APIが導入されました。

新しいREST APIの特徴として次のものが挙げられます。

  • サードパーティが簡単にアクセスできる
  • Swagger / OpenAPI 仕様に基づいている
  • すべてのAirflowリソースに対してCRUD操作ができる
  • 認証対応

これにより、セキュアにAirflowと外部のプログラムとの連携が非常にやりやすくなり、DAGの制御だけでなくエラーやステータスの監視なども出来るようになります。

その他

他にも、ワーカースロットの消費を50%減らした Smart Sensor の導入や、SubDAGの問題点を見直した Task Groups の導入などもあります。

また、AWSやGCP、Azureなど外部サービスのオペレータなどの管理を、Airflowのコアディストリビューションと独立して airflow/providers ディレクトリ以下で管理、リリースできるようになりました。

もちろんUI/UXも大きく見直されており、見た目がモダンになり使い勝手が向上しています。

Install

さて実際にAirflow 2.0を動かしてみようと思います。以下のコマンドを実行し、インストールから起動まで行っていきます。

初期手順

pip install apache-airflow==2.0.0b3
export AIRFLOW_HOME=$PWD
airflow db init

airflow webserver -D
airflow scheduler -D

Airflow 2.0ではデフォルトでログイン認証が有効になっています。なのでユーザーを作っておく必要があります。以下のコマンドでadminユーザーを作成します。(パスワードやメールアドレスは適当に変更してください)

airflow users create -u admin -f test -l test -p hogehoge -r Admin -e test@example.com

アクセスしてみる

port 8080で起動しているのでアクセスしてみましょう。先ほど作成したadminユーザーでログインする事が出来ます。

airflow_login.png

ログイン後、DAG一覧が表示されます。初期状態ではサンプルのDAGの一覧が表示されます。(エラーが出ているのはKubernetes関連のモジュールをインストールしていないのが原因なので、今は無視してください)

airflow_home.png

ちなみに画面上部のロゴにカーソルオーバーすると風車が回転するギミックが隠されていますw

airflow_logo.gif

先ほど作成したDAGの画面を開いてみます。メニューの並びが、v1ではGraph Viewが先頭でしたがv2ではTree Viewが先頭になっています。画面のリフレッシュやトリガー実行、DAG削除ボタンが追加されています。

airflow_tree_view.png

Graph ViewではAuto-refreshの設定もできるようになりました。これをonにしておくと自動でタスクの実行状況が更新されていきます。

airflow_graph_view.png

またユーザー管理やロールの設定が細かく出来るようになったりしてます。
airflow_users.png
airflow_roles.png

まとめ

リブセンスの分析基盤の運用でAirflowを今まで活用してきましたが(参考: Airflow を用いたデータフロー分散処理
リブセンスのデータ分析基盤を支えるRedshiftとAirflow)、色々癖があり運用で苦労する事も多々ありました。しかし今回のメジャーバージョンアップで様々な改善が行われており、使い勝手も向上しているようなので、正式リリースを待ちつつ新バージョンへの移行も検討していきたいと思います。

参考

14
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?