3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【データ基盤構築/AWS Lambda】Pythonを使ってSnowflakeのデータをRDSにinsertする

Posted at

今回の課題

Snowflakeにanimalsというテーブルというテーブルがあるとして、
これをpythonでRDSのMySqlにロードできるようにコードを作成する。

※pythonはAWS Lambdaで実行するとする。

id animal_name updated_at
1 dog 2023-01-01 00:00:00
2 cat 2023-01-12 00:00:00
3 bear 2023-01-23 00:00:00

実装したコード

こういったコードをAWSのLambdaで実行することで、SnowflakeからRDSにデータをinsertすることができた。

import sys
import json
import boto3
import ast
import os
import snowflake.connector
import pymysql
from snowflake.connector import DictCursor
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from datetime import datetime

def lambda_handler(event, context):
    # 今日の日付とSQLを実行する日時を変数で用意
    today          = datetime.now()
    updated_at_str = datetime.strftime(today, '%Y-%m-%d %H:%M:%S')

    ## Snowflakeにアクセスするための情報を用意
    secret_name = "Snowflakeのsecretname"
    region_name = "Snowflakeのリージョン名"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    get_secret_value = client.get_secret_value(
            SecretId=secret_name
    )
    secret = ast.literal_eval(get_secret_value['SecretString']) # ast.literal_eval・・・文字列をリストや辞書に変換
    snowflake_user      = secret['USER']
    snowflake_password  = secret['PASSWORD']
    snowflake_account   = secret['ACCOUNT']
    snowflake_database  = secret['DATABASE']
    snowflake_warehouse = secret['WAREHOUSE']
    snowflake_schema    = secret['SCHEMA']

    try:
        animals = []
        conn = snowflake.connector.connect(
            user=snowflake_user,
            password=snowflake_password,
            account=snowflake_account,
            database=snowflake_database,
            warehouse=snowflake_warehouse,
            schema=snowflake_schema
            )
        # DictCursorはCursorと違って、dictを返す
        cursor = conn.cursor(DictCursor)
    
        ## animals
        cursor.execute("SELECT id, animal_name from u1.animals")
        results = cursor.fetchall()
        for row in results:
            id                = row['NAME']
            ainmal_name = row['ANIMAL_NAME']
            animals.append({
                "id": id,
                "animal_name": animal_name
            })
        
        cursor.close()
        conn.close()

    except Exception as e:
        print(e)
    

    ## get mysql access info
    secret_name = "mysqlのsecretname"
    region_name = "mysqlを使用しているRDSのリージョン名"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    get_secret_value = client.get_secret_value(
            SecretId=secret_name
    )
    secret = ast.literal_eval(get_secret_value['SecretString'])
    mysql_user     = secret['username']
    mysql_password = secret['password']
    mysql_host     = secret['host']
    mysql_dbname   = secret['dbname']

    ## update DB
    try:
        engine = create_engine('mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(
              mysql_user,
              mysql_password,
              mysql_host,
              mysql_dbname
          ))
        with engine.begin() as conn:
          # animals
          for row in items:
            id                = row['id']
            item_name = row['animal_name']
            insert_sql = text("""
                  insert into animals
                  (id, animal_name)
                  values('{}','{}')
                  on duplicate key update
                  animal_name = '{}', updated_at = '{}'
            """.format(id, animal_name, animal_name, updated_at_str))
            conn.execute(insert_sql)

    except Exception as e:
        print(e)

    return {
        'statusCode': 200,
        'body': json.dumps('succeeded')
    }

コードの理解

コピペをしながらコードを作成したので、
きちんと理解するためにコードを細かく分解して理解してみた。

1)Snowflakeに接続するために必要な情報の準備

以下の部分で、pythonのコードでSnowflakeに接続するための情報を準備している。

secret_name = "Snowflakeのsecretname"
region_name = "Snowflakeのリージョン名"

session = boto3.session.Session()
client = session.client(
    service_name='secretsmanager',
    region_name=region_name
)
get_secret_value = client.get_secret_value(
        SecretId=secret_name
)
secret = ast.literal_eval(get_secret_value['SecretString']) # ast.literal_eval・・・文字列をリストや辞書に変換
snowflake_user      = secret['USER']
snowflake_password  = secret['PASSWORD']
snowflake_account   = secret['ACCOUNT']
snowflake_database  = secret['DATABASE']
snowflake_warehouse = secret['WAREHOUSE']
snowflake_schema    = secret['SCHEMA']
  • boto3.session.session()

    • このコードで設定状態を保存することができる。ここで保存した設定を使い回して、client()resource()でAWSのリソースに接続して操作できるようにしている。
      • SnowflakeはAWSのリージョン上で動いているので、この接続設定を行うことでSnowflakeも操作できるようになると理解した。
  • session.client()

    • 上記に記載したとおり、client()で実際にAWSのリソースに接続している。上記では、AWS Secrets Managerに接続している。
  • get_secret_value

    • ここでSecretIdにSnowflakeのsecretnameを指定して、AWS Secrets Managerに登録しているSnowflakeの情報を辞書型で抽出している。
  • ast.literal_eval(get_secret_value['SecretString'])

    • get_secret_value()で取得したデータをリスト型や辞書型に変換している。
      • []でくくられている文字列はリスト型、{}でくくられている文字列は辞書型に変換されるイメージ。
    • get_secret_value()で抽出した辞書型の文字列の、SecretStringというキーに欲しいSnowflakeの情報が入っているので抽出している。

2)Snowflakeに接続してクエリを実行する

1)で用意した情報をもとに、以下のコードで実際にSnowflakeへの接続を実施している。

try:
    animals = []
    conn = snowflake.connector.connect(
        user      = snowflake_user,
        password  = snowflake_password,
        account   = snowflake_account,
        database  = snowflake_database,
        warehouse = snowflake_warehouse,
        schema    = snowflake_schema
        )
    # DictCursorはCursorと違って、dictを返す
    cursor = conn.cursor(DictCursor)

    ## animals
    cursor.execute("select id, animal_name from u1.animals") # クエリを実行
    results = cursor.fetchall()
    for row in results:
        id          = row['ID']
        ainmal_name = row['ANIMAL_NAME']
        animals.append({
            "id": id,
            "animal_name": animal_name
        })
    
    cursor.close()
    conn.close()

except Exception as e:
    print(e)
  • snowflake.connector.connect

    • 1)で準備した情報を使ってSnowflakeに接続。
  • conn.cursor(DictCursor)

    • SQLを実行するためにcursor()を用意する必要があるので用意している。
    • 引数にDictCursorを渡すことで、クエリの結果を辞書型で返すことができる。
      • fetchall()でSQLを実行することで、辞書型でクエリ実行結果を全行抽出している。
  • cursor.close()

    • SQLを実行し終えたら、SQLを実行するために用意したcursor()を閉じる。
  • conn.close()

    • Snowflakeへの接続が不要になったら、snowflake.connector.connectを閉じる。

※mysqlに接続するために必要な情報を用意する方法は1)の、
 Snowflakeに接続するために必要な情報の準備の方法とほぼ同じのため割愛

3)SQLAlchemyでRDSにデータを挿入する

以下の部分で、SQLAlchemyを使用してRDSにデータをinsertしている。

try:
    engine = create_engine('mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(
          mysql_user,
          mysql_password,
          mysql_host,
          mysql_dbname
      ))
    with engine.begin() as conn:
      # animals
      for row in items:
        id                = row['id']
        item_name = row['animal_name']
        insert_sql = text("""
              insert into animals
              (id, animal_name)
              values('{}','{}')
              on duplicate key update
              animal_name = '{}', updated_at = '{}'
        """.format(id, animal_name, animal_name, updated_at_str))
        conn.execute(insert_sql)

except Exception as e:
    print(e)
  • create_engine('mysql+pymysql://ユーザー名:パスワード@ホスト名:ポート番号/データベース名?charset=utf8')

    • SQLAlchemyを使ってMySQLに接続している。
    • mysql+pymysqlはMySQL用のドライバのこと。
  • engine.begin()

    • トランザクションを開始することができる。
    • conn.execute()で、トランザクション内に記載したクエリを実行している。
      • 上記のクエリでは、insert文を実行している。
      • conn.rollback()で、クエリの実行だけでなく、ロールバックも行えるようだった。

まとめ

上記のようなpythonコードを作成することで、Snowflake→RDSのデータロードができるようになった!

おまけ

出てきた用語について、勉強のために調査をした。

AWS Secrets Managerとは

機密情報を一元管理して、APIで取得できるサービス。
今回のようにAWS Secrets Managerで管理しているパスワードなどを、セキュリティ的に安全にコードの中で使用できる。

以下の記事のように、AWS Secrets Managerでシークレット情報を登録しておくことで、上記の実装したコードに記載したコードのように、python内でシークレット情報を使用することができる。

SQLAlchemyとは

Pythonの中で利用されるORM(Object Relational Mapper)の一つ。
DBの種類によらず同じコードでSQLを実行できるので、DB毎にSQLを書き換えなくても良いので便利。

ORM(Object Relational Mapper)

DBとPythonのオブジェクトを関連づけるライブラリのこと。
Pythonのコードを通常通り書くように、SQL操作ができる。
SQLをクラスとして扱えるようにしたもので、SQLをオブジェクト指向で書くことができる。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?