Help us understand the problem. What is going on with this article?

Airflowのデータベースの話

Airflowはいくつかのコンポーネントから構成されており、その一つにデータベースがあります。
あまり情報がなかったので、簡単にまとめてみました。

Cloud Composerのアーキテクチャー図だと、右上の「Tenant Project」にある「Airflow Database」の部分の話です。

いじってみる

どんなデータがあるかは、動いているAirflowのデータベースにクエリ投げるとわかりやすいです。

GUIから

Airflow画面からは、ヘッダーの「Ad Hoc Query」からデータベースにアクセス出来ます。
ちなみに、クエリー結果をCSVでエクスポートすることも出来ます。
スクリーンショット 2019-10-08 12.40.33.png

puckelさんのDocker

https://github.com/puckel/docker-airflow

を使っている人は、

docker run -v ホストのディレクトリ:/usr/local/airflow -d -p 8080:8080 puckel/docker-airflow webserver

のように起動すると、ホストのSQLite3クライアントからアクセス出来て便利です。

Cloud Composer

そのうち書きます

DBの実体

Airflowのドキュメント曰く、「you should be able to use any database backend supported as a SqlAlchemy backend」らしいので、データベースの実体は、システム構成次第で、MySQLにもPostgreSQLにもSQLite3にもなりえます。
(ただし「We recommend using MySQL or Postgres」らしいです。)

ちなみに、Cloud ComposerのデータベースはMySQLのようです。
(ドキュメント見つけられませんでしたが、GKEのairflow-sqlproxyのDeployment Detailsから確認出来ます。)

どんあテーブルがあるのか

1.10系にあるテーブルをリストアップしてみました。

ちなみにテーブル作っているのはソースコードでいうとここらへんで、alembicというツールを使って管理しているようです。

テーブル 概要
alembic_version alembicの情報
chart Airlfowデータベースをグラフ化するChart機能の設定
connection 外部システムへの接続管理するconnection
dag DAGの情報
dag_pickle ?
dag_run DAGの実行(dag_run)の情報
import_error ?
job 内部処理(後述)
knonwn_event ?
known_event_type ?
kube_resource_version KubernatesExecutorの何か?
kube_worker_uuid KubernatesExecutorの何か?
log タスクのログ(workerで実行されたコマンド内容など)、および、UI画面での表示の履歴
sla_miss 一定のスケジュールで実行されなかった時(SLA miss)の情報
slot_pool タスクの種類によって同時実行数を制御するpoolの設定
task_fail 失敗したタスクインスタンスの履歴
task_instance タスクインスタンスの履歴
task_reschedule ?
users 認証・認可
variable 設定をAirlfowに保持出来るAirlfow Variable
xcom タスクインスタンス間の情報のやりとりに使うxcom

いくつかのテーブルの話

たくさんありましたが、知っておくと楽しそうなテーブルをいくつかご紹介。

dag_run

status・start_date・end_date・execution_dateに加え、DAG Runのパラメータ(※)なんかも確認出来ます。

※ Airflowでは、CLIからtrigger_dagオペレータ、もしくはTriggerDagRunOperatorからのキックでパラメータをDAG Runに設定出来ます。

参考:
* https://stackoverflow.com/questions/45945783/how-do-i-use-the-conf-option-in-airflow
* https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py

job

Airflowは「ジョブ管理システム」や「ジョブスケジューラ」と呼ばれるソフトウェアですが、ここのjobは、Airlfow内部の処理単位で、「 Jobs are processing items with state and duration that aren't task instances.」らしいです。

具体的には、jobにはScheduler処理や、Backfill処理Worker処理の三種類があり、jobテーブルには、それらの履歴が残っています。

xcom

Airflowのタスクインスタンス同士が情報をやりとりする時は、このxcomテーブル経由で行われます。
すなわち:

  • このテーブルを見ることで、過去のDAG Runのxcomを確認出来ます
  • xcom経由でやり取り出来るサイズは、テーブルの型に制限されます
    • 大量データはS3/GCSなりに保存して、XCOMにはパスだけを置くのが良さげです
  • xcomを使っているDAGの実行が増えるにつれ、テーブルの容量も増えます

Variable

Airflowのコンセプトページにはgetの例しかないですが、テーブルに保存しているだけなので、実はsetも出来ます。
(xcomと同じくサイズには制限があります)

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away