0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS IAM認証でRDSProxy 接続サンプル

0
Posted at
      Code:
        ZipFile: |
          # Placeholder - deploy actual code via CI/CD or aws lambda update-function-code
          def handler(event, context):
              return {"statusCode": 200, "body": "placeholder"}
      VpcConfig:
        SubnetIds:
          - Fn::ImportValue: !Sub "${ProjectName}-${Environment}-PrivateSubnetAId"
          - Fn::ImportValue: !Sub "${ProjectName}-${Environment}-PrivateSubnetCId"
        SecurityGroupIds:
          - Fn::ImportValue: !Sub "${ProjectName}-${Environment}-LambdaSecurityGroupId"
      Environment:
        Variables:
          DB_PROXY_ENDPOINT:
            Fn::ImportValue: !Sub "${ProjectName}-${Environment}-RdsProxyEndpoint"
          DB_NAME:
            Fn::ImportValue: !Sub "${ProjectName}-${Environment}-DBName"
          DB_USER: app_user
          UPLOAD_BUCKET:
            Fn::ImportValue: !Sub "${ProjectName}-${Environment}-UploadBucketName"
          PRESIGNED_URL_EXPIRY: !Ref PresignedUrlExpiry
          ENVIRONMENT: !Ref Environment
          CUSTOM_DOMAIN: !Ref CustomDomainName
"""Database connection helper using IAM authentication and RDS Proxy."""

import os
import time
import logging

import boto3
import psycopg2
import psycopg2.extras

logger = logging.getLogger(__name__)

_rds_client = boto3.client("rds")

# Module-level connection cache (reused across warm Lambda invocations)
_connection = None


def _generate_auth_token() -> str:
    """Generate an IAM authentication token for RDS Proxy."""
    return _rds_client.generate_db_auth_token(
        DBHostname=os.environ["DB_PROXY_ENDPOINT"],
        Port=5432,
        DBUsername=os.environ["DB_USER"],
    )


def get_connection():
    """Get a database connection via RDS Proxy with IAM authentication.

    Reuses an existing connection if available and healthy.
    Retries up to 3 times with exponential backoff on failure.
    """
    global _connection

    if _connection is not None:
        try:
            _connection.cursor().execute("SELECT 1")
            return _connection
        except Exception:
            logger.warning("Existing connection is stale, reconnecting")
            try:
                _connection.close()
            except Exception:
                pass
            _connection = None

    max_retries = 3
    backoff_ms = [100, 200, 400]

    for attempt in range(max_retries):
        try:
            token = _generate_auth_token()
            conn = psycopg2.connect(
                host=os.environ["DB_PROXY_ENDPOINT"],
                port=5432,
                dbname=os.environ["DB_NAME"],
                user=os.environ["DB_USER"],
                password=token,
                sslmode="require",
                connect_timeout=5,
            )
            conn.autocommit = False
            _connection = conn
            return _connection
        except Exception as e:
            logger.error(
                "DB connection attempt %d/%d failed: %s",
                attempt + 1,
                max_retries,
                str(e),
            )
            if attempt < max_retries - 1:
                time.sleep(backoff_ms[attempt] / 1000.0)
            else:
                raise


def close_connection():
    """Close the cached database connection."""
    global _connection
    if _connection is not None:
        try:
            _connection.close()
        except Exception:
            pass
        _connection = None
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?