はじめに
AWSではDBの接続情報やAPIキーなどはSecretsManagerで管理されていることが多いが、これをLambda関数から取得してRedshiftへ接続しSQLクエリを発行するアーキテクチャと実装コード例を記事にしました。
使うAWSサービス
・AWS SecretsManager
・AWS Lambda
・Amazon Redshift
実現方法
アーキテクチャ図
Lambda関数コード
※前提
SecretsManagerには「DBクラスタ名、データベース名、ユーザ名」が保存されている場合
①SecretsManagerへの認証情報要求、②認証情報の返信(取得)
sample.py
session = boto3.session.Session()
client = session.client(
service_name="secretsmanager",
region_name=【リージョン名】# シークレットマネージャのリージョン名を指定する(Lambdaの環境変数などから取得)
)
try:
# ①SecretsManagerへの認証情報要求
get_secret_value_response = client.get_secret_value(
SecretId=【シークレットID】# 取得するシークレットIDを指定する(Lambdaの環境変数などから取得)
)
except ClientError as e:
raise e
else:
# ②認証情報の返信(取得)
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
secret = base64.b64decode(
get_secret_value_response['SecretBinary']
)
③認証情報で接続確立、④SQLクエリ発行、⑤クエリ結果返信(取得)
sample.py
data_client = boto3.client('redshift-data')
# ③認証情報で接続確立、④SQLクエリ発行
# 個々の接続情報の指定にシークレットマネージャから取得した値を使用する
result = data_client.execute_statement(
ClusterIdentifier=secret["dbClusterIdentifier"],
Database=secret["database"],
DbUser=secret["username"],
Sql=【SQL文字列】,# 目的のSQLクエリを文字列で渡す
)
# 実行IDを取得
id = result['Id']
# クエリが終わるのを待つ
statement = ''
status = ''
while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED':
statement = data_client.describe_statement(Id=id
status = statement['Status']
time.sleep(1)
# ③クエリ結果返信(取得)
if status == 'FINISHED':
if int(statement['ResultSize']) > 0:
# selectの結果なども戻り値があるSQLの結果の処理
result = data_client.get_statement_result(Id=id)
else:
# 戻り値がないSQLの結果に対する処理
elif status == 'FAILED':
# 処理が失敗時に何か処理
elif status == 'ABORTED':
# 処理を中断したときの処理
さいごに
Lambdaは環境変数の設定ができるのでついついそこで様々な変数を管理をしたい気持ちになりますが、他のLambdaで共通で使用している設定情報やより機密性の高いDBの認証情報などはSecretsManagerで一元管理して利用したほうがよいです。