はじめに
2023年12月にAuroraServerlessV2でもRDS Data APIが使えるようになりました。
LambdaからAuroraへの接続をRDS ProxyからRDS Data APIに切り替えた話はこちら。
主にコストと構成面の変更についてまとめています。
この記事では、実際にLambdaのPythonからData APIを使用してSQLを実行するコードを紹介します。
DBエンジンはpostgresです。
Data APIの利用設定
こちらの公式ドキュメントを参考に以下を実施してください。
- Secrets ManagerにDBシークレットを作成する(Lambda→RDSの接続で既に作っているものがあればそれを使い回しでOK)
- RDS Data API の有効化
- Lambdaの実行ロールにアクセス権を追加
- この記事記載のコードでは最低限
secretsmanager:GetSecretValue
とrds-data:ExecuteStatement
の権限があればOK
- この記事記載のコードでは最低限
コード
LambdaのPythonからAWSのリソースを操作するので、boto3
を使用します。
boto3
はLambda環境にデフォルトでインストールされています。
SQLの発行をするときはexecute_statement
関数を使用します。
execute_statement
単体実行の場合、変更は自動でコミットされます。
_boto = boto3.Session()
_db_client = _boto.client("rds-data")
sql = "SELECT * FROM users WHERE user_id = :user_id"
parameters = [
{"name": "user_id", "value": {"stringValue": "user1"}},
]
response = _db_client.execute_statement(
resourceArn=os.environ.get("DB_CLUSTER_ARN"),
secretArn=os.environ.get("DB_SECRETS_ARN"),
sql=sql,
database=os.environ.get("DB_NAME"),
includeResultMetadata=True,
parameters=parameters,
formatRecordsAs='JSON',
)
DB接続情報の指定
以下の3つで接続先を指定します。
-
resourceArn
:AuroraクラスターのARNを指定(arn:aws:rds:<リージョン>:<アカウントID>:cluster:<クラスター名>
の形式)。AuroraでDBを作成するとクラスターにインスタンスがぶら下がった形になりますが、インスタンスではなくクラスターのARNを指定してください -
secretArn
:Data APIの利用設定で作成したシークレットのARNを指定 -
database
:接続先のDB名
SQL文とパラメータの指定
sql
とparameters
引数にそれぞれ実行するSQL文とパラメータを渡します。
パラメータ指定は上記コードの:user_id
のように注入したい箇所にキーワード引数を指定し、parameters
に連想配列形式で引数の値を定義します。
value
部分は置き換える値の型に応じてキーの部分を変えます。
isNull
, booleanValue
, longValue
, doubleValue
, stringValue
, blobValue
, arrayValue
から選択します。
整数を指定するときはlongValue
です。
arrayValue
はドキュメント記載の通り指定したらエラーになったので使えていません。
日時型の扱い
Data APIはTIMESTAMPTZ
(timestamp with time zone)をサポートしていないので、postgresのCURRENT_DATE
やCURRENT_TIMESTAMP
が使えません。
またtimezoneもUTCで固定されているので、列定義でinsert_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
などを指定していても、日本時間で登録されません。
そのためINSERT時には以下のようにLambdaのコード側で時刻を生成し、パラメータに指定する必要があります。
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
sql = """
INSERT INTO user(user_id, user_name, insert_date)
VALUES (:user_id, :user_name, :insert_date
"""
parameters = [
{"name": "user_id", "value": {"stringValue": "user1"}},
{"name": "user_name", "value": {"stringValue": "ユーザー1"}},
{"name": "insert_date", "value": {"stringValue": current_timestamp}, "typeHint": "TIMESTAMP"},
]
parameters
はpythonのdatetime
型をサポートしていないので、value
の指定はstringValue
、typeHint
にTIMESTAMP
を指定することでTIMESTAMPを登録できます。
登録日時系のカラムはDEFAULT CURRENT_TIMESTAMP
を指定していることが多いと思うので、書き忘れに注意が必要です。
レスポンスの取り出し
execute_statement
の戻り値はdict型で、デフォルトではrecords
に結果が配列で格納されています。
この記事を書いている途中で知ったのですが、execute_statement
の引数でformatRecordsAs='JSON'
を指定すると、formattedRecords
にJSON文字列で結果セットが返ってくるみたいです。
なのでpythonであればjson.loads()
を使えば簡単に結果が取り出せます。
import json
result = json.loads(response['formattedRecords'])
print (result[0]["user_id"])
わたしは実装時この引数に気づかなかったので、以下に記載するちょっと面倒な処理で値を取り出していました。
完全に蛇足のコードなので読み飛ばしていいです。
formatRecordsAs='JSON'
を指定しない場合
formatRecordsAs='JSON'
を指定していない場合はrecords
に以下の形式で値が入ってきます。
全部のキーが指定されているわけではなく、結果の値の型に応じたisNull
〜arrayValue
のいずれかのキーで値が入っています。
'records': [
[
{
'isNull': True|False,
'booleanValue': True|False,
'longValue': 123,
'doubleValue': 123.0,
'stringValue': 'string',
'blobValue': b'bytes',
'arrayValue': {
'booleanValues': [
True|False,
],
'longValues': [
123,
],
'doubleValues': [
123.0,
],
'stringValues': [
'string',
],
'arrayValues': [
{'... recursive ...'},
]
}
},
],
],
SELECT時にuser_id
(整数型),user_name
(文字型)ならこの構造
'records': [
[
{
'longValue': 123,
},
{
'stringValue': 'user1',
},
],
],
上記の通りrecords
には列名が入っていないので、結果の取り出しが非常にわかりづらくなります。
該当システムはRDS ProxyからData APIに切り替えていて、元はpsycopg2
を使用していました。結果の取り出しはrow['user_id']
のように列名指定で取り出していたので、Data API切り替えによる既存コードの変更を抑えるために、以下のように結果を整形しました。
execute_statement
の実行時にincludeResultMetadata=True
を指定しておくと、型や列名など列のメタ情報が返されます。
'columnMetadata': [
{
'name': 'string', #列名 上記例なら'user_id'
'type': 123,
'typeName': 'string',
'label': 'string',
'schemaName': 'string',
'tableName': 'string',
'isAutoIncrement': True|False,
'isSigned': True|False,
'isCurrency': True|False,
'isCaseSensitive': True|False,
'nullable': 123,
'precision': 123,
'scale': 123,
'arrayBaseColumnType': 123
},
{
'name': 'string', # 上記例なら'user_name'
'type': 123,
'typeName': 'string',
'label': 'string',
'schemaName': 'string',
'tableName': 'string',
'isAutoIncrement': True|False,
'isSigned': True|False,
'isCurrency': True|False,
'isCaseSensitive': True|False,
'nullable': 123,
'precision': 123,
'scale': 123,
'arrayBaseColumnType': 123
},
],
columnMetadata
の配列の順序はrecords
の列の順番(SQL指定の列の順番)と一致するので、以下のように取り出します。
response = _db_client.execute_statement(
resourceArn=os.environ.get("DB_CLUSTER_ARN"),
secretArn=os.environ.get("DB_SECRETS_ARN"),
sql=sql,
database=os.environ.get("DB_NAME"),
includeResultMetadata=True,
parameters=parameters,
)
records = response.get("records", [])
column_meta_data = response.get("columnMetadata", [])
# レスポンスの整形
result = []
for record in records:
row = {}
for i, column in enumerate(record):
if list(column.keys())[0] == "isNull":
row[column_meta_data[i]["name"]] = None
else:
row[column_meta_data[i]["name"]] = list(column.values())[0]
result.append(row)
# result[{'user_id': 123, 'user_name':'user1'},{...},{...}]の構造に整形
if list(column.keys())[0] == "isNull":
row[column_meta_data[i]["name"]] = None
ここの処理は、戻り値がNULL
の場合はstringValue
など該当列の型ではなく'isNull': True
で返ってくるので、None
を詰めるようにしています。
処理速度について
わたしは上記蛇足のコードで実装しているので正確に比較できていないですが、psycopg2
などのDB
ドライバでLambda→RDS(RDS Proxy)の接続をしてSQL実行する方が処理速度は早い気がしています。
Data APIはHTTPでリクエストを飛ばすので、通信分の時間がかかってるのかしらと考えています。
ただLambdaがtimeoutになるような致命的な遅れではないので、あくまで体感です。
JSONで処理するようにすれば多分もうちょっと速くなる。
おわりに
RDS Data APIのPython実装についての記事があまりなかったので記事にしました。
これからData APIを使ってみようという方の参考になれば幸いです。