1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

株式会社オーイーシーAdvent Calendar 2024

Day 18

LambdaからRDS Data APIを使ってAuroraServerlessV2にアクセスする

Last updated at Posted at 2024-12-23

はじめに

2023年12月にAuroraServerlessV2でもRDS Data APIが使えるようになりました。

LambdaからAuroraへの接続をRDS ProxyからRDS Data APIに切り替えた話はこちら。
主にコストと構成面の変更についてまとめています。

この記事では、実際にLambdaのPythonからData APIを使用してSQLを実行するコードを紹介します。
DBエンジンはpostgresです。

Data APIの利用設定

こちらの公式ドキュメントを参考に以下を実施してください。

  • Secrets ManagerにDBシークレットを作成する(Lambda→RDSの接続で既に作っているものがあればそれを使い回しでOK)
  • RDS Data API の有効化
  • Lambdaの実行ロールにアクセス権を追加
    • この記事記載のコードでは最低限 secretsmanager:GetSecretValuerds-data:ExecuteStatementの権限があればOK

コード

LambdaのPythonからAWSのリソースを操作するので、boto3を使用します。
boto3はLambda環境にデフォルトでインストールされています。

SQLの発行をするときはexecute_statement関数を使用します。
execute_statement単体実行の場合、変更は自動でコミットされます。

_boto = boto3.Session()
_db_client = _boto.client("rds-data")

sql = "SELECT * FROM users WHERE user_id = :user_id"
parameters =  [
    {"name": "user_id", "value": {"stringValue": "user1"}},
]

response = _db_client.execute_statement(
    resourceArn=os.environ.get("DB_CLUSTER_ARN"),
    secretArn=os.environ.get("DB_SECRETS_ARN"),
    sql=sql,
    database=os.environ.get("DB_NAME"),
    includeResultMetadata=True,
    parameters=parameters,
    formatRecordsAs='JSON',
)

DB接続情報の指定

以下の3つで接続先を指定します。

  • resourceArn:AuroraクラスターのARNを指定(arn:aws:rds:<リージョン>:<アカウントID>:cluster:<クラスター名>の形式)。AuroraでDBを作成するとクラスターにインスタンスがぶら下がった形になりますが、インスタンスではなくクラスターのARNを指定してください
  • secretArn:Data APIの利用設定で作成したシークレットのARNを指定
  • database:接続先のDB名

SQL文とパラメータの指定

sqlparameters引数にそれぞれ実行するSQL文とパラメータを渡します。

パラメータ指定は上記コードの:user_idのように注入したい箇所にキーワード引数を指定し、parametersに連想配列形式で引数の値を定義します。
value部分は置き換える値の型に応じてキーの部分を変えます。
isNull, booleanValue, longValue, doubleValue, stringValue, blobValue, arrayValueから選択します。
整数を指定するときはlongValueです。
arrayValueはドキュメント記載の通り指定したらエラーになったので使えていません。

日時型の扱い

Data APIはTIMESTAMPTZ(timestamp with time zone)をサポートしていないので、postgresのCURRENT_DATECURRENT_TIMESTAMPが使えません。
またtimezoneもUTCで固定されているので、列定義でinsert_date TIMESTAMP DEFAULT CURRENT_TIMESTAMPなどを指定していても、日本時間で登録されません。
そのためINSERT時には以下のようにLambdaのコード側で時刻を生成し、パラメータに指定する必要があります。

current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

sql = """
    INSERT INTO user(user_id, user_name, insert_date)
    VALUES (:user_id, :user_name, :insert_date
"""
parameters =  [
    {"name": "user_id", "value": {"stringValue": "user1"}},
    {"name": "user_name", "value": {"stringValue": "ユーザー1"}},
    {"name": "insert_date", "value": {"stringValue": current_timestamp}, "typeHint": "TIMESTAMP"},
]

parametersはpythonのdatetime型をサポートしていないので、valueの指定はstringValuetypeHintTIMESTAMPを指定することでTIMESTAMPを登録できます。
登録日時系のカラムはDEFAULT CURRENT_TIMESTAMPを指定していることが多いと思うので、書き忘れに注意が必要です。

レスポンスの取り出し

execute_statementの戻り値はdict型で、デフォルトではrecordsに結果が配列で格納されています。
この記事を書いている途中で知ったのですが、execute_statementの引数でformatRecordsAs='JSON'を指定すると、formattedRecordsにJSON文字列で結果セットが返ってくるみたいです。

なのでpythonであればjson.loads()を使えば簡単に結果が取り出せます。

import json

result = json.loads(response['formattedRecords'])
print (result[0]["user_id"])

わたしは実装時この引数に気づかなかったので、以下に記載するちょっと面倒な処理で値を取り出していました。
完全に蛇足のコードなので読み飛ばしていいです。

formatRecordsAs='JSON'を指定しない場合

formatRecordsAs='JSON'を指定していない場合はrecordsに以下の形式で値が入ってきます。
全部のキーが指定されているわけではなく、結果の値の型に応じたisNullarrayValueのいずれかのキーで値が入っています。

'records': [
        [
            {
                'isNull': True|False,
                'booleanValue': True|False,
                'longValue': 123,
                'doubleValue': 123.0,
                'stringValue': 'string',
                'blobValue': b'bytes',
                'arrayValue': {
                    'booleanValues': [
                        True|False,
                    ],
                    'longValues': [
                        123,
                    ],
                    'doubleValues': [
                        123.0,
                    ],
                    'stringValues': [
                        'string',
                    ],
                    'arrayValues': [
                        {'... recursive ...'},
                    ]
                }
            },
        ],
    ],

SELECT時にuser_id(整数型),user_name(文字型)ならこの構造

'records': [
        [
            {
                'longValue': 123,
            },
            {
                'stringValue': 'user1',
            },
        ],
    ],

上記の通りrecordsには列名が入っていないので、結果の取り出しが非常にわかりづらくなります。
該当システムはRDS ProxyからData APIに切り替えていて、元はpsycopg2を使用していました。結果の取り出しはrow['user_id']のように列名指定で取り出していたので、Data API切り替えによる既存コードの変更を抑えるために、以下のように結果を整形しました。

execute_statementの実行時にincludeResultMetadata=Trueを指定しておくと、型や列名など列のメタ情報が返されます。

'columnMetadata': [
        {
            'name': 'string', #列名 上記例なら'user_id'
            'type': 123,
            'typeName': 'string',
            'label': 'string',
            'schemaName': 'string',
            'tableName': 'string',
            'isAutoIncrement': True|False,
            'isSigned': True|False,
            'isCurrency': True|False,
            'isCaseSensitive': True|False,
            'nullable': 123,
            'precision': 123,
            'scale': 123,
            'arrayBaseColumnType': 123
        },
        {
            'name': 'string', # 上記例なら'user_name'
            'type': 123,
            'typeName': 'string',
            'label': 'string',
            'schemaName': 'string',
            'tableName': 'string',
            'isAutoIncrement': True|False,
            'isSigned': True|False,
            'isCurrency': True|False,
            'isCaseSensitive': True|False,
            'nullable': 123,
            'precision': 123,
            'scale': 123,
            'arrayBaseColumnType': 123
        },
    ],

columnMetadataの配列の順序はrecordsの列の順番(SQL指定の列の順番)と一致するので、以下のように取り出します。

response = _db_client.execute_statement(
    resourceArn=os.environ.get("DB_CLUSTER_ARN"),
    secretArn=os.environ.get("DB_SECRETS_ARN"),
    sql=sql,
    database=os.environ.get("DB_NAME"),
    includeResultMetadata=True,
    parameters=parameters,
)

records = response.get("records", [])
column_meta_data = response.get("columnMetadata", [])

# レスポンスの整形
result = []

for record in records:
    row = {}
    for i, column in enumerate(record):
        if list(column.keys())[0] == "isNull":
            row[column_meta_data[i]["name"]] = None
        else:
            row[column_meta_data[i]["name"]] = list(column.values())[0]
    result.append(row)

# result[{'user_id': 123, 'user_name':'user1'},{...},{...}]の構造に整形
if list(column.keys())[0] == "isNull":
    row[column_meta_data[i]["name"]] = None

ここの処理は、戻り値がNULLの場合はstringValueなど該当列の型ではなく'isNull': Trueで返ってくるので、Noneを詰めるようにしています。

処理速度について

わたしは上記蛇足のコードで実装しているので正確に比較できていないですが、psycopg2などのDB
ドライバでLambda→RDS(RDS Proxy)の接続をしてSQL実行する方が処理速度は早い気がしています。
Data APIはHTTPでリクエストを飛ばすので、通信分の時間がかかってるのかしらと考えています。
ただLambdaがtimeoutになるような致命的な遅れではないので、あくまで体感です。
JSONで処理するようにすれば多分もうちょっと速くなる。

おわりに

RDS Data APIのPython実装についての記事があまりなかったので記事にしました。
これからData APIを使ってみようという方の参考になれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?