LoginSignup
10
7

More than 3 years have passed since last update.

Airflowでカスタムオペレーター

Last updated at Posted at 2019-12-17

この記事はぷりぷりあぷりけーしょんず Advent Calendar 2019の17日目の記事です。

カスタムオペレーターの作成

Airflowを使っていると、処理の内容によっては既存のオペレーターでは再現しきれない処理も出てきます。
そんな時のためにAirflowではカスタムオペレーターを作成することができます。

BaseOperatorと@apply_defaults

sample_custom_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class SampleCustomOperator(BaseOperator): # 1.

    @apply_defaults # 2.
    def __init__(self,
                 table_name,
                 gcp_conn_id, 
                 execute_date,
                 *args,
                 **kwargs):

        super(SampleCustomOperator, self).__init__(*args, **kwargs)
        self.table_name=table_name
        self.gcp_conn_id=gcp_conn_id
        self.execute_date=execute_date
  1. airflow.models.baseoperatorは全てのオペレーターのBaseクラスですので、継承するようにしてください。
  2. @apply_defaultsSampleCustomOperatorを使用する時のインスタンス生成時にdefault_argsを使用して引数を渡すためのデコレーターになりますので、こちらも書くようにしてください。

template_fieldsとtemplate_ext

sample_custom_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class SampleCustomOperator(BaseOperator):

    template_fields = ('table_name', 'execute_date') # 1.
    template_ext = ('.sql',) # 2.

    @apply_defaults
    def __init__(self,
                 table_name,
                 gcp_conn_id, 
                 execute_date,
                 *args,
                 **kwargs):

        super(SampleCustomOperator, self).__init__(*args, **kwargs)
        self.table_name=table_name
        self.gcp_conn_id=gcp_conn_id
        self.execute_date=execute_date
  1. Airflowでは変数やマクロをjinja templateとやらで表現することができます。
    インスタンス生成時に渡す引数として、テンプレートを使いそうなパラメータはtemplate_fieldsに入れておきます。
  2. パラメーターには、bashスクリプトやSQLファイルなどのファイル名を含めることもできます。
    使用されそうなファイルの拡張子はtemplate_extに入れておきます。

※Pythonでは()内に複数要素がある場合はタプルとして認識されますが、
一つの場合は、ただの値か式として認識されます。「,」をつけることで、Pythonがタプルと認識するそうです。

  • マクロの例
変数 説明 
{{ ds }} 実行日(YYYY-MM-DD)
{{ ds_nodash }} 実行日(YYYYMMDD)
{{ prev_ds }} 前回の実行日(YYYY-MM-DD)
{{ prev_ds_nodash }} 前回の実行日(YYYYMMDD)
{{ next_ds }} 次の実行日(YYYY-MM-DD)
{{ next_ds_nodash }} 次の実行日(YYYYMMDD)
{{ dag }} DAGオブジェクト
{{ task }} Taskオブジェクト

他にも様々なマクロがあります。
https://airflow.apache.org/docs/stable/macros.html

実装例の全体

sample_custom_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.bigquery_hook import BigQueryBaseCursor
from airflow.contrib.operators.gcs_to_bq import \
GoogleCloudStorageToBigQueryOperator
import logging

class SampleCustomOperator(BaseOperator):

    template_fields = ('sql', 'execute_date')

    @apply_defaults
    def __init__(self,
                 sql,
                 gcp_conn_id, 
                 execute_date,
                 *args,
                 **kwargs):

        super(SampleCustomOperator, self).__init__(*args, **kwargs)
        self.sql=sql
        self.gcp_conn_id=gcp_conn_id
        self.execute_date=execute_date

    def execute(self, context):
        cursor = self._connect_bigquery()
        self._query(cursor)

    def _connect_bigquery(self):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, use_legacy_sql=False)
        bq_conn = bq_hook.get_conn()
        cursor = bq_conn.cursor()
        return cursor

    def _query(self, cursor):
        self.log.info('Querying SQL: {}'.format(self.sql))
        cursor.execute(self.sql)

        return

Hookはここから探せます。
https://airflow.apache.org/docs/stable/_api/index.html#hooks

参考

最後に

前回のAirflowの基本的なところで書きたかったことが書けました。
Airflowネタしか書いてないけど、とりあえずカレンダー書ききれてよかった😇
あとリファレンスはlatestとstableがあるみたいでたまに内容が違うことがあります。

明日は@shiminori0612さんの記事です!楽しみ♡

10
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
7