LoginSignup
3
1

More than 3 years have passed since last update.

【Airflow on Kubernetes】DAGを指定したDockerコンテナで実行する

Posted at

概要

DAGのOperatorでは、デフォルトでairflowのDockerコンテナが利用されるが、executor_configパラメータを使用することで、指定したDockerコンテナで処理を実行することができる。

ここでは、JdbcOperatorでjreのDocker環境を指定する方法について書いた。
以下のページに書いたTeradataに接続するためのDAGを、指定したDockerコンテナで実行する。

関連:【Airflow on Kubernetes】JdbcOperatorの使い方

目次

Version

Dockerイメージを作成

airflowコマンドが実行可能な環境にする必要があるため、airflowのDockerイメージをベースとして、openjdk-11-jdkのイメージ作成する。

Dockerfile
From airflow:latest

RUN apt-get upgrade -y
RUN apt-get install -y openjdk-11-jdk
# Teradata用JDBCドライバー
COPY jdbc/terajdbc4.jar /opt/jdbc/terajdbc4.jar
COPY jdbc/tdgssconfig.jar /opt/jdbc/tdgssconfig.jar
$ sudo docker build -t airflow-jre:11 ./

setup.py

JdbcOperatorを利用するには、airflowコンテナの方にJayDeBeApiとJPype1が必要となる。

setup.pyの以下箇所にpipモジュールを追記する。

setup.py
def do_setup():
    """Perform the Airflow package setup."""
    write_version()
    setup(
        name='apache-airflow',
        # **snip**
        install_requires=[
            'tzlocal>=1.4,<2.0.0',
            'unicodecsv>=0.14.1',
            'zope.deprecation>=4.0, <5.0',
+           'JayDeBeApi>=1.1.1',
+           'JPype1==0.6.3',
        ],

build.shを実行してairflowのDockerイメージを作成する。

DAG

executor_config={'KubernetesExecutor': {'image': 'airflow-jre:11'}}を指定する。

# -*- coding:utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'sample_teradata',
    default_args=default_args,
    description='A sample DAG with JDBC',
    schedule_interval='0 * * * *',
)

# Lotate
lotate_date = (datetime.date.today() - timedelta(days=29)).strftime("%Y%m%d")
sql_task1 = JdbcOperator(
    task_id='sql_delete',
+   executor_config={'KubernetesExecutor': {'image': 'airflow-jre:11'}},
    # Conn Id you created on WEB UI.
    jdbc_conn_id='teradata',
    sql=['delete from YOUR_TABLE_NAME where log_date < \'{}\''.format(lotate_date)],
    params={"db":'YOUR_DB_NAME'},
    dag=dag
)

# DeleteInsert
log_date = datetime.date.today().strftime("%Y%m%d")
name = 'Smith'
age = 30

sql_task2 = JdbcOperator(
    task_id='sql_insert',
+   executor_config={'KubernetesExecutor': {'image': 'airflow-jre:11'}},
    # Conn Id you created on WEB UI.
    jdbc_conn_id='teradata',
    sql=[
        'delete from YOUR_TABLE_NAME where log_date=\'{}\''.format(log_date),
        'insert into YOUR_TABLE_NAME (log_date, name, age) values(\'{}\', \'{}\', {})'.format(log_date, name, age)
    ],
    params={"db":'YOUR_DB_NAME'},
    dag=dag
)

sql_task1 >> sql_task2

これでDAGを実行すると、airflow-jre:11コンテナで処理が実行される。

参考

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1