概要
AirflowでJdbcOperatorを利用してTeradataにアクセスする。
目次
Version
Requirements
- openjdk
- openjdk-11-jdk
- pip
- Teradata Driver
- terajdbc4.jar
- tdgssconfig.jar
Container
airflowのDockerに、前述のRequirementsをすべて追加する。
以下は一例なので、別の方法でもよい。
Dockerfileに以下を追記する。(別途jreのDockerイメージを作成し、DAGでexecutor_configのKubernetesExecutorにimageを指定する方法でもよい)
また、TeradataのDriverであるterajdbc4.jarとtdgssconfig.jarを用意する。
# Teradata
RUN apt-get install -y openjdk-11-jdk
COPY jdbc/terajdbc4.jar /opt/jdbc/terajdbc4.jar
COPY jdbc/tdgssconfig.jar /opt/jdbc/tdgssconfig.jar
setup.pyの以下箇所にpipモジュールを追記する。
setup.py
def do_setup():
"""Perform the Airflow package setup."""
write_version()
setup(
name='apache-airflow',
# **snip**
install_requires=[
'tzlocal>=1.4,<2.0.0',
'unicodecsv>=0.14.1',
'zope.deprecation>=4.0, <5.0',
+ 'JayDeBeApi>=1.1.1',
+ 'JPype1==0.6.3',
],
build
build.shを実行してairflowのDockerイメージを作成する。
Connectionを追加
WEB UIでConnectionの設定をする。
https://{YOUR_HOST}/connection/list/
Keys | Values | DESC |
---|---|---|
Conn Id * | teradata | 任意の名前 |
Conn Type | Jdbc Connection | |
Connection URL | jdbc:teradata://{TERADATA_HOST}, CHARSET=UTF8 | LDAPの場合: jdbc:teradata://{TERADATA_HOST}/LOGMECH=LDAP
|
Login | {YOUR_USERNAME} | |
Password | {YOUR_PASSWORD} | |
Driver Path | /opt/jdbc/tdgssconfig.jar,/opt/jdbc/terajdbc4.jar | 複数ある場合はカンマ区切り |
Driver Class | com.teradata.jdbc.TeraDriver |
DAG
例として、以下の処理を実行するDAGを作成する。
- log_dateカラムが30日以上前のレコードをDelete
- log_dateが今日の日付のレコードをDelete/Insert
- 以下のカラムを持つテーブルを用意する
-
log_date
DATE FORMAT 'YYYYMMDD' NOT NULL -
name
VARCHAR(50) CHARACTER SET UNICODE NOT CASESPECIFIC -
age
INTEGER
-
# -*- coding:utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
import datetime
from datetime import timedelta
default_args = {
'owner': 'airflow',
# Trueの場合、前回の実行が正常終了していない場合は実行しない
'depends_on_past': False,
# 指定した日時から現在時刻までに実行されていないjobがすべて直ちに実行される
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
# 処理が失敗した時にメール通知するか
'email_on_failure': False,
# リトライ実行時にメール通知をするか
'email_on_retry': False,
# リトライ回数
'retries': 1,
# リトライ間隔
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
# DAG名
'sample_teradata',
default_args=default_args,
description='A sample DAG with JDBC',
# 実行スケジュール
schedule_interval='0 * * * *',
)
# Lotate
lotate_date = (datetime.date.today() - timedelta(days=29)).strftime("%Y%m%d")
sql_task1 = JdbcOperator(
task_id='sql_delete',
# 設定したConn Idを入力することで接続情報がセットされる
jdbc_conn_id='teradata',
sql=['delete from YOUR_TABLE_NAME where log_date < \'{}\''.format(lotate_date)],
params={"db":'YOUR_DB_NAME'},
dag=dag
)
# DeleteInsert
log_date = datetime.date.today().strftime("%Y%m%d")
name = 'Smith'
age = 30
sql_task2 = JdbcOperator(
task_id='sql_insert',
# 設定したConn Idを入力することで接続情報がセットされる
jdbc_conn_id='teradata',
sql=[
'delete from YOUR_TABLE_NAME where log_date=\'{}\''.format(log_date),
'insert into YOUR_TABLE_NAME (log_date, name, age) values(\'{}\', \'{}\', {})'.format(log_date, name, age)
],
params={"db":'YOUR_DB_NAME'},
dag=dag
)
sql_task1 >> sql_task2
Graph View
Pod
DAG実行中のPodの挙動
$ sudo kubectl get pod -w
airflow-58ccbb7c66-p9ckz 2/2 Running 0 111s
postgres-airflow-84dfd85977-6tpdh 1/1 Running 0 7d17h
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 0/1 Pending 0 0s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 0/1 Pending 0 0s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 0/1 ContainerCreating 0 0s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 1/1 Running 0 2s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 0/1 Completed 0 8s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 0/1 Terminating 0 10s
sampleteradatasqldelete-cbdc29dad6814d11a721d9fe9416a3ec 0/1 Terminating 0 10s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 0/1 Pending 0 0s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 0/1 Pending 0 0s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 0/1 ContainerCreating 0 0s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 1/1 Running 0 1s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 0/1 Completed 0 8s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 0/1 Terminating 0 10s
sampleteradatasqlinsert-db4b5d25ad6f47c691d3992f62a48d38 0/1 Terminating 0 10s
確認
Teradataで実際にデータがDelete、Insertされていることが確認できればOK。