5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Aurora DSQLへデータベースロールでアクセスしよう! ~ハマりポイントを添えて~

Posted at

こんにちは、ふくちです。

データベースロールを用いてAurora DSQLへ接続しようとした際にハマりポイントが山ほどあったので、備忘録として纏めておきます。

全体の流れ

  1. DSQLへ接続するためのIAMロールとポリシーを作成する
  2. Aurora DSQLを作成する
  3. DSQLへ、Admin権限でログインする
    3.1 データベースロールを作成する
    3.2 データスキーマやテーブルを作成する
    3.3 データベースロールがスキーマやテーブルへアクセスする権限を付与する
    3.4 (きちんと作成できたか確認する)
  4. DSQLへ、データベースロールでアクセスする
    4.1 CLIを用いる場合
    4.2 SDKを用いる場合

一応、ローカル環境での動作確認がメインとなっていますが、CloudShellでもできると思います。
ローカルの実行環境は以下の通りです。VSCodeのターミナルで試してます。

  • MacBook Air M2
  • aws-cli/2.28.5
  • Python/3.13.5 (3.8以降であればOK)
  • psql/14.19 (14以降であればOK)
  • aws configure設定済み

1. DSQLへ接続するためのIAMロールとポリシーを作成する

まずは以下のポリシーを作成します。リージョンは任意ですが、ここではバージニア北部リージョン(us-east-1)を用います。

また、権限を絞りたい場合は、ResourceのARNをこの後作成するAuroraクラスターのARNに置き換えてください。

aurora-dsql-access-policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "dsql:DbConnect",
            "Resource": "arn:aws:dsql:us-east-1:<AccountId>:cluster/*"
        }
    ]
}

その後、IAMロールを作成します。
名前は何でも良いですが、ここでは aurora-dsql-access-role としておきます。
信頼関係に以下を設定します。また、ポリシーは上記で作成した aurora-dsql-access-policy を指定します。

信頼関係
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<AccountId>:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}

このIAMロールは、この後データベースロールを用いる際と、DSQLへ接続する際に使います。

2. Aurora DSQLを作成する

コンソールかIaCでAurora DSQLを作成します。
今回は単一リージョンのみの構成とします。

エンドポイントをこの後使うことになるので、メモ帳にでもコピーしておくと良いかもしれません。
image.png

3. DSQLへ、Admin権限でログインする

まずはAdmin権限でのログインです。

前提として、DSQLへの接続にはIAMを用います。
クラスターに接続する許可を持ったIAM IDを用いて、署名付きIAM認証トークンを生成し、このトークンを用いて接続するという流れです。

Admin権限、すなわちAdminのデータベースロールはDSQL作成時に自動作成されます。

また、このAdminロールを用いて接続する際の挙動としては、dsql:DbConnectAdmin というアクションになります。

ということで、まずは以下コマンドで認証トークンを生成します。

ターミナル
# DSQLクラスターエンドポイントを環境変数に入れておく
$ export DSQL_ENDPOINT="your-cluster-endpoint"

# 生成したトークンを環境変数に入れておく
$ export PGPASSWORD=$(aws dsql generate-db-connect-admin-auth-token \
  --expires-in 3600 \
  --region us-east-1 \
  --hostname $DSQL_ENDPOINT)

その後、作成した認証トークンを用いてクラスターに接続します。

ターミナル
# 接続には
$ export PGSSLMODE=require

# psqlコマンドで接続可能
$ psql --dbname postgres \
  --username admin \
  --host $DSQL_ENDPOINT

# 表示が以下のようにpostgresになれば成功
postgres=>

これでひとまずDSQLへ接続できました。

3.1 データベースロールを作成する

次はカスタムデータベースロールを作成していきます。

さっきまではAdminロールを用いて接続していましたが、実際にアプリケーションなどでAdminロールを設定するわけにはいきません。権限が強すぎるためです。

そこでカスタムデータベースロールを作成し、必要なだけの権限を付与して上げる必要があります。

ということで以下コマンドを入力してデータベースロールを作成し、それをIAMロールと結びつけておきます。

SQL
-- Step 1: 新しいデータベースロールを作成
CREATE ROLE app_user WITH LOGIN;

-- Step 2: 最初に作成したIAMロールとマッピング
AWS IAM GRANT app_user TO 'arn:aws:iam::<AccountId>:role/aurora-dsql-access-role';

-- 確認: IAMロールとデータベースロール間のマッピングを表示する
SELECT * FROM sys.iam_pg_role_mappings;

-- 出力例
iam_oid|                          arn                          | pg_role_oid | pg_role_name | grantor_pg_role_oid | grantor_pg_role_name 
-------+-------------------------------------------------------+-------------+--------------+---------------------+----------------------
 ***** | arn:aws:iam::<AccountId>:role/aurora-dsql-access-role |   *******   |   app_user   |       *******       | admin
(1 row)

これでapp_userというデータベースロールを作成し、そこに先ほど作成したaurora-dsql-access-roleの権限を紐づけた形になります。
すなわち、aurora-dsql-access-roleを用いれば、app_userロールを用いてDSQLへ接続できるようになりました。

3.2 データスキーマやテーブルを作成する

ロールが作成できたら、実際にデータを入れる枠組みを作っていきます。
枠組み、と言っても色んな用語が出てくるので簡単に補足しておきます。

  • スキーマ(schema)
    • データベース内の名前空間
  • テーブル(table)
    • データ本体の列を定義
    • データの型、Nullの可否、プライマリキーなどの制約を決定
  • インデックス(index)
    • 高速検索のための付加データ構造
    • WHERE/JOIN/ORDER BYでよく使う列に絞るのが基本(らしい)
  • search_path
    • スキーマ検索の解決順
  • 使い方
    • <schema>.<table>という形式で一意に管理
    • SET search_path TO <schema>を設定しておけば、<table>と書くだけで<schema>.<table>を指定できるようになる

実際に指定するコマンドの例としては、以下の通りです。

sql
-- スキーマが存在しない場合は作成
CREATE SCHEMA IF NOT EXISTS app_schema;

-- スキーマを明示的に指定
SET search_path TO app_schema;

-- memberというテーブル作成
CREATE TABLE IF NOT EXISTS app_schema.members (
  member_id   UUID DEFAULT gen_random_uuid() PRIMARY KEY,
  name        VARCHAR(100) NOT NULL,
  email       VARCHAR(255) UNIQUE NOT NULL,
  created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- よく検索する列にだけインデックス
CREATE INDEX ASYNC idx_members_email   ON app_schema.members(email);
CREATE INDEX ASYNC idx_members_active  ON app_schema.members(is_active);

これでapp_schema配下にmemberテーブルが作成され、その中にidやnameを格納できるようになりました。

3.3 データベースロールがスキーマやテーブルへアクセスする権限を付与する

先ほどapp_userロールを作成しましたが、現段階ではまだDSQLへの接続権限しか持っていません。
なので、接続した先でどんな操作を許可するかについての権限を付与していきます。

その際はGRANTコマンドを用いて、スキーマ単位とテーブル単位ごとにそれぞれ権限を付与します。

  • スキーマへの権限付与
    • USAGEでスキーマ名を解決
  • テーブルへの権限付与
    • SELECT/INSERT/UPDATE/DELETEでテーブルへの操作権限付与
    • SELECT: データ検索
    • INSERT: データ挿入
    • UPDATE: データ更新
    • DELETE: データ削除
sql
-- スキーマへのアクセス権限付与
GRANT USAGE ON SCHEMA app_schema TO app_user;

-- テーブルへのアクセス権限付与
GRANT SELECT, INSERT, UPDATE ON app_schema.members TO app_user;

今はテーブルに対してDELETE以外の3つの権限を付けてみました。
実際に使用する場合はユーザーに応じて必要な権限のみを付与する形にするのがベストだと思います。

これで、ひとまず準備完了です。コマンドで\qを入力して、DSQLとの接続から抜けておきます。

DSQLへの接続には認証トークンを用いていますが、これには有効期限があります。デフォルトでは15分間です。
トークン生成コマンド実行時に--expires-in 3600などを指定することで延長も可能です。

ただ、接続中にトークンの有効期限が過ぎたとしてもそれは問題ないようです。
トークンはあくまで接続のために用いるもので、一度接続した後はそのセッション中において、再使用されることはないそうです。

3.4 (きちんと作成できたか確認する)

ここまでできたら、スキーマ、テーブルの作成状況などを確認していきます。

スキーマにpublicやsysというものが追加されていますが、これはデフォルトで作成されているもののようです(Ownerがadminではない)。

ターミナル
$ psql -h $DSQL_ENDPOINT -U admin -d postgres -c "\dn"
        List of schemas
    Name    |       Owner       
------------+-------------------
 app_schema | admin
 public     | pg_database_owner
 sys        | dbowner
(3 rows)

$ psql -h $DSQL_ENDPOINT -U admin -d postgres -c "\dt app_schema.*"
                List of relations
   Schema   |        Name        | Type  | Owner 
------------+--------------------+-------+-------
 app_schema | members            | table | admin
(1 rows)

ここまででDSQL側の準備が整いました。

4. DSQLへ、データベースロールでアクセスする

接続するためには、カスタムデータベースロールにも指定した aurora-dsql-access-role の認証情報をローカルに設定する必要があります。

aws sts assume-roleコマンドを用いて、ローカルにIAMロール側の認証情報を設定しましょう。

ターミナル
# 現在の認証情報を確認する
$ aws sts get-caller-identity
{
    "UserId": "**************",
    "Account": "************",
    "Arn": "arn:aws:iam::************:user/<IAMユーザー名>"
}

# IAMロールを引き受ける
$ aws sts assume-role \                                          
  --role-arn arn:aws:iam::<AccountId>:role/aurora-dsql-access-role \
  --role-session-name test-session > assume-role-output.json

# IAMロールの認証情報をターミナルへ設定する
$ export AWS_ACCESS_KEY_ID=$(jq -r '.Credentials.AccessKeyId' assume-role-output.json)
export AWS_SECRET_ACCESS_KEY=$(jq -r '.Credentials.SecretAccessKey' assume-role-output.json)
export AWS_SESSION_TOKEN=$(jq -r '.Credentials.SessionToken' assume-role-output.json)

# 現在の認証情報を確認する(Arnが変わっているはず)
$ aws sts get-caller-identity
{
    "UserId": "**************",
    "Account": "************",
    "Arn": "arn:aws:sts::************:assumed-role/aurora-dsql-access-role/test-session"
}

先ほどデータベースロールとこのIAMロールをマッピングしたので、データベースロールを用いてDSQLへ接続するためにはこのIAMロールを用いて接続する必要があります。
また、このIAMロールにはDbConnectアクションが許可されています。

これでカスタムデータベースロールを用いて、DSQLへ接続できるようになります。

4.1 CLIを用いる場合

先ほどはAdminロールでアクセスしていましたが、ここでは先ほど作成したapp_userロールを使います。

その際、トークン作成コマンドが変わってくるので要注意です。
Adminロールでは generate-db-connect-admin-auth-token を使いましたが、
カスタムデータベースロールでは generate-db-connect-auth-token を使います。

ターミナル
# カスタムデータベースロール用トークンを環境変数に入れておく
$ export APP_TOKEN=$(aws dsql generate-db-connect-auth-token \
  --expires-in 3600 \
  --region us-east-1 \
  --hostname $DSQL_ENDPOINT)

その後、作成した認証トークンを用いてクラスターに接続します。

ターミナル
# app_userのトークンとユーザー名を用いてDSQL接続
$ PGPASSWORD=$APP_TOKEN psql \
  --dbname postgres \
  --username app_user \
  --host $DSQL_ENDPOINT

# 表示が以下のようにpostgresになれば成功
postgres=>

ここまで来れば、このユーザーではデータの参照・格納・更新が可能で、削除だけできない、という状態になっているはずです。

まずはデータを1つ格納してみましょう。

postgres=> INSERT INTO app_schema.members (
    email,
    name,
) VALUES (
    'test.user@example.com',
    'テスト太郎',
) RETURNING member_id, email, name, created_at;
   member_id   |         email         |    name    |          created_at           
--------------------------------------+-----------------------+------------+-------------------------------
 ************* | test.user@example.com |  テスト太郎  | 2025-09-07 21:50:01.295681+00
(1 row)

INSERT 0 1

続いて、先ほど格納したデータを参照します。

SELECT 
    member_id,
    email,
    name,
    created_at,
    updated_at
FROM app_schema.members
WHERE email = 'test.user@example.com';
   member_id   |         email         |    name    |          created_at           |          updated_at           
--------------------------------------+-------------+-------------------------------+-------------------------------
 ************* | test.user@example.com |  テスト太郎  | 2025-09-07 21:50:01.295681+00 | 2025-09-07 21:50:01.295681+00

今作製したデータを更新します。

postgres=> UPDATE app_schema.members
SET 
    name = 'テスト花子',
    updated_at = CURRENT_TIMESTAMP
WHERE email = 'test.user@example.com'
RETURNING member_id, email, name, slack_user_id, github_username, updated_at;
   member_id   |         email         |   name   |          updated_at           
---------------+-----------------------+----------+-------------------------------
 ************* | test.user@example.com | テスト花子 | 2025-09-07 22:09:13.684773+00
(1 row)

UPDATE 1

削除には失敗するはずです。

postgres=> DELETE FROM output_history.members
WHERE email = 'test.user@example.com';
ERROR:  permission denied for table members

ここまでできていれば、CLIでの動作確認は完璧です!

ローカルに設定したIAMロールの情報を削除するには、以下コマンドを実行します。環境変数が削除されて、aws configureで設定された認証情報が再度有効になります。

$ unset AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN

4.2 SDKを用いる場合

上記はCLIでの実行だったので、アプリケーションで使う時にはイメージが沸かないかもしれません。
ということで、AWS SDKを用いてLambdaからDSQLへアクセスしてみましょう。

事前準備として、最初に作成した aurora-dsql-access-role の信頼関係を修正します。
現在はアカウントへの紐づけしかできませんが、Lambdaへの紐づけも可能にしておきましょう。

信頼関係
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<AccountId>:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        },
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

そしてサンプルコードは以下の感じです。CLIの時と同じようにデータ格納・参照・更新・削除を一回ずつやってみます。

サンプルLambdaコード(←クリックで開閉できます)
connectDsqlFunction.py
def get_connection():
    """
    Aurora DSQLへの接続を取得(pg8000使用)
    app_userロールを使用
    """
    # 環境変数から設定を読み込み
    dsql_endpoint = os.environ.get('DSQL_ENDPOINT')
    aws_region = os.environ.get('AWS_REGION', 'us-east-1')
    
    if not dsql_endpoint:
        raise ValueError("DSQL_ENDPOINT environment variable is not set")
    
    # IAMトークン生成(app_user用)
    dsql_client = boto3.client('dsql', region_name=aws_region)
    auth_token = dsql_client.generate_db_connect_auth_token(
        Hostname=dsql_endpoint,
        ExpiresIn=900  # 15分
    )
    
    # SSL接続の設定
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_REQUIRED
    
    # pg8000での接続(app_userロール、postgresデータベース固定)
    conn = pg8000.connect(
        host=dsql_endpoint,
        database='postgres',  # Aurora DSQLは固定
        user='app_user',
        password=auth_token,
        port=5432,
        ssl_context=ssl_context
    )
    return conn


def lambda_handler(event, context):
    """
    Lambda エントリーポイント
    シンプルなCRUD操作をテストし、結果を返す (pg8000使用)
    """
    results = {
        "test_timestamp": datetime.now().isoformat(),
        "connection_method": "pg8000 (Pure Python - No Lambda Layer)",
        "schema": "app_schema",
        "role": "app_user",
        "table": "members (email, name, created_at, updated_at)",
        "tests": {},
        "summary": {
            "total": 0,
            "passed": 0,
            "failed": 0
        }
    }
    
    test_email = f"lambda.simple.{datetime.now().timestamp()}@example.com"
    
    try:
        # 1. INSERT テスト
        print("=" * 50)
        print("1. INSERT テスト - ダミーデータの挿入 (pg8000)")
        print("=" * 50)
        
        try:
            conn = get_connection()
            cur = conn.cursor()
            
            cur.execute("""
                INSERT INTO app_schema.members (
                    email, name
                ) VALUES (%s, %s)
                RETURNING email, name, created_at, updated_at
            """, (
                test_email,
                'シンプルテスト太郎'
            ))
            result = cur.fetchone()
            conn.commit()
            
            # pg8000は列名を返さないので、列番号でアクセス
            # RETURNING句の順: email(0), name(1), created_at(2), updated_at(3)
            results["tests"]["INSERT"] = {
                "status": "PASSED",
                "message": "データ挿入成功",
                "data": {
                    "email": result[0],
                    "name": result[1],
                    "created_at": str(result[2]),
                    "updated_at": str(result[3]) if result[3] else None
                }
            }
            results["summary"]["passed"] += 1
            print(f"✅ INSERT成功: email={result[0]}")
            conn.close()
            
        except Exception as e:
            results["tests"]["INSERT"] = {
                "status": "FAILED",
                "error": str(e),
                "traceback": traceback.format_exc()
            }
            results["summary"]["failed"] += 1
            print(f"❌ INSERT失敗: {e}")
        
        results["summary"]["total"] += 1
        
        # 2. SELECT テスト
        print("\n" + "=" * 50)
        print("2. SELECT テスト - データの参照 (pg8000)")
        print("=" * 50)
        
        try:
            conn = get_connection()
            cur = conn.cursor()
            
            cur.execute("""
                SELECT email, name, created_at, updated_at
                FROM app_schema.members
                WHERE email = %s
            """, (test_email,))
            result = cur.fetchone()
            
            if result:
                # 列番号: email(0), name(1), created_at(2), updated_at(3)
                results["tests"]["SELECT"] = {
                    "status": "PASSED",
                    "message": "データ参照成功",
                    "data": {
                        "email": result[0],
                        "name": result[1],
                        "created_at": str(result[2]),
                        "updated_at": str(result[3]) if result[3] else None
                    }
                }
                results["summary"]["passed"] += 1
                print(f"✅ SELECT成功: {result[1]}")  # result[1] = name
            else:
                results["tests"]["SELECT"] = {
                    "status": "FAILED",
                    "error": "データが見つかりません"
                }
                results["summary"]["failed"] += 1
            conn.close()
            
        except Exception as e:
            results["tests"]["SELECT"] = {
                "status": "FAILED",
                "error": str(e),
                "traceback": traceback.format_exc()
            }
            results["summary"]["failed"] += 1
            print(f"❌ SELECT失敗: {e}")
        
        results["summary"]["total"] += 1
        
        # 3. UPDATE テスト
        print("\n" + "=" * 50)
        print("3. UPDATE テスト - データの更新 (pg8000)")
        print("=" * 50)
        
        try:
            conn = get_connection()
            cur = conn.cursor()
            
            cur.execute("""
                UPDATE app_schema.members
                SET name = %s,
                    updated_at = CURRENT_TIMESTAMP
                WHERE email = %s
                RETURNING email, name, created_at, updated_at
            """, (
                'シンプルテスト花子',
                test_email
            ))
            result = cur.fetchone()
            conn.commit()
            
            if result:
                # RETURNING句の順: email(0), name(1), created_at(2), updated_at(3)
                results["tests"]["UPDATE"] = {
                    "status": "PASSED",
                    "message": "データ更新成功",
                    "data": {
                        "email": result[0],
                        "name": result[1],
                        "created_at": str(result[2]),
                        "updated_at": str(result[3])
                    }
                }
                results["summary"]["passed"] += 1
                print(f"✅ UPDATE成功: {result[1]}")  # result[1] = name
            else:
                results["tests"]["UPDATE"] = {
                    "status": "FAILED",
                    "error": "更新対象が見つかりません"
                }
                results["summary"]["failed"] += 1
            conn.close()
            
        except Exception as e:
            results["tests"]["UPDATE"] = {
                "status": "FAILED",
                "error": str(e),
                "traceback": traceback.format_exc()
            }
            results["summary"]["failed"] += 1
            print(f"❌ UPDATE失敗: {e}")
        
        results["summary"]["total"] += 1
        
        # 4. DELETE テスト(失敗することを確認)
        print("\n" + "=" * 50)
        print("4. DELETE テスト - データの削除(失敗することを確認) (pg8000)")
        print("=" * 50)
        
        try:
            conn = get_connection()
            cur = conn.cursor()
            
            cur.execute("""
                DELETE FROM app_schema.members
                WHERE email = %s
            """, (test_email,))
            conn.commit()
            
            # ここに到達した場合は想定外(DELETE権限があってはいけない)
            results["tests"]["DELETE"] = {
                "status": "FAILED",
                "error": "DELETE権限があります(想定外)",
                "message": "app_userロールにDELETE権限が付与されています。セキュリティ設計に反します。"
            }
            results["summary"]["failed"] += 1
            print("⚠️  DELETE成功(想定外)- 権限設定を確認してください")
            conn.close()
            
        except pg8000.exceptions.DatabaseError as e:
            # pg8000での権限エラーチェック
            error_msg = str(e)
            if "permission denied" in error_msg.lower():
                # 期待される動作:権限エラー
                results["tests"]["DELETE"] = {
                    "status": "PASSED",
                    "message": "期待通りDELETE権限なしエラー",
                    "expected_error": "permission denied for table members",
                    "actual_error": error_msg
                }
                results["summary"]["passed"] += 1
                print(f"✅ DELETE失敗(期待通り): permission denied")
            else:
                results["tests"]["DELETE"] = {
                    "status": "FAILED",
                    "error": error_msg,
                    "message": "予期しないデータベースエラー"
                }
                results["summary"]["failed"] += 1
                print(f"❌ DELETE予期しないエラー: {e}")
            
        except Exception as e:
            results["tests"]["DELETE"] = {
                "status": "FAILED",
                "error": str(e),
                "message": "予期しないエラーが発生しました",
                "traceback": traceback.format_exc()
            }
            results["summary"]["failed"] += 1
            print(f"❌ DELETE予期しないエラー: {e}")
        
        results["summary"]["total"] += 1
        
        # 5. DELETE失敗後のデータ確認
        print("\n" + "=" * 50)
        print("5. DELETE失敗後のデータ確認 (pg8000)")
        print("=" * 50)
        
        try:
            conn = get_connection()
            cur = conn.cursor()
            
            cur.execute("""
                SELECT email, name
                FROM app_schema.members
                WHERE email = %s
            """, (test_email,))
            result = cur.fetchone()
            
            if result:
                # 列番号: email(0), name(1)
                results["tests"]["POST_DELETE_CHECK"] = {
                    "status": "PASSED",
                    "message": "データは削除されずに残っています(期待通り)",
                    "data": {
                        "email": result[0],
                        "name": result[1]
                    }
                }
                results["summary"]["passed"] += 1
                print(f"✅ データ確認成功: データは残存({result[1]}")  # result[1] = name
            else:
                results["tests"]["POST_DELETE_CHECK"] = {
                    "status": "FAILED",
                    "error": "データが削除されています(想定外)"
                }
                results["summary"]["failed"] += 1
            conn.close()
            
        except Exception as e:
            results["tests"]["POST_DELETE_CHECK"] = {
                "status": "FAILED",
                "error": str(e),
                "traceback": traceback.format_exc()
            }
            results["summary"]["failed"] += 1
        
        results["summary"]["total"] += 1
        
        # 6. クリーンアップ(adminロールが必要なので、ここではスキップ)
        results["cleanup_note"] = (
            "テストデータのクリーンアップはadminロールで実行してください: "
            f"DELETE FROM app_schema.members WHERE email = '{test_email}';"
        )
        
    except ImportError as import_error:
        # pg8000がインストールされていない場合
        results["global_error"] = {
            "error": "pg8000 package not installed",
            "details": str(import_error),
            "solution": "pip install pg8000 -t . でLambdaデプロイパッケージに含めてください",
            "note": "pg8000はPure Pythonなので、Lambda Layerは不要です"
        }
        
    except Exception as e:
        results["global_error"] = {
            "error": str(e),
            "traceback": traceback.format_exc()
        }
    
    # テスト結果サマリー
    print("\n" + "=" * 50)
    print("テスト結果サマリー")
    print("=" * 50)
    print(f"スキーマ: app_schema")
    print(f"ロール: app_user")
    print(f"テーブル: members (simple version)")
    print(f"合計: {results['summary']['total']}")
    print(f"成功: {results['summary']['passed']}")
    print(f"失敗: {results['summary']['failed']}")
    
    # 期待される結果のチェック
    expected_results = {
        "INSERT": "PASSED",
        "SELECT": "PASSED",
        "UPDATE": "PASSED",
        "DELETE": "PASSED",  # 権限エラーが期待通り発生 = PASSED
        "POST_DELETE_CHECK": "PASSED"
    }
    
    all_tests_passed = all(
        results["tests"].get(test, {}).get("status") == expected 
        for test, expected in expected_results.items()
    )
    
    results["overall_status"] = "SUCCESS" if all_tests_passed else "FAILURE"
    results["permission_check"] = {
        "app_user_permissions": {
            "SELECT": "✅ 許可",
            "INSERT": "✅ 許可",
            "UPDATE": "✅ 許可",
            "DELETE": "❌ 拒否(設計通り)"
        },
        "security_design": "app_userロールの権限設定は正しく機能しています",
        "connection_info": "pg8000 (Pure Python) - Lambda Layerなしで動作"
    }
    
    return {
        'statusCode': 200 if all_tests_passed else 500,
        'body': json.dumps(results, ensure_ascii=False, indent=2, default=str)
    }


# ローカルテスト用
if __name__ == "__main__":
    # 環境変数の設定例
    # os.environ['DSQL_ENDPOINT'] = 'your-cluster-endpoint.dsql.us-east-1.on.aws'
    # os.environ['AWS_REGION'] = 'us-east-1'
    
    # pg8000がインストールされているか確認
    try:
        import pg8000
        print("✅ pg8000がインストールされています")
        print(f"   バージョン: {pg8000.__version__}")
    except ImportError:
        print("❌ pg8000がインストールされていません")
        print("   実行: pip install pg8000")
        exit(1)
    
    result = lambda_handler({}, {})
    print("\n最終結果:")
    print(json.dumps(json.loads(result['body']), ensure_ascii=False, indent=2))

環境変数として以下を設定し、テストをしてみてください。入力値は不要です。
すると、CLI実行時と同じような結果が得られるはずです。

  • DSQL_ENDPOINT: .dsql.us-east-1.on.aws

一応簡単にコードの解説をしておきます。
dsqlのクライアントを作製し、generate_db_connect_auth_token() メソッドで認証トークンを生成します。

connectDsqlFunction.py
# IAMトークン生成(app_user用)
dsql_client = boto3.client('dsql', region_name=aws_region)
auth_token = dsql_client.generate_db_connect_auth_token(
    Hostname=dsql_endpoint,
    ExpiresIn=900  # 15分
)

あとはそのトークンを用いてアクセスするのみです。
ここではpg8000というツールを使ってPostgreSQLへの接続を確立し、SQLコマンドを叩いていくだけです。

connectDsqlFunction.py
# SSL接続の設定
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_REQUIRED
    
# pg8000での接続(app_userロール、postgresデータベース固定)
conn = pg8000.connect(
    host=dsql_endpoint,
    database='postgres',  # Aurora DSQLは固定
    user='app_user',
    password=auth_token,
    port=5432,
    ssl_context=ssl_context
)
return conn

PostgreSQL接続ツールとしては他にもpsycopg2などが有名ですが、こちらはLambda Layerの作成が必要そうだったので、今回はpg8000を採用しています。

ということで、サクッとLambdaからDSQLへの接続まで試してみました。

トラブルシューティング

Aurora DSQL、思っていた以上にハマりポイントが多いです。
参考までに私がハマったエラーを纏めておきます。

エラー1. JSON/JSONBデータ型がサポートされていない

JSON形式でデータを格納しようとしたところ、以下のようなエラーが出力されました。
Aurora DSQLは現時点でJSON/JSONBデータ型をサポートしていないようです。

ERROR:  datatype jsonb not supported
ERROR:  datatype json not supported

解決法としては。JSON/JSONB型をTEXT型に変更しておくことです。

CREATE TABLE activities (
    activity_id UUID PRIMARY KEY,
    aws_services TEXT,  -- JSON代替
    tags TEXT          -- JSON代替
);

また、アプリケーションとの都合で渡されてくる情報がJSON形式になっている場合は、格納処理前にJSON→TEXTの変換処理を実装するのが良さそうです。

エラー2. CREATE INDEX構文エラー

Aurora DSQLは通常のCREATE INDEXをサポートしていないようです。

ERROR:  unsupported mode. please use CREATE INDEX ASYNC.

上記の例では当然のように使っていましたが、非同期インデックス作成(CREATE INDEX ASYNC)を行う必要がありました。

CREATE INDEX ASYNC idx_members_email   ON app_schema.members(email);

エラー3. PL/pgSQL関数がサポートされていない

PL/pgSQLとは、PostgreSQL用の手続き型言語で、ストアドプロシージャやトリガーを記述するために使用します。

一応DSQLの裏側はPostgreSQLですが、こちらには未対応なようです。

ERROR:  CREATE FUNCTION with language plpgsql not supported
ERROR:  unsupported statement: CreateTrig

(Claudeさん曰く、「Aurora DSQLは分散アーキテクチャのためノード間でのストアドプロシージャやトリガーの同期が困難であり、サポートされていないのでは」とのことでした。ドキュメントには明記されていなそうでした。)

これで困るのは、例えばデータが格納されたことをトリガーに別処理を行う、みたいなことがしづらいという点です。

-- トリガー関数の定義
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- トリガーの作成
CREATE TRIGGER update_members_updated_at
    BEFORE UPDATE ON members
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();

解決策としては、トリガーとして実装していた機能をアプリケーション側に寄せるしか無さそうです。

from datetime import datetime

class DSQLClient:
    def update_member(self, member_id, data):
        # アプリケーション層でupdated_atを更新
        data['updated_at'] = datetime.now()
        
        query = """
        UPDATE members 
        SET name = %s, email = %s, updated_at = %s
        WHERE member_id = %s
        """
        cursor.execute(query, (
            data['name'], 
            data['email'], 
            data['updated_at'],
            member_id
        ))

エラー4. publicスキーマへの権限付与エラー

最初、app_schemaを作成せずデフォルトのスキーマに対するアクセス権限を付与しようとしたのですが、その際エラーになりました。

postgres=> GRANT USAGE ON SCHEMA public TO app_user;
ERROR:  feature not supported on system entity

この時、publicスキーマにテーブルが作成されており、これが原因でエラーになっていたようです。上記3.4で確認した際にpublicはデフォルトで作成されているもの、と記載しましたが、ここにユーザーから干渉することはできないようでした。

↓こんな状態
              List of relations
 Schema |        Name        | Type  | Owner 
--------+--------------------+-------+-------
 public | members            | table | admin

解決方法として、カスタムスキーマを作成して使用します。app_schemaを作るようにしていたのはこのためです。

-- カスタムスキーマを作成
CREATE SCHEMA app_schema;

-- スキーマへの権限付与
GRANT USAGE ON SCHEMA app_schema TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA app_schema TO app_user;

エラー5. ユーザーとアクションの不一致

認証トークンを作成し、psqlコマンドでDSQLへ接続しようとすると、以下エラーが出ました。

psql: error: connection to server at "<DSQL Endpoint>", port 5432 failed: FATAL:  unable to accept connection, access denied
HINT: Wrong user to action mapping. user: admin, action: DbConnect

# HINTが以下の場合もあります
HINT:  Wrong user to action mapping. user: app_user, action: DbConnectAdmin

DSQLには2種類の認証アクションがあり、それに応じたロールで接続する必要があります。
そしてそのロールとトークン生成コマンドの組み合わせが間違っていると発生します。

adminロールはDbConnectAdminアクションのみ使用可能で、通常のDbConnectアクションは使用できません。

逆に、カスタムデータベースロールはDBConnetアクションのみ可能です。

ユーザー 正しいトークン生成コマンド IAMアクション 用途
admin generate-db-connect-admin-auth-token dsql:DbConnectAdmin DDL操作、ロール管理
app_user generate-db-connect-auth-token dsql:DbConnect DML操作、アプリケーション接続

エラー6. 単一クラスター上で複数のDBを作成する

接続するデータベース名を別のものに変更しようとしたのですが、できませんでした。

psql: error: unable to accept connection, database "my_database" does not exist

また、そもそもCREATE DATABASEコマンドをサポートしていないようです。

postgres=> CREATE DATABASE my_database;
ERROR:  unsupported statement: Createdb

つまり、1つのクラスターにおいて、デフォルトの postgres データベース以外は使用できず、増やすこともできないようでした。

これに伴い、データベースの代わりにスキーマを使用して論理的にデータを分離する必要がありそうです。

postgres(データベース)
├── public(デフォルトスキーマ)
├── app1_schema(アプリケーション用スキーマ)
|   └── members
└── app2_schema(アプリケーション用スキーマその2)
    └── reports

今回私が遭遇したのは上記のとおりでしたが、他にも色々な制約があるようです。
こちらはお手元のコーディングエージェントなどにもインプットしながら設計していくのが良さそうです。

まとめ

これから利用が増えていきそうなAurora DSQL、ぜひ触ってみてはいかがでしょう!

参考資料

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?