DAGのシンプルな構成
公式ドキュメント
-
Apache Airflow
- http://airflow.apache.org
- バージョン毎の使えたり使えなかったりが多いので使っているバージョンで固定して調べるのが良さそう。
-
Google Cloud Composer
- Airflowのマネージドサービス
- https://cloud.google.com/composer/docs/concepts/overview
ローカル開発環境の作成(docker)
このimageをpullして使うのが便利。
ジョブパイプラインのチェック
python dags/hoge.py でチェックできる。
airflow CLI の list_dags で↑と同じ結果が見れるけど、大元のDAGに結びついていない
subDAGのファイルはチェックされないのでファイル名をfetchして個別に python xxx.py と叩いたほうが網羅できるかも。
Slackに通知したい(SlackWebHookOperator, SlackPostAPIOperator)
- SlackWebHookOperator
- WebHook URL作成、Airflow Conn 作る必要ある。
- attachments 書けない(1.10.2時点)
- slack用のパッケージインストールなどは不要。
- SlackPostAPIOperator
- Airflow Connは不要。slack API tokenを引数にわたす必要ある。
- slackclient(<2.0)のインストールが必要。
- attachements書ける。
どちら使うにしてもterraformのようなツールで
Connection もしくは pythonパッケージを追加するプロビジョニングがしたくなるので悩ましい。
ジョブの成否、retryステータスをhookして通知したい。
BaseOperator に on_{success|failure|retry}_callback
という引数があり、
通知のメソッドを引数に渡せばよい。
JSTで日付データを扱いたい(user_defined_filters)
ref.
SubDAGから別のDAGの結果と依存関係を作りたい(ExternalTaskSensor)
CloudComposerのパッケージに依存しないジョブを実行したい(KubernetesPodOperator)
- image_pull_policy の初期値は
IfNotPresent
なのでAlways
にする。
未指定だと、imageを最新にしても元のimageを消しても最新にならないので注意。
ref.
- https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator
- https://kubernetes.io/docs/concepts/configuration/overview/#container-images
その他
- schedulerによる実行時の execution_date と Web UIのtriggerから実行するときのそれに入る値にズレがあるのが違和感。
- スケジューリングの動作についてわかりやかった説明
https://medium.com/programming-soda/590ad9f9fb80
- スケジューリングの動作についてわかりやかった説明
- AirflowSkipException についてもあとで書く。