Cloud Composerでは外部に公開されていない、テナントプロジェクトのCloudSQL(MySQL)にAirflowメタデータが格納されている。(参考)
メタデータの内容を見て、レポーティングしたいようなDAGを作る際に、以下のようなコードで、メタデータにアクセスすることができる。
実装のポイント
- MySqlOperatorはrecordをfetchしてくれないので、MySqlHookを組み合わせる。
-
mysql_conn_id
にairflow_db
を指定する。
sample_task.py
import json
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator
DAG_NAME = "sample_task"
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"email": ["foo@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=10),
}
dag = DAG(dag_id=DAG_NAME, default_args=default_args, schedule_interval="0 * * * *")
class FetchMySqlOperator(MySqlOperator):
def execute(self, context):
hook = MySqlHook(
mysql_conn_id=self.mysql_conn_id,
)
return hook.get_records(self.sql, parameters=self.parameters)
def execute_sql(**context):
def date_handler(obj):
if hasattr(obj, "isoformat"):
return obj.isoformat()
elif isinstance(obj, timedelta):
return str(obj)
elif isinstance(obj, bytes):
return obj.hex()
else:
raise TypeError(
"Unserializable object {} of type {}".format(obj, type(obj))
)
operator = FetchMySqlOperator(
task_id="foo_mysql",
mysql_conn_id="airflow_db",
sql="(メタデータに対するselect文などを書く)",
)
result = operator.execute(context)
# resultに結果が入る
t1 = PythonOperator(
task_id="exequte_sql", provide_context=True, python_callable=execute_sql, dag=dag
)
t1