More than 3 years have passed since last update.

Accessing the metadata Cloud SQL from a Cloud Composer (Airflow) DAG

Posted at 2020-11-26

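In Cloud Composer, the Airflow metadata is stored in a Cloud SQL (MySQL) instance that lives in the tenant project and is not exposed externally. (Reference)

When you want a DAG that reads this metadata, for example to build reports from it, code like the following lets a task access the metadata database.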

Implementation notes

  • MySqlOperator does not fetch records, so combine it with MySqlHook.
  • Set mysql_conn_id to airflow_db.
sample_task.py
import json
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator


DAG_NAME = "sample_task"

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "email": ["foo@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=10),
}


dag = DAG(dag_id=DAG_NAME, default_args=default_args, schedule_interval="0 * * * *")


class FetchMySqlOperator(MySqlOperator):
    def execute(self, context):
        hook = MySqlHook(
            mysql_conn_id=self.mysql_conn_id,
        )

        return hook.get_records(self.sql, parameters=self.parameters)


def execute_sql(**context):
    def date_handler(obj):
        if hasattr(obj, "isoformat"):
            return obj.isoformat()
        elif isinstance(obj, timedelta):
            return str(obj)
        elif isinstance(obj, bytes):
            return obj.hex()
        else:
            raise TypeError(
                "Unserializable object {} of type {}".format(obj, type(obj))
            )

    operator = FetchMySqlOperator(
        task_id="foo_mysql",
        mysql_conn_id="airflow_db",
        sql="(write a SELECT statement against the metadata here)",
    )
    result = operator.execute(context)
    # result now holds the fetched records;
    # e.g. json.dumps(result, default=date_handler) serializes them


t1 = PythonOperator(
    task_id="execute_sql", provide_context=True, python_callable=execute_sql, dag=dag
)

t1
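
The sample defines date_handler and imports json but stops before using them, which suggests the fetched rows are meant to be serialized for reporting. The following is a minimal sketch of that step, runnable without Airflow. It assumes the rows are sequences that may contain datetime, timedelta, and bytes values; the names to_jsonable, rows_to_json, and the sample row are hypothetical and not part of the original article.

```python
import json
from datetime import datetime, timedelta


def to_jsonable(obj):
    """Fallback for json.dumps, mirroring date_handler in the sample above."""
    if hasattr(obj, "isoformat"):
        # date, time and datetime objects
        return obj.isoformat()
    if isinstance(obj, timedelta):
        return str(obj)
    if isinstance(obj, bytes):
        return obj.hex()
    raise TypeError("Unserializable object {} of type {}".format(obj, type(obj)))


def rows_to_json(rows):
    """Serialize fetched rows (a sequence of tuples) to a JSON string."""
    return json.dumps(rows, default=to_jsonable)


# Made-up row standing in for a metadata query result (assumption).
sample_rows = [
    ("sample_task", datetime(2020, 11, 26, 9, 0, 0), timedelta(minutes=5), b"\x01\xff"),
]

print(rows_to_json(sample_rows))
```

Inside execute_sql, the same idea would amount to calling json.dumps(result, default=date_handler) once the rows have been fetched.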
