LoginSignup
7
4

More than 3 years have passed since last update.

Airflowのデータベースの話

Last updated at Posted at 2019-10-08

Airflowはいくつかのコンポーネントから構成されており、その一つにデータベースがあります。
あまり情報がなかったので、簡単にまとめてみました。

Cloud Composerのアーキテクチャー図だと、右上の「Tenant Project」にある「Airflow Database」の部分の話です。

いじってみる

どんなデータがあるかは、動いているAirflowのデータベースにクエリ投げるとわかりやすいです。

GUIから

Airflow画面からは、ヘッダーの「Ad Hoc Query」からデータベースにアクセス出来ます。
ちなみに、クエリー結果をCSVでエクスポートすることも出来ます。
スクリーンショット 2019-10-08 12.40.33.png

puckelさんのDocker

を使っている人は、

docker run -v ホストのディレクトリ:/usr/local/airflow -d -p 8080:8080 puckel/docker-airflow webserver

のように起動すると、ホストのSQLite3クライアントからアクセス出来て便利です。

Cloud Composer

そのうち書きます

DBの実体

Airflowのドキュメント曰く、「you should be able to use any database backend supported as a SqlAlchemy backend」らしいので、データベースの実体は、システム構成次第で、MySQLにもPostgreSQLにもSQLite3にもなりえます。
(ただし「We recommend using MySQL or Postgres」らしいです。)

ちなみに、Cloud ComposerのデータベースはMySQLのようです。
(ドキュメント見つけられませんでしたが、GKEのairflow-sqlproxyのDeployment Detailsから確認出来ます。)

どんあテーブルがあるのか

1.10系にあるテーブルをリストアップしてみました。

ちなみにテーブル作っているのはソースコードでいうとここらへんで、alembicというツールを使って管理しているようです。

テーブル 概要
alembic_version alembicの情報
chart Airlfowデータベースをグラフ化するChart機能の設定
connection 外部システムへの接続管理するconnection
dag DAGの情報
dag_pickle ?
dag_run DAGの実行(dag_run)の情報
import_error ?
job 内部処理(後述)
knonwn_event ?
known_event_type ?
kube_resource_version KubernatesExecutorの何か?
kube_worker_uuid KubernatesExecutorの何か?
log タスクのログ(workerで実行されたコマンド内容など)、および、UI画面での表示の履歴
sla_miss 一定のスケジュールで実行されなかった時(SLA miss)の情報
slot_pool タスクの種類によって同時実行数を制御するpoolの設定
task_fail 失敗したタスクインスタンスの履歴
task_instance タスクインスタンスの履歴
task_reschedule ?
users 認証・認可
variable 設定をAirlfowに保持出来るAirlfow Variable
xcom タスクインスタンス間の情報のやりとりに使うxcom

いくつかのテーブルの話

たくさんありましたが、知っておくと楽しそうなテーブルをいくつかご紹介。

dag_run

status・start_date・end_date・execution_dateに加え、DAG Runのパラメータ(※)なんかも確認出来ます。

※ Airflowでは、CLIからtrigger_dagオペレータ、もしくはTriggerDagRunOperatorからのキックでパラメータをDAG Runに設定出来ます。

参考:
* https://stackoverflow.com/questions/45945783/how-do-i-use-the-conf-option-in-airflow
* https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py

job

Airflowは「ジョブ管理システム」や「ジョブスケジューラ」と呼ばれるソフトウェアですが、ここのjobは、Airlfow内部の処理単位で、「 Jobs are processing items with state and duration that aren't task instances.」らしいです。

具体的には、jobにはScheduler処理や、Backfill処理Worker処理の三種類があり、jobテーブルには、それらの履歴が残っています。

xcom

Airflowのタスクインスタンス同士が情報をやりとりする時は、このxcomテーブル経由で行われます。
すなわち:

  • このテーブルを見ることで、過去のDAG Runのxcomを確認出来ます
  • xcom経由でやり取り出来るサイズは、テーブルの型に制限されます
    • 大量データはS3/GCSなりに保存して、XCOMにはパスだけを置くのが良さげです
  • xcomを使っているDAGの実行が増えるにつれ、テーブルの容量も増えます

Variable

Airflowのコンセプトページにはgetの例しかないですが、テーブルに保存しているだけなので、実はsetも出来ます。
(xcomと同じくサイズには制限があります)

7
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
4