この記事でやること
ワークフローエンジンであるAirflowの使い方のポイントを書きます
私はGCPのCloud Composerで使っています
※ワークフローエンジンについては以前こちらに書きました
https://qiita.com/elyunim26/items/15db924e4c9833e5050a
MWAA来ましたね
ちょうど、AWSのフルマネージドAirflow(MWAA)も来ましたね
使い始めるかたもいらっしゃるのではないでしょうか
https://dev.classmethod.jp/articles/amazon-managed-workflows-for-apache-airflow-mwaa-ga/
AWS MWAAの利用料としてはGCPのCloud Composerと似たようなレンジで400-600$/月くらいはかかってしまいそうです
https://aws.amazon.com/jp/managed-workflows-for-apache-airflow/pricing/
フルマネージドはコストは掛かるものの、死活監視などもやってくれていて、完全に放置してても環境が死んでたりしないので楽ではあります
ただコストはかかるので、より良い手段があれば移行したい気持ちもあります
Airflowを使う理由
- GUIが強力で処理の可視化ができ、そのまま操作もできる
- 特定タスクが失敗したときのリトライが直感的にできる
- backfill(開始日時を指定すると現在日までのタスクを順次実行させる)ができる
- 「この日のデータから連携したくて、後はよろしくっ!」みたいな感じで使える
- 固定化され時間がかかるワークロードを、固定化されたスケジュールで実行する用途に特化
- 後述するPrefectチームによる説明
- dag記述はpythonなのである程度プログラマブルに書ける
- 反復して書く記述をまとめたりだとか
- 開発が活発で情報はたくさんある
- フルマネージドサービスがあり、運用で楽ができる
他に注目しているワークフローエンジンは後で触れます
設計編
Airflowの基本的な使い方に関しては割愛します
例えば下記などの記事をご覧になってください
Airflow のアーキテクチャをざっくり理解して、どうやって使うのか学んでみた | Developers.IO
https://dev.classmethod.jp/articles/airflow-gs-arch-learn/
以後は下記の用語を使います
タスク: dagの中の1つ1つの処理
(例えば、12/2の"前処理"というタスク)
dag: それぞれの日ごとのタスクの集まり
(例えば、12/2の"[前処理, 推論, 後処理]"という一連の流れ)
DAG: dagを生成する元になるタスク集合の定義
(例えば"レコメンド"という名前の[前処理, 推論, 後処理]という一連の流れがあり、
その定義に従って12/1、12/2…のdagが生成される)
※dagという言葉は有向非巡回グラフ(Directed acyclic graph)の略です
https://ja.wikipedia.org/wiki/%E6%9C%89%E5%90%91%E9%9D%9E%E5%B7%A1%E5%9B%9E%E3%82%B0%E3%83%A9%E3%83%95
基本はKubernetesPodOperatorで実行
使い始めの頃はいろんなオペレータを使っていたのですが、今の所KubernetesPodOperatorに落ち着いています
こちらの記事でも書かれているのですが、
https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753
Airflowのオペレータにはバグが有る可能性と、オペレータの仕様を読み込むコストが必要で、
それを考えると全部コンテナに入れたほうが実装が早い、という考えです
また、PythonOperatorでairflowの環境内でpythonを動かすこともできるのですが、
- 依存ライブラリをairflow環境に入れる必要がある
- スクリプトのテスト実行時などにairflow環境を作るのは面倒。コンテナ内で実行するほうが楽
- コンテナ化しておけば別環境への移植がやりやすくなる(ただワークフロー部分は書き直さないといけないけど)
などの理由でコンテナ化したほうがやりやすいと思います
podからAWS Batchを実行させる
消費リソースが多く、スケールアウトさせたい処理はAWS Batchで実行させるという手段があります
KubernetesPodOperatorが動くクラスタが大量のタスクを実行する巨大なクラスタであれば、その処理をクラスタ内でやることもできるのですが、
クラスタ自体は普段は小さいインスタンスで、重い処理だけを切り出したい場合に有効です。
AWS Batchであれば利用しているときだけ稼働させられ、スポットインスタンスでの実行もできます
例えば、m5.4xlargeやg4dn.xlargeを10並列でやりたいだとか、処理をもっと早くやりたいからさらにスケール数を上げるだとかの使い方ができます
AWS Batchの話はここに書きました
https://qiita.com/elyunim26/items/d0680fb8eff71ab1f134
運用編
airflow.cfgの設定
airflowの設定をいじることで振る舞いを変えることができます
https://airflow.apache.org/docs/stable/configurations-ref.html
例えば、DAGに渡すdefault_argsにdepends_on_pastというパラメータがあります。
https://airflow.apache.org/docs/stable/tutorial.html
こちらをFalseと設定すると(デフォルトTrue)、前日分のタスクが終わっていなくても翌日以降のタスクが実行されるようになります
日単位で冪等なタスク(つまり12/2のタスクの後に12/1の同じタスクを実行しても問題がない)にのみ利用できるのですが、並列度を上げられるので
例えば1ヶ月分を並列に一気にやりたいみたいな場合に使えます
ただその場合、デフォルトの並列設定だと並列度が高すぎてKubernetesPodOperatorで作成するpodのリソースが足りずにエラーになる場合があるので
いい感じに並列数を制限したいです
max_active_runs_per_dag:(cloud composerデフォルト15)
完了していない最も古い日付のdagに対して、何日までのdagを新規に起動するか
dag_concurrency:(cloud composerデフォルト15)
1つのDAGの中で同時に動くタスクの上限
cloud composerの場合は、設定画面から変えることができます。環境への反映も自動で行われます
一時停止の使い方
一時的にタスクを止めたくなることがあります
(本当はタスクの実行前提が満たされることをチェックして起動するとかしたほうがいいです、例えばbigqueryのテーブルができていることを確認するだとか)
タスクを止める方法は2種類あります
DAGを非アクティブにする
DAG画面左上のボタンをOffにします。下の図は非アクティブになっている状態です
この状態であれば新規にタスクが起動されなくなります
最新のタスクを失敗させる
前述したdepends_on_pastがTrueの場合にのみ利用できます
手動でタスクを失敗状態にしておくと、上記の状態では翌日移行の同じタスクが実行されないため、実行がそこで止まることになります
この場合、過去分のタスクが終わっていない場合でも新規に起動させられるため
この日以降は止めておきたいけど、まだ終わっていない過去のタスクは実行しておいて欲しい
という用途に使えます
ポップアップが出ます。右上の"Task Instances"をクリックします
ターゲットを選択して"Set state to 'failed'"をクリックして失敗させます
他に注目しているワークフローエンジン
Prefect
Prefectチームが作っている、オープンソースのワークフローエンジン
https://github.com/prefecthq/prefect
Prefect Cloudという有料のマネージドクラウドも提供されている
チームの用途としてはAirflowで運用できているのですが、よりモダンなプロダクトとして注目しています
Prefectの設計思想
コードが意図したとおりに実行されることを保証できれば、ワークフローシステムはまったく必要ないということです。ワークフロー管理が重要になるのは、問題が発生したときだけです。この観点から、ワークフローシステムはリスク管理ツールであり、適切に設計されている場合は、必要になるまでユーザーの邪魔にならないようにする必要があります。
Prefectの設計目標は、問題が発生した場合は侵襲性を最小限に抑え、問題が発生した場合は最大限に役立つことです。
Airflowとの差分
ご丁寧にその話題だけで記事になっています
https://docs.prefect.io/core/getting_started/why-not-airflow.html#overview
以下抜粋
- Airflowはスケジュールから外れた実行をしづらいが(例えば12/2分の実行だけを1回やりたいみたいな)、Prefectはスケジューラをシンプルに作っているため定期実行以外の用途もカバーできる
- Airflowでやる場合は専用にDAGを切り出して使うか、もしくはAirflow外でコンテナ実行しちゃいます
- Airflowは10秒ごとのポーリングでタスクを実行するが、Prefectはミリ秒レイテンシで実行する
- Prefectはタスク間でのデータのやり取りをより自然にできるようにした。(AirflowのXcomが使いにくいって言ってる)
- Xcomは使わないようにして、データはS3でやりとりしています
- Airflowはタスクのバージョン管理については自分でgit管理することになるが(なのでgit反映してないコードがデプロイされる可能性がある)、Prefectはバージョン管理をシステムに組み込んだ
- AirflowのGUIは素晴らしいので、PrefectもGUIに注力している
記事リンク
prefectはluigiなどの軽量ETLの後継になれそうって書いてあります
データ基盤を支える技術 - ETLフレームワークの実践的な選び方・組み合わせ方 - JX通信社エンジニアブログ
https://tech.jxpress.net/entry/dataplatform-etl
Kubeflow Pipeline
ZOZOの方がKubeflowのPipelinesのみを利用している事例を紹介してくださっています
Kubeflowは全体像が大きくて手が出せなかったのですが、活用事例を載せていただいて参考になります
各種フレームワークがGCP上で動く感じですね
AI Platform Pipelines (Kubeflow Pipelines)による機械学習パイプラインの構築と本番導入 - ZOZO Technologies TECH BLOG
https://techblog.zozo.com/entry/aip-pipelines-impl#Kubeflow-Pipelines
Kubeflowは実行フレームワークとしてArgoを利用しています
https://github.com/argoproj/argo
Argo
Kubeflowのバックエンドにも使われているプロダクトです
単体で利用する場合の印象は多機能性であることよりシンプルな実装であることを重視している印象です
前に見たときからいろいろ開発は進んでいました。
ただUIはシンプルにする思想のようで、
毎日実行するようなバッチでもリスト表示でpodが並ぶようなUIになっており
airflowのように日付とジョブで一覧できるような見やすいUIではなさそうです。
argoを試す方法
これで試せます
https://github.com/argoproj/argo
https://qiita.com/oguogura/items/3ab535e89d9762ea618e
minikube start
kubectl create namespace argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo/stable/manifests/install.yaml
kubectl -n argo port-forward deployment/argo-server 2746:2746
minikube stop
Serverless Workflow Specification
イベントドリブンなワークフローエンジンのための仕様
ちゃんと話し合って良い仕様のプロダクトを作らないといけない、みたいな感じで仕様が話し合われています
今後これに則ったプロダクトが出てくることに期待してます
https://github.com/serverlessworkflow/specification/blob/0.5.x/specification.md
その他、MLOpsの参考記事
選択肢としては他にも様々な方法があります
小さく始めて大きく育てるMLOps2020 | AI tech studio
https://cyberagent.ai/blog/research/12898/
→試してないですがKedroというプロダクトが紹介されています
ゆるふわMLOps入門 - Re:ゼロから始めるML生活
https://www.nogawanogawa.com/entry/mlops
→mlopsまとめの他にアメリカの各社がどうしてるかがまとまっています
まとめ
前半でチームで利用しているAirflowの使い方の説明を行い
後半でAirflow以外の選択肢についてまとめました
Airflowの用途としては"固定化され時間がかかるワークロードを、固定化されたスケジュールで実行する"なので
基本的に用途があっていればAirflowを使えばいいと思います
その上でより良い手段があったときに、移行しやすいようにしておくというのが今の方針です