1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cloud Composer からCloud SQL へ接続しQueryを投げる方法

Last updated at Posted at 2019-11-19

目的

PythonOperatorの中で何かしらデータに加工処理を行って、その上でCloudSQLへ接続してInsertしたい、といったこともありそうです。
後継の参照サイトに沿って試してみました。

Cloud IAM

  1. 「プロジェクト編集権限」のあるサービスアカウントを作成する。
  2. 上記サービスアカウントのサービストークンを取得する。
  • 後継の「credentials.json」が、サービストークンをリネームしたものです。

Cloud SQL

  • CloudSQLにDBを作成し、適当なテーブルを作成しておく

Cloud Composer

1. 環境を構築する

環境の作成

2. GKEに接続する

   gcloud container clusters get-credentials [YOUR_CLUSTER] \
   --zone [YOUR_ZPNE] \
   --project [YOUR_REGION]

3. Secretを作成する

   kubectl create secret generic service-account-token \
   --from-file=credentials.json=$PATH/credentials.json

Secretの解説及び作成方法等

4. sql-proxyをdeployする

sqlproxy-deployment.yamlを作成しをapplyする。

sqlproxy-deployment.yaml
      apiVersion: extensions/v1beta1
      kind: Deployment
      metadata:
         name: cloudsqlproxy
      spec:
         replicas: 1
         template:
            metadata:
               labels:
                  app: cloudsqlproxy
            spec:
               containers:
               # Make sure to specify image tag in production
               # Check out the newest version in release page
               # https://github.com/GoogleCloudPlatform/cloudsql-proxy/releases
               - image: b.gcr.io/cloudsql-docker/gce-proxy:latest
                # 'Always' if imageTag is 'latest', else set to 'IfNotPresent'
               imagePullPolicy: Always
                 name: cloudsqlproxy
                 command:
                 - /cloud_sql_proxy
                 - -dir=/cloudsql
                 - -instances=[INSTANCE_CONNECTION_NAME]:[DB_NAME]=tcp:0.0.0.0:3306
                 - -credential_file=/credentials/credentials.json
                 # set term_timeout if require graceful handling of shutdown
                 # NOTE: proxy will stop accepting new connections; only wait on existing connections
                 - term_timeout=10s
               lifecycle:
                  preStop:
                     exec:
                       # (optional) add a preStop hook so that termination is delayed
                       # this is required if your server still require new connections (e.g., connection pools)
                       command: ['sleep', '10']
               ports:
               - name: port-db1
                 containerPort: 3306
               volumeMounts:
               - mountPath: /cloudsql
                 name: cloudsql
               - mountPath: /credentials
                 name: service-account-token
               volumes:
               - name: cloudsql
                 emptyDir:
               - name: service-account-token
                 secret:
                  secretName: service-account-token

kubectlでapplyする。

   kubectl apply -f sqlproxy-deployment.yaml

5. cluster内にサービスとして公開する。

sqlproxy-services.yamlを作成しapplyする。

sqlproxy-services.yaml
      apiVersion: v1
      kind: Service
      metadata:
         name: sqlproxy-service-db1
      spec:
         ports:
         - port: 3306
            targetPort: port-db1
         selector:
            app: cloudsqlproxy

kubectlでapplyする。

   kubectl apply -f sqlproxy-services.yaml

6. PythonOperatorの記述

  1. 上記sql-proxyを経由して、pymysqlからCloudSQLに接続する。
  2. PythonOperatorのサンプルはt次の通り。
cloudsql_via_proxy.py
      #!/usr/bin/env python3
      # -*- coding: utf-8 -*-

      from datetime import timedelta, datetime, timezone

      from airflow.models import DAG
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.operators.python_operator import PythonOperator

      import pymysql
      import pymysql.cursors

      import pendulum

      local_tz = pendulum.timezone("Asia/Tokyo")

      default_args = {
         'owner': 'Airflow',
         'start_date': datetime(2019, 1, 1, tzinfo=local_tz),
         # 'end_date': datetime(2020, 12, 31),
         'depends_on_past': True,
         'retries': 1,
         'retry_delay': timedelta(minutes=1),
         'catchup_by_default': False
      }

      dag = DAG('cloudsql_via_proxy', schedule_interval='@once',
               default_args=default_args)


      # 最初にダミーオペレーターを動かしてみる。
      dummy = DummyOperator(
         task_id='dummy',
         trigger_rule='all_success',
         dag=dag,
      )


      def fn_select():
         sql = "SELECT * FROM [CLOUDSQL_TABLE]"
         # sql-proxy経由でCloudSQLへ接続
         # 'host'にpodで追加したsql-proxyの'クラスターIP'を入れます
         connection = pymysql.connect(
            host='[SQLPROXY_CLUSTER_IP]',
            port=3306,
            user='[CLOUDSQL_USER]',
            password='[CLOUDSQL_PASSWORD]',
            db='[CLOUDSQL_DB]',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)

         # SELECT
         with connection.cursor() as cursor:
            cursor.execute(sql)
            dbdata = cursor.fetchall()
            for i in dbdata:
                  print(i)
            connection.commit()


      fn_select = PythonOperator(
         task_id='fn_select',
         python_callable=fn_select,
         dag=dag)


      def fn_insert():
         sql = "INSERT INTO [CLOUDSQL_TABLE] (col1, col2, col3, col4) VALUES (%s, %s, %s, %s)"
         # 上に同じ
         connection = pymysql.connect(
            host='[SQLPROXY_CLUSTER_IP]',
            port=3306,
            user='[CLOUDSQL_USER]',
            password='[CLOUDSQL_PASSWORD]',
            db='[CLOUDSQL_DB]',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)

         # INSERT
         with connection.cursor() as cursor:
            r = cursor.execute(sql, (9999, "insert2", "insert3", "insert4"))
            connection.commit()


      fn_insert = PythonOperator(
         task_id='fn_insert',
         python_callable=fn_insert,
         dag=dag)


      def fn_update():
         sql = "UPDATE [CLOUDSQL_TABLE] SET col2 = %s WHERE id = %s"
         # 上に同じ
         connection = pymysql.connect(
            host='[SQLPROXY_CLUSTER_IP]',
            port=3306,
            user='[CLOUDSQL_USER]',
            password='[CLOUDSQL_PASSWORD]',
            db='[CLOUDSQL_DB]',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)

         # UPDATE
         with connection.cursor() as cursor:
            r = cursor.execute(sql, ("update2", 3))
            connection.commit()


      fn_update = PythonOperator(
         task_id='fn_update',
         python_callable=fn_update,
         dag=dag)

      dummy >> fn_select >> fn_insert >> fn_update

7. cloudsql_via_proxy.pyをdagsにimportする。

@onceの設定なので、直ちに動き出します。

   gcloud composer environments storage dags import \
                            --environment @@@@-dev \
                            --location us-central1 \
                            --source ./cloudsql_via_proxy.py

参照サイト

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?