この記事はぷりぷりあぷりけーしょんず Advent Calendar 2019の17日目の記事です。
カスタムオペレーターの作成
Airflowを使っていると、処理の内容によっては既存のオペレーターでは再現しきれない処理も出てきます。
そんな時のためにAirflowではカスタムオペレーターを作成することができます。
BaseOperatorと@apply_defaults
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class SampleCustomOperator(BaseOperator): # 1.
@apply_defaults # 2.
def __init__(self,
table_name,
gcp_conn_id,
execute_date,
*args,
**kwargs):
super(SampleCustomOperator, self).__init__(*args, **kwargs)
self.table_name=table_name
self.gcp_conn_id=gcp_conn_id
self.execute_date=execute_date
-
airflow.models.baseoperator
は全てのオペレーターのBaseクラスですので、継承するようにしてください。
-
@apply_defaults
はSampleCustomOperator
を使用する時のインスタンス生成時にdefault_args
を使用して引数を渡すためのデコレーターになりますので、こちらも書くようにしてください。
template_fieldsとtemplate_ext
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class SampleCustomOperator(BaseOperator):
template_fields = ('table_name', 'execute_date') # 1.
template_ext = ('.sql',) # 2.
@apply_defaults
def __init__(self,
table_name,
gcp_conn_id,
execute_date,
*args,
**kwargs):
super(SampleCustomOperator, self).__init__(*args, **kwargs)
self.table_name=table_name
self.gcp_conn_id=gcp_conn_id
self.execute_date=execute_date
- Airflowでは変数やマクロを
jinja template
とやらで表現することができます。
インスタンス生成時に渡す引数として、テンプレートを使いそうなパラメータはtemplate_fields
に入れておきます。 - パラメーターには、bashスクリプトやSQLファイルなどのファイル名を含めることもできます。
使用されそうなファイルの拡張子はtemplate_ext
に入れておきます。
※Pythonでは()内に複数要素がある場合はタプルとして認識されますが、
一つの場合は、ただの値か式として認識されます。「,」をつけることで、Pythonがタプルと認識するそうです。
- マクロの例
変数 | 説明 |
---|---|
{{ ds }} | 実行日(YYYY-MM-DD) |
{{ ds_nodash }} | 実行日(YYYYMMDD) |
{{ prev_ds }} | 前回の実行日(YYYY-MM-DD) |
{{ prev_ds_nodash }} | 前回の実行日(YYYYMMDD) |
{{ next_ds }} | 次の実行日(YYYY-MM-DD) |
{{ next_ds_nodash }} | 次の実行日(YYYYMMDD) |
{{ dag }} | DAGオブジェクト |
{{ task }} | Taskオブジェクト |
他にも様々なマクロがあります。
https://airflow.apache.org/docs/stable/macros.html
実装例の全体
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.bigquery_hook import BigQueryBaseCursor
from airflow.contrib.operators.gcs_to_bq import \
GoogleCloudStorageToBigQueryOperator
import logging
class SampleCustomOperator(BaseOperator):
template_fields = ('sql', 'execute_date')
@apply_defaults
def __init__(self,
sql,
gcp_conn_id,
execute_date,
*args,
**kwargs):
super(SampleCustomOperator, self).__init__(*args, **kwargs)
self.sql=sql
self.gcp_conn_id=gcp_conn_id
self.execute_date=execute_date
def execute(self, context):
cursor = self._connect_bigquery()
self._query(cursor)
def _connect_bigquery(self):
bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, use_legacy_sql=False)
bq_conn = bq_hook.get_conn()
cursor = bq_conn.cursor()
return cursor
def _query(self, cursor):
self.log.info('Querying SQL: {}'.format(self.sql))
cursor.execute(self.sql)
return
Hookはここから探せます。
https://airflow.apache.org/docs/stable/_api/index.html#hooks
参考
最後に
前回のAirflowの基本的なところで書きたかったことが書けました。
Airflowネタしか書いてないけど、とりあえずカレンダー書ききれてよかった😇
あとリファレンスはlatestとstableがあるみたいでたまに内容が違うことがあります。
明日は@shiminori0612さんの記事です!楽しみ♡