公式ドキュメント
###公式DOC
Tutorial
####Tutorial
まずはこれを動かしながら読んで感覚掴む。
-
airflow test ${dag_id} ${task_id} ${start_date}
で依存無視して確認できる -
airflow backfill ${dag_id} -s ${start_date} ${end_date}
で依存性考慮して実行する- start_dateで指定した日付からDagがキューに積まれる
Celery Executor
Celery Executor
- workerをスケールアウトできる実行エンジン
- Redis, RabbitMQとかのバックエンドを用意する必要がある
- dagファイルはクラスタで同期してある必要がある
- 一箇所のGCSやgithubに配置するなど
- worker: タスクを実行する
- scheduler: queueにタスクを追加する
- web server: dagやtaskのステータス情報を提供する
- database: dag, task, variables, connectionsなどのステータスを保持
- celery: queueの仕組み
- celeryは2つの要素を持つ
- broker: 実行コマンドを保存
- result backend: 実行済コマンドの保存
- celeryは2つの要素を持つ
airflow.cfgで設定できる並行設定
Airflow parallelism
- parallelist: クラスタ全体でのタスクの最大実行数
- dag_concurrency: worker毎のタスク最大実行数
- max_active_runs_per_dag: dag毎のタスク最大実行数
- worker_concurrency: worker毎のスレッド最大数
- max_threads: schedulerのスレッド最大数
Poolとこの並行設定でチューニングを行う
Concepts
ここ一番読み応えあり。各種用語の説明がされる。
Dags
実行したいタスクを集めて、関係性を定義したもの。
Operators
実行したいタスクを定義する
Dag Assignment
いろんな方法が記載されてる
Tasks(Life Cycle)
Taskにどういうステータスがあるのかがわかる
Hook
外部システムとのインターフェイスらしい。Operatorでも普通に外部システムとやり取りするのでなぜこれを切り出したのかは謎。
Pool
- 並列実行の制限数
- Menu -> Admin -> Pools で確認できる
- タスクが作られるときにどこかのPoolにアサインされる
- priority_numberでqueueからPoolにアサインされる順位が変わる
- poolがないとqueueにつめられる
- default の pool数は 128
Connections
- UIから設定できる外部への接続情報
- Menu -> Admin -> Connections で確認できる
- GCPとの接続情報とか、DBとの接続情報とかを入れたりできる
- 設定をDagにハードコーディングしたくない人用。
Queues
- Poolにアサインされて実行されるまでに積まれる場所
- Celeryだとどのqueueに積むかを指定できる
- workerは1つでも、複数でもqueueを持てる
- どのタスクをどのキューに入れるかで優先順位を制御できたりする
XComs
- タスク同士でメッセージのやり取りを実現させるやつ
Variables
- あらかじめ変数を設定しておきたい場合にUIから設定できる
Branching
- タスク同士の依存に条件つけできたりする
- このタスク失敗したら、こっちのタスクだけ実行するみたいな
SubDags
- 複数タスクを1まとめにして、サブダグとして管理できる
- 同じようなタスクがいっぱいあるとき便利
Trigger Rule
- タスクのトリガー条件を決められる
- 親タスクが全て失敗したら...とか、親タスクが1つでも失敗したら ...など
Latest Only
- リリースしたときに、直近のタスクだけ実行するようにできる
- データのバックアップなどのように、直近分1回完了していればいいようなタスクの場合は便利
Jinja Template
- Jinja Templateが使えるパラメータはコード見に行って、 パラメータに
templated
って書いてあれば使えるって。笑 - マクロも便利
Command Line
Command Line
Scheduling
Scheduling
- Dagのスケジューリングの定義の仕方
- catchupをoffにすると、start_dateからdagがいっぱい生成されない
- TaskやDagを clearすると再実行されるよ
Timezone
https://airflow.apache.org/timezone.html
- DBにはUTCで入っている
- UIもUTCで表示される
Integration
Macros
Macro
- jinjaで使えるMacro