目的
PythonOperatorの中で何かしらデータに加工処理を行って、その上でCloudSQLへ接続してInsertしたい、といったこともありそうです。
後継の参照サイトに沿って試してみました。
- なお、Google Cloud Sql Operators というOperatorはあります。
公式ドキュメント
Cloud IAM
- 「プロジェクト編集権限」のあるサービスアカウントを作成する。
- 上記サービスアカウントのサービストークンを取得する。
- 後継の「credentials.json」が、サービストークンをリネームしたものです。
Cloud SQL
- CloudSQLにDBを作成し、適当なテーブルを作成しておく
Cloud Composer
1. 環境を構築する
- pymysqlが必要。
Python 依存関係のインストール
2. GKEに接続する
gcloud container clusters get-credentials [YOUR_CLUSTER] \
--zone [YOUR_ZPNE] \
--project [YOUR_REGION]
3. Secretを作成する
kubectl create secret generic service-account-token \
--from-file=credentials.json=$PATH/credentials.json
4. sql-proxyをdeployする
sqlproxy-deployment.yamlを作成しをapplyする。
sqlproxy-deployment.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: cloudsqlproxy
spec:
replicas: 1
template:
metadata:
labels:
app: cloudsqlproxy
spec:
containers:
# Make sure to specify image tag in production
# Check out the newest version in release page
# https://github.com/GoogleCloudPlatform/cloudsql-proxy/releases
- image: b.gcr.io/cloudsql-docker/gce-proxy:latest
# 'Always' if imageTag is 'latest', else set to 'IfNotPresent'
imagePullPolicy: Always
name: cloudsqlproxy
command:
- /cloud_sql_proxy
- -dir=/cloudsql
- -instances=[INSTANCE_CONNECTION_NAME]:[DB_NAME]=tcp:0.0.0.0:3306
- -credential_file=/credentials/credentials.json
# set term_timeout if require graceful handling of shutdown
# NOTE: proxy will stop accepting new connections; only wait on existing connections
- term_timeout=10s
lifecycle:
preStop:
exec:
# (optional) add a preStop hook so that termination is delayed
# this is required if your server still require new connections (e.g., connection pools)
command: ['sleep', '10']
ports:
- name: port-db1
containerPort: 3306
volumeMounts:
- mountPath: /cloudsql
name: cloudsql
- mountPath: /credentials
name: service-account-token
volumes:
- name: cloudsql
emptyDir:
- name: service-account-token
secret:
secretName: service-account-token
kubectlでapplyする。
kubectl apply -f sqlproxy-deployment.yaml
5. cluster内にサービスとして公開する。
sqlproxy-services.yamlを作成しapplyする。
sqlproxy-services.yaml
apiVersion: v1
kind: Service
metadata:
name: sqlproxy-service-db1
spec:
ports:
- port: 3306
targetPort: port-db1
selector:
app: cloudsqlproxy
kubectlでapplyする。
kubectl apply -f sqlproxy-services.yaml
6. PythonOperatorの記述
- 上記sql-proxyを経由して、pymysqlからCloudSQLに接続する。
- PythonOperatorのサンプルはt次の通り。
cloudsql_via_proxy.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import timedelta, datetime, timezone
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
import pymysql
import pymysql.cursors
import pendulum
local_tz = pendulum.timezone("Asia/Tokyo")
default_args = {
'owner': 'Airflow',
'start_date': datetime(2019, 1, 1, tzinfo=local_tz),
# 'end_date': datetime(2020, 12, 31),
'depends_on_past': True,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'catchup_by_default': False
}
dag = DAG('cloudsql_via_proxy', schedule_interval='@once',
default_args=default_args)
# 最初にダミーオペレーターを動かしてみる。
dummy = DummyOperator(
task_id='dummy',
trigger_rule='all_success',
dag=dag,
)
def fn_select():
sql = "SELECT * FROM [CLOUDSQL_TABLE]"
# sql-proxy経由でCloudSQLへ接続
# 'host'にpodで追加したsql-proxyの'クラスターIP'を入れます
connection = pymysql.connect(
host='[SQLPROXY_CLUSTER_IP]',
port=3306,
user='[CLOUDSQL_USER]',
password='[CLOUDSQL_PASSWORD]',
db='[CLOUDSQL_DB]',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# SELECT
with connection.cursor() as cursor:
cursor.execute(sql)
dbdata = cursor.fetchall()
for i in dbdata:
print(i)
connection.commit()
fn_select = PythonOperator(
task_id='fn_select',
python_callable=fn_select,
dag=dag)
def fn_insert():
sql = "INSERT INTO [CLOUDSQL_TABLE] (col1, col2, col3, col4) VALUES (%s, %s, %s, %s)"
# 上に同じ
connection = pymysql.connect(
host='[SQLPROXY_CLUSTER_IP]',
port=3306,
user='[CLOUDSQL_USER]',
password='[CLOUDSQL_PASSWORD]',
db='[CLOUDSQL_DB]',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# INSERT
with connection.cursor() as cursor:
r = cursor.execute(sql, (9999, "insert2", "insert3", "insert4"))
connection.commit()
fn_insert = PythonOperator(
task_id='fn_insert',
python_callable=fn_insert,
dag=dag)
def fn_update():
sql = "UPDATE [CLOUDSQL_TABLE] SET col2 = %s WHERE id = %s"
# 上に同じ
connection = pymysql.connect(
host='[SQLPROXY_CLUSTER_IP]',
port=3306,
user='[CLOUDSQL_USER]',
password='[CLOUDSQL_PASSWORD]',
db='[CLOUDSQL_DB]',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# UPDATE
with connection.cursor() as cursor:
r = cursor.execute(sql, ("update2", 3))
connection.commit()
fn_update = PythonOperator(
task_id='fn_update',
python_callable=fn_update,
dag=dag)
dummy >> fn_select >> fn_insert >> fn_update
7. cloudsql_via_proxy.pyをdagsにimportする。
@onceの設定なので、直ちに動き出します。
gcloud composer environments storage dags import \
--environment @@@@-dev \
--location us-central1 \
--source ./cloudsql_via_proxy.py
参照サイト