概要
DAGのOperatorでは、デフォルトでairflowのDockerコンテナが利用されるが、executor_configパラメータを使用することで、指定したDockerコンテナで処理を実行することができる。
ここでは、JdbcOperatorでjreのDocker環境を指定する方法について書いた。
以下のページに書いたTeradataに接続するためのDAGを、指定したDockerコンテナで実行する。
関連:【Airflow on Kubernetes】JdbcOperatorの使い方
目次
Version
Dockerイメージを作成
airflowコマンドが実行可能な環境にする必要があるため、airflowのDockerイメージをベースとして、openjdk-11-jdk
のイメージ作成する。
From airflow:latest
RUN apt-get upgrade -y
RUN apt-get install -y openjdk-11-jdk
# Teradata用JDBCドライバー
COPY jdbc/terajdbc4.jar /opt/jdbc/terajdbc4.jar
COPY jdbc/tdgssconfig.jar /opt/jdbc/tdgssconfig.jar
$ sudo docker build -t airflow-jre:11 ./
setup.py
JdbcOperatorを利用するには、airflowコンテナの方にJayDeBeApiとJPype1が必要となる。
setup.pyの以下箇所にpipモジュールを追記する。
def do_setup():
"""Perform the Airflow package setup."""
write_version()
setup(
name='apache-airflow',
# **snip**
install_requires=[
'tzlocal>=1.4,<2.0.0',
'unicodecsv>=0.14.1',
'zope.deprecation>=4.0, <5.0',
+ 'JayDeBeApi>=1.1.1',
+ 'JPype1==0.6.3',
],
build.shを実行してairflowのDockerイメージを作成する。
DAG
executor_config={'KubernetesExecutor': {'image': 'airflow-jre:11'}}
を指定する。
# -*- coding:utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
import datetime
from datetime import timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'sample_teradata',
default_args=default_args,
description='A sample DAG with JDBC',
schedule_interval='0 * * * *',
)
# Lotate
lotate_date = (datetime.date.today() - timedelta(days=29)).strftime("%Y%m%d")
sql_task1 = JdbcOperator(
task_id='sql_delete',
+ executor_config={'KubernetesExecutor': {'image': 'airflow-jre:11'}},
# Conn Id you created on WEB UI.
jdbc_conn_id='teradata',
sql=['delete from YOUR_TABLE_NAME where log_date < \'{}\''.format(lotate_date)],
params={"db":'YOUR_DB_NAME'},
dag=dag
)
# DeleteInsert
log_date = datetime.date.today().strftime("%Y%m%d")
name = 'Smith'
age = 30
sql_task2 = JdbcOperator(
task_id='sql_insert',
+ executor_config={'KubernetesExecutor': {'image': 'airflow-jre:11'}},
# Conn Id you created on WEB UI.
jdbc_conn_id='teradata',
sql=[
'delete from YOUR_TABLE_NAME where log_date=\'{}\''.format(log_date),
'insert into YOUR_TABLE_NAME (log_date, name, age) values(\'{}\', \'{}\', {})'.format(log_date, name, age)
],
params={"db":'YOUR_DB_NAME'},
dag=dag
)
sql_task1 >> sql_task2
これでDAGを実行すると、airflow-jre:11コンテナで処理が実行される。