LoginSignup
3
0

More than 3 years have passed since last update.

【Airflow on Kubernetes】JdbcOperatorの使い方

Last updated at Posted at 2019-08-22

概要

AirflowでJdbcOperatorを利用してTeradataにアクセスする。

目次

Version

Requirements

Container

airflowのDockerに、前述のRequirementsをすべて追加する。
以下は一例なので、別の方法でもよい。

Dockerfileに以下を追記する。(別途jreのDockerイメージを作成し、DAGでexecutor_configのKubernetesExecutorにimageを指定する方法でもよい)
また、TeradataのDriverであるterajdbc4.jarとtdgssconfig.jarを用意する。

# Teradata
RUN apt-get install -y openjdk-11-jdk
COPY jdbc/terajdbc4.jar /opt/jdbc/terajdbc4.jar
COPY jdbc/tdgssconfig.jar /opt/jdbc/tdgssconfig.jar

setup.pyの以下箇所にpipモジュールを追記する。

setup.py
def do_setup():
    """Perform the Airflow package setup."""
    write_version()
    setup(
        name='apache-airflow',
        # **snip**
        install_requires=[
            'tzlocal>=1.4,<2.0.0',
            'unicodecsv>=0.14.1',
            'zope.deprecation>=4.0, <5.0',
+           'JayDeBeApi>=1.1.1',
+           'JPype1==0.6.3',
        ],

build

build.shを実行してairflowのDockerイメージを作成する。

Connectionを追加

WEB UIでConnectionの設定をする。
https://{YOUR_HOST}/connection/list/

Screen Shot 2019-08-22 at 12.55.34.png

Keys Values DESC
Conn Id * teradata 任意の名前
Conn Type Jdbc Connection
Connection URL jdbc:teradata://{TERADATA_HOST}, CHARSET=UTF8 LDAPの場合: jdbc:teradata://{TERADATA_HOST}/LOGMECH=LDAP
Login {YOUR_USERNAME}
Password {YOUR_PASSWORD}
Driver Path /opt/jdbc/tdgssconfig.jar,/opt/jdbc/terajdbc4.jar 複数ある場合はカンマ区切り
Driver Class com.teradata.jdbc.TeraDriver

DAG

例として、以下の処理を実行するDAGを作成する。

  1. log_dateカラムが30日以上前のレコードをDelete
  2. log_dateが今日の日付のレコードをDelete/Insert
  • 以下のカラムを持つテーブルを用意する
    • log_date DATE FORMAT 'YYYYMMDD' NOT NULL
    • name VARCHAR(50) CHARACTER SET UNICODE NOT CASESPECIFIC
    • age INTEGER
# -*- coding:utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    # Trueの場合、前回の実行が正常終了していない場合は実行しない
    'depends_on_past': False,
    # 指定した日時から現在時刻までに実行されていないjobがすべて直ちに実行される
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    # 処理が失敗した時にメール通知するか
    'email_on_failure': False,
    # リトライ実行時にメール通知をするか
    'email_on_retry': False,
    # リトライ回数
    'retries': 1,
    # リトライ間隔
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    # DAG名
    'sample_teradata',
    default_args=default_args,
    description='A sample DAG with JDBC',
    # 実行スケジュール
    schedule_interval='0 * * * *',
)

# Lotate
lotate_date = (datetime.date.today() - timedelta(days=29)).strftime("%Y%m%d")
sql_task1 = JdbcOperator(
    task_id='sql_delete',
    # 設定したConn Idを入力することで接続情報がセットされる
    jdbc_conn_id='teradata',
    sql=['delete from YOUR_TABLE_NAME where log_date < \'{}\''.format(lotate_date)],
    params={"db":'YOUR_DB_NAME'},
    dag=dag
)

# DeleteInsert
log_date = datetime.date.today().strftime("%Y%m%d")
name = 'Smith'
age = 30

sql_task2 = JdbcOperator(
    task_id='sql_insert',
    # 設定したConn Idを入力することで接続情報がセットされる
    jdbc_conn_id='teradata',
    sql=[
        'delete from YOUR_TABLE_NAME where log_date=\'{}\''.format(log_date),
        'insert into YOUR_TABLE_NAME (log_date, name, age) values(\'{}\', \'{}\', {})'.format(log_date, name, age)
    ],
    params={"db":'YOUR_DB_NAME'},
    dag=dag
)

sql_task1 >> sql_task2

Graph View

Screen Shot 2019-08-22 at 14.23.10.png

Pod

DAG実行中のPodの挙動

$ sudo kubectl get pod -w
airflow-58ccbb7c66-p9ckz                                   2/2     Running             0          111s
postgres-airflow-84dfd85977-6tpdh                          1/1     Running             0          7d17h
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   0/1     Pending              0          0s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   0/1     Pending              0          0s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   0/1     ContainerCreating    0          0s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   1/1     Running              0          2s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   0/1     Completed            0          8s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   0/1     Terminating          0          10s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec   0/1     Terminating          0          10s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   0/1     Pending              0          0s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   0/1     Pending              0          0s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   0/1     ContainerCreating    0          0s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   1/1     Running              0          1s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   0/1     Completed            0          8s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   0/1     Terminating          0          10s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38   0/1     Terminating          0          10s

確認

Teradataで実際にデータがDelete、Insertされていることが確認できればOK。

関連記事

参考

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0