Airflowはいくつかのコンポーネントから構成されており、その一つにデータベースがあります。
あまり情報がなかったので、簡単にまとめてみました。
Cloud Composerのアーキテクチャー図だと、右上の「Tenant Project」にある「Airflow Database」の部分の話です。
いじってみる
どんなデータがあるかは、動いているAirflowのデータベースにクエリ投げるとわかりやすいです。
GUIから
Airflow画面からは、ヘッダーの「Ad Hoc Query」からデータベースにアクセス出来ます。
ちなみに、クエリー結果をCSVでエクスポートすることも出来ます。
puckelさんのDocker
を使っている人は、
docker run -v ホストのディレクトリ:/usr/local/airflow -d -p 8080:8080 puckel/docker-airflow webserver
のように起動すると、ホストのSQLite3クライアントからアクセス出来て便利です。
Cloud Composer
そのうち書きます
DBの実体
Airflowのドキュメント曰く、「you should be able to use any database backend supported as a SqlAlchemy backend」らしいので、データベースの実体は、システム構成次第で、MySQLにもPostgreSQLにもSQLite3にもなりえます。
(ただし「We recommend using MySQL or Postgres」らしいです。)
ちなみに、Cloud ComposerのデータベースはMySQLのようです。
(ドキュメント見つけられませんでしたが、GKEのairflow-sqlproxyのDeployment Detailsから確認出来ます。)
どんあテーブルがあるのか
1.10系にあるテーブルをリストアップしてみました。
ちなみにテーブル作っているのはソースコードでいうとここらへんで、alembicというツールを使って管理しているようです。
テーブル | 概要 |
---|---|
alembic_version | alembicの情報 |
chart | Airlfowデータベースをグラフ化するChart機能の設定 |
connection | 外部システムへの接続管理するconnection |
dag | DAGの情報 |
dag_pickle | ? |
dag_run | DAGの実行(dag_run)の情報 |
import_error | ? |
job | 内部処理(後述) |
knonwn_event | ? |
known_event_type | ? |
kube_resource_version | KubernatesExecutorの何か? |
kube_worker_uuid | KubernatesExecutorの何か? |
log | タスクのログ(workerで実行されたコマンド内容など)、および、UI画面での表示の履歴 |
sla_miss | 一定のスケジュールで実行されなかった時(SLA miss)の情報 |
slot_pool | タスクの種類によって同時実行数を制御するpoolの設定 |
task_fail | 失敗したタスクインスタンスの履歴 |
task_instance | タスクインスタンスの履歴 |
task_reschedule | ? |
users | 認証・認可 |
variable | 設定をAirlfowに保持出来るAirlfow Variable |
xcom | タスクインスタンス間の情報のやりとりに使うxcom |
いくつかのテーブルの話
たくさんありましたが、知っておくと楽しそうなテーブルをいくつかご紹介。
dag_run
status・start_date・end_date・execution_dateに加え、DAG Runのパラメータ(※)なんかも確認出来ます。
※ Airflowでは、CLIからtrigger_dagオペレータ、もしくはTriggerDagRunOperatorからのキックでパラメータをDAG Runに設定出来ます。
参考:
- https://stackoverflow.com/questions/45945783/how-do-i-use-the-conf-option-in-airflow
- https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
job
Airflowは「ジョブ管理システム」や「ジョブスケジューラ」と呼ばれるソフトウェアですが、ここのjobは、Airlfow内部の処理単位で、「 Jobs are processing items with state and duration that aren't task instances.」らしいです。
具体的には、jobにはScheduler処理や、Backfill処理、Worker処理の三種類があり、jobテーブルには、それらの履歴が残っています。
xcom
Airflowのタスクインスタンス同士が情報をやりとりする時は、このxcomテーブル経由で行われます。
すなわち:
- このテーブルを見ることで、過去のDAG Runのxcomを確認出来ます
- xcom経由でやり取り出来るサイズは、テーブルの型に制限されます
- 大量データはS3/GCSなりに保存して、XCOMにはパスだけを置くのが良さげです
- xcomを使っているDAGの実行が増えるにつれ、テーブルの容量も増えます
Variable
Airflowのコンセプトページにはgetの例しかないですが、テーブルに保存しているだけなので、実はsetも出来ます。
(xcomと同じくサイズには制限があります)