1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Airflow / Cloud Composer メモ

Last updated at Posted at 2019-09-10

DAGのシンプルな構成

公式ドキュメント

ローカル開発環境の作成(docker)

このimageをpullして使うのが便利。

ジョブパイプラインのチェック

python dags/hoge.py でチェックできる。

airflow CLI の list_dags で↑と同じ結果が見れるけど、大元のDAGに結びついていない
subDAGのファイルはチェックされないのでファイル名をfetchして個別に python xxx.py と叩いたほうが網羅できるかも。

Slackに通知したい(SlackWebHookOperator, SlackPostAPIOperator)

  • SlackWebHookOperator
    • WebHook URL作成、Airflow Conn 作る必要ある。
    • attachments 書けない(1.10.2時点)
    • slack用のパッケージインストールなどは不要。
  • SlackPostAPIOperator
    • Airflow Connは不要。slack API tokenを引数にわたす必要ある。
    • slackclient(<2.0)のインストールが必要。
    • attachements書ける。

どちら使うにしてもterraformのようなツールで
Connection もしくは pythonパッケージを追加するプロビジョニングがしたくなるので悩ましい。

ジョブの成否、retryステータスをhookして通知したい。

BaseOperator に on_{success|failure|retry}_callback という引数があり、
通知のメソッドを引数に渡せばよい。

JSTで日付データを扱いたい(user_defined_filters)

ref.

SubDAGから別のDAGの結果と依存関係を作りたい(ExternalTaskSensor)

CloudComposerのパッケージに依存しないジョブを実行したい(KubernetesPodOperator)

  • image_pull_policy の初期値は IfNotPresent なので Always にする。
    未指定だと、imageを最新にしても元のimageを消しても最新にならないので注意。

ref.

その他

  • schedulerによる実行時の execution_date と Web UIのtriggerから実行するときのそれに入る値にズレがあるのが違和感。
  • AirflowSkipException についてもあとで書く。
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?