今回の課題
Snowflakeにanimals
というテーブルというテーブルがあるとして、
これをpythonでRDSのMySqlにロードできるようにコードを作成する。
※pythonはAWS Lambdaで実行するとする。
id | animal_name | updated_at |
---|---|---|
1 | dog | 2023-01-01 00:00:00 |
2 | cat | 2023-01-12 00:00:00 |
3 | bear | 2023-01-23 00:00:00 |
実装したコード
こういったコードをAWSのLambdaで実行することで、SnowflakeからRDSにデータをinsertすることができた。
import sys
import json
import boto3
import ast
import os
import snowflake.connector
import pymysql
from snowflake.connector import DictCursor
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from datetime import datetime
def lambda_handler(event, context):
# 今日の日付とSQLを実行する日時を変数で用意
today = datetime.now()
updated_at_str = datetime.strftime(today, '%Y-%m-%d %H:%M:%S')
## Snowflakeにアクセスするための情報を用意
secret_name = "Snowflakeのsecretname"
region_name = "Snowflakeのリージョン名"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value = client.get_secret_value(
SecretId=secret_name
)
secret = ast.literal_eval(get_secret_value['SecretString']) # ast.literal_eval・・・文字列をリストや辞書に変換
snowflake_user = secret['USER']
snowflake_password = secret['PASSWORD']
snowflake_account = secret['ACCOUNT']
snowflake_database = secret['DATABASE']
snowflake_warehouse = secret['WAREHOUSE']
snowflake_schema = secret['SCHEMA']
try:
animals = []
conn = snowflake.connector.connect(
user=snowflake_user,
password=snowflake_password,
account=snowflake_account,
database=snowflake_database,
warehouse=snowflake_warehouse,
schema=snowflake_schema
)
# DictCursorはCursorと違って、dictを返す
cursor = conn.cursor(DictCursor)
## animals
cursor.execute("SELECT id, animal_name from u1.animals")
results = cursor.fetchall()
for row in results:
id = row['NAME']
ainmal_name = row['ANIMAL_NAME']
animals.append({
"id": id,
"animal_name": animal_name
})
cursor.close()
conn.close()
except Exception as e:
print(e)
## get mysql access info
secret_name = "mysqlのsecretname"
region_name = "mysqlを使用しているRDSのリージョン名"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value = client.get_secret_value(
SecretId=secret_name
)
secret = ast.literal_eval(get_secret_value['SecretString'])
mysql_user = secret['username']
mysql_password = secret['password']
mysql_host = secret['host']
mysql_dbname = secret['dbname']
## update DB
try:
engine = create_engine('mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(
mysql_user,
mysql_password,
mysql_host,
mysql_dbname
))
with engine.begin() as conn:
# animals
for row in items:
id = row['id']
item_name = row['animal_name']
insert_sql = text("""
insert into animals
(id, animal_name)
values('{}','{}')
on duplicate key update
animal_name = '{}', updated_at = '{}'
""".format(id, animal_name, animal_name, updated_at_str))
conn.execute(insert_sql)
except Exception as e:
print(e)
return {
'statusCode': 200,
'body': json.dumps('succeeded')
}
コードの理解
コピペをしながらコードを作成したので、
きちんと理解するためにコードを細かく分解して理解してみた。
1)Snowflakeに接続するために必要な情報の準備
以下の部分で、pythonのコードでSnowflakeに接続するための情報を準備している。
secret_name = "Snowflakeのsecretname"
region_name = "Snowflakeのリージョン名"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value = client.get_secret_value(
SecretId=secret_name
)
secret = ast.literal_eval(get_secret_value['SecretString']) # ast.literal_eval・・・文字列をリストや辞書に変換
snowflake_user = secret['USER']
snowflake_password = secret['PASSWORD']
snowflake_account = secret['ACCOUNT']
snowflake_database = secret['DATABASE']
snowflake_warehouse = secret['WAREHOUSE']
snowflake_schema = secret['SCHEMA']
-
boto3.session.session()
- このコードで設定状態を保存することができる。ここで保存した設定を使い回して、
client()
やresource()
でAWSのリソースに接続して操作できるようにしている。- SnowflakeはAWSのリージョン上で動いているので、この接続設定を行うことでSnowflakeも操作できるようになると理解した。
- このコードで設定状態を保存することができる。ここで保存した設定を使い回して、
-
session.client()
- 上記に記載したとおり、
client()
で実際にAWSのリソースに接続している。上記では、AWS Secrets Manager
に接続している。
- 上記に記載したとおり、
-
get_secret_value
- ここで
SecretId
にSnowflakeのsecretnameを指定して、AWS Secrets Manager
に登録しているSnowflakeの情報を辞書型で抽出している。
- ここで
-
ast.literal_eval(get_secret_value['SecretString'])
-
get_secret_value()
で取得したデータをリスト型や辞書型に変換している。-
[]
でくくられている文字列はリスト型、{}
でくくられている文字列は辞書型に変換されるイメージ。
-
-
get_secret_value()
で抽出した辞書型の文字列の、SecretString
というキーに欲しいSnowflakeの情報が入っているので抽出している。
-
2)Snowflakeに接続してクエリを実行する
1)
で用意した情報をもとに、以下のコードで実際にSnowflakeへの接続を実施している。
try:
animals = []
conn = snowflake.connector.connect(
user = snowflake_user,
password = snowflake_password,
account = snowflake_account,
database = snowflake_database,
warehouse = snowflake_warehouse,
schema = snowflake_schema
)
# DictCursorはCursorと違って、dictを返す
cursor = conn.cursor(DictCursor)
## animals
cursor.execute("select id, animal_name from u1.animals") # クエリを実行
results = cursor.fetchall()
for row in results:
id = row['ID']
ainmal_name = row['ANIMAL_NAME']
animals.append({
"id": id,
"animal_name": animal_name
})
cursor.close()
conn.close()
except Exception as e:
print(e)
-
snowflake.connector.connect
-
1)
で準備した情報を使ってSnowflakeに接続。
-
-
conn.cursor(DictCursor)
- SQLを実行するために
cursor()
を用意する必要があるので用意している。 - 引数に
DictCursor
を渡すことで、クエリの結果を辞書型で返すことができる。-
fetchall()
でSQLを実行することで、辞書型でクエリ実行結果を全行抽出している。
-
- SQLを実行するために
-
cursor.close()
- SQLを実行し終えたら、SQLを実行するために用意した
cursor()
を閉じる。
- SQLを実行し終えたら、SQLを実行するために用意した
-
conn.close()
- Snowflakeへの接続が不要になったら、
snowflake.connector.connect
を閉じる。
- Snowflakeへの接続が不要になったら、
※mysqlに接続するために必要な情報を用意する方法は1)
の、
Snowflakeに接続するために必要な情報の準備の方法とほぼ同じのため割愛
3)SQLAlchemyでRDSにデータを挿入する
以下の部分で、SQLAlchemyを使用してRDSにデータをinsertしている。
try:
engine = create_engine('mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(
mysql_user,
mysql_password,
mysql_host,
mysql_dbname
))
with engine.begin() as conn:
# animals
for row in items:
id = row['id']
item_name = row['animal_name']
insert_sql = text("""
insert into animals
(id, animal_name)
values('{}','{}')
on duplicate key update
animal_name = '{}', updated_at = '{}'
""".format(id, animal_name, animal_name, updated_at_str))
conn.execute(insert_sql)
except Exception as e:
print(e)
-
create_engine('mysql+pymysql://ユーザー名:パスワード@ホスト名:ポート番号/データベース名?charset=utf8')
- SQLAlchemyを使ってMySQLに接続している。
-
mysql+pymysql
はMySQL用のドライバのこと。
-
engine.begin()
- トランザクションを開始することができる。
-
conn.execute()
で、トランザクション内に記載したクエリを実行している。- 上記のクエリでは、insert文を実行している。
-
conn.rollback()
で、クエリの実行だけでなく、ロールバックも行えるようだった。
まとめ
上記のようなpythonコードを作成することで、Snowflake→RDSのデータロードができるようになった!
おまけ
出てきた用語について、勉強のために調査をした。
AWS Secrets Managerとは
機密情報を一元管理して、APIで取得できるサービス。
今回のようにAWS Secrets Manager
で管理しているパスワードなどを、セキュリティ的に安全にコードの中で使用できる。
以下の記事のように、AWS Secrets Manager
でシークレット情報を登録しておくことで、上記の実装したコード
に記載したコードのように、python内でシークレット情報を使用することができる。
SQLAlchemyとは
Pythonの中で利用されるORM(Object Relational Mapper)の一つ。
DBの種類によらず同じコードでSQLを実行できるので、DB毎にSQLを書き換えなくても良いので便利。
ORM(Object Relational Mapper)
DBとPythonのオブジェクトを関連づけるライブラリのこと。
Pythonのコードを通常通り書くように、SQL操作ができる。
SQLをクラスとして扱えるようにしたもので、SQLをオブジェクト指向で書くことができる。