初めに
今回はAWS Lambda から Amazon RDS に接続し、データベースを作成するまでの流れを簡単に紹介します。サーバーレスでデータベースを扱う手段として、Lambda と RDS の組み合わせは一般的な構成の一つです。今回はテスト環境での最小限の構成を想定しており、本番運用ではサブネット構成やセキュリティグループの設定をより厳重に行う必要がありますが、その基本的な流れを把握するには良い練習になると思います。
構成
今回のアプリの全体構成です。
VPC
プライベートサブネットを2つ作成し、ここに RDS と Lambda を配置します。
RDS
Multi-AZ (可用性の高い構成) を考慮するため、少なくとも 2 つの AZ (アベイラビリティーゾーン) を用意して作成します。
Lambda
同じ VPC 内のプライベートサブネットに配置して、RDS にアクセスできるようにします。
作業概要
今回の作業の大まかな流れはこんな感じです。
VPC 作成
↓
RDS 作成
↓
Lambda 作成
↓
LambdaとRDS接続
↓
Lambda 関数でデータベース作成の処理を実装 & テスト実行
準備
ここでは、今回使用するデプロイパッケージ用のライブラリとLambdaのサンプルコードとを紹介します。
デプロイパッケージ用のライブラリ
Lambda関数のデプロイパッケージに「pymysql」モジュールを用意します。
Lambda関数は自動的に外部ライブラリを含まないため、ライブラリも一緒にパッケージングする必要があります。
「pymysql」モジュールのパッケージを作成してください。
Google Colabで以下のコードを実行して作成することができます。
!mkdir -p python
!pip install pymysql -t python/
!zip -r pymysql_layer.zip python
from google.colab import files
files.download('pymysql_layer.zip')
データベース作成用Lambda関数
Python と pymysql を使用して、Lambda 関数から RDS (MySQL) に接続し、データベース「test_db」を作成、さらに「test_table」というテーブルを作成して、No 列に [1,2,3]、String 列に ['a','b','c'] を挿入するサンプルコードです。
import os
import pymysql
import json
def lambda_handler(event, context):
# RDS接続情報は環境変数から取得
host = os.environ['DB_HOST']
user = os.environ['DB_USER']
password = os.environ['DB_PASS']
# 初期接続時はデータベース指定なしで接続
connection = pymysql.connect(
host=host,
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# 1. データベース「test_db」を作成(存在しない場合)
cursor.execute("CREATE DATABASE IF NOT EXISTS test_db")
# 2. 使用するデータベースを切り替え
cursor.execute("USE test_db")
# 3. テーブル「test_table」を作成(No: INT, String: VARCHAR)
cursor.execute("""
CREATE TABLE IF NOT EXISTS test_table (
No INT,
String VARCHAR(255)
)
""")
# ※ テスト用なので、重複挿入を避けるために既存データはクリアしておく
cursor.execute("DELETE FROM test_table")
# 4. データの挿入:No列に [1,2,3]、String列に ['a','b','c']
insert_sql = "INSERT INTO test_table (No, String) VALUES (%s, %s)"
data = [(1, 'a'), (2, 'b'), (3, 'c')]
cursor.executemany(insert_sql, data)
# 変更をコミット
connection.commit()
return {
"statusCode": 200,
"body": json.dumps({
"message": "Database 'test_db' and table 'test_table' created successfully, data inserted."
})
}
except Exception as e:
print("Error:", e)
return {
"statusCode": 500,
"body": json.dumps({"error": str(e)})
}
finally:
connection.close()
データ取得用Lambda関数
Python と pymysql を使って RDS の「test_db」内の「test_table」から全データを取得し、その結果を JSON 形式で返す Lambda 関数のサンプルコードです。
import os
import pymysql
import json
def lambda_handler(event, context):
# 環境変数から接続情報を取得(Lambdaの環境変数に設定済み)
host = os.environ['DB_HOST']
user = os.environ['DB_USER']
password = os.environ['DB_PASS']
database = 'test_db' # ここでは固定値
# RDS(MySQL)に接続。cursorclassをDictCursorにして、結果を辞書形式で取得
connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# test_table の全レコードを取得するSQLを実行
sql = "SELECT * FROM test_table"
cursor.execute(sql)
results = cursor.fetchall() # 全結果を取得
# 取得したデータをJSON形式で返す
return {
'statusCode': 200,
'body': json.dumps({
'message': "Query executed successfully.",
'data': results
})
}
except Exception as e:
# エラーが発生した場合はエラーメッセージを返す
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
finally:
connection.close()
手順
VPC 作成
2). 「VPCなど」を選択し、以下項目を入力し「VPCを作成」を押します。
3). 問題なく作成されていることを確認し、「VPCを表示」を押します。
4). 後のRDSのVPCへの紐づけ作業で必要なので、赤枠のVPCのIDをメモに控えておきます。
RDSの作成
2). 「標準作成」を押して以下項目を入力し、「データベースの作成」を押します。
- エンジンのオプション:MySQL
- API名:任意のAPI名
- テンプレート:無料利用枠
- インスタンス識別子:任意のインスタンスの名前
- マスターユーザー名:任意のユーザ名(後程Lambda関数の設定に使うのでメモに控えてください)
- 認証情報管理:セルフマネージド
- マスターパスワード:任意のパスワード(後程Lambda関数の設定に使うのでメモに控えてください)
- コンピューティングリソース:EC2コンピューティングリソースに接続しない
- Virtual private Cloud(VPC):先ほど作成したVPCのIDを入力してVPCを選択してください
- パブリックアクセス:なし
- 既存のVPCセキュリティグループ:default
3). 「作成中」→「利用可能」になることを確認し、左横のDBインスタンスを押します。
利用可能になるまで数分かかります。
4). 後ほどLambda関数の設定に使うので以下情報をメモに控えてください
- DB識別子
- エンドポイント
Lambdaレイヤーの作成
1). Lambdaを開き「レイヤー」→「レイヤーの作成」を押します。
2).以下項目を入力し「作成」を押します。
3). 後ほどLambda関数の設定に使うので「バージョンARN」の値をメモに控えてください
Lambda関数の作成
2). 「一から作成」を選択し、以下項目を入力し「関数の作成」を押します。
3). Lambda関数が作成されたらそのまま下にスクロールし、レイヤーの「レイヤーの追加」を押します。
4).「ARNを指定」を押し、先ほどレイヤーの作成の際にメモしたバージョンARNを入力してください。
右横の「検証」押し、エラーがなければ「追加」を押します。
5).Lambda関数のコードソースに以下のデータベース作成用Lambda関数を入力し、「Deploy」を押します。
import os
import pymysql
import json
def lambda_handler(event, context):
# RDS接続情報は環境変数から取得
host = os.environ['DB_HOST']
user = os.environ['DB_USER']
password = os.environ['DB_PASS']
# 初期接続時はデータベース指定なしで接続
connection = pymysql.connect(
host=host,
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# 1. データベース「test_db」を作成(存在しない場合)
cursor.execute("CREATE DATABASE IF NOT EXISTS test_db")
# 2. 使用するデータベースを切り替え
cursor.execute("USE test_db")
# 3. テーブル「test_table」を作成(No: INT, String: VARCHAR)
cursor.execute("""
CREATE TABLE IF NOT EXISTS test_table (
No INT,
String VARCHAR(255)
)
""")
# ※ テスト用なので、重複挿入を避けるために既存データはクリアしておく
cursor.execute("DELETE FROM test_table")
# 4. データの挿入:No列に [1,2,3]、String列に ['a','b','c']
insert_sql = "INSERT INTO test_table (No, String) VALUES (%s, %s)"
data = [(1, 'a'), (2, 'b'), (3, 'c')]
cursor.executemany(insert_sql, data)
# 変更をコミット
connection.commit()
return {
"statusCode": 200,
"body": json.dumps({
"message": "Database 'test_db' and table 'test_table' created successfully, data inserted."
})
}
except Exception as e:
print("Error:", e)
return {
"statusCode": 500,
"body": json.dumps({"error": str(e)})
}
finally:
connection.close()
6). 「設定」→「RDSデータベース」→「RDSデータベースに接続」を押します。
7).「RDSデータベース」にRDS作成の章でメモしたDB識別子を入力しRDSを選択し、「作成」を押します。
9).「環境変数の追加」を押します。
DB_HOST:RDS作成の章でメモしたデータベースのエンドポイント
DB_USER:RDS作成の章でメモしたデータベースのマスターユーザー名
DB_PASS:RDS作成の章でメモしたデータベースのマスターパスワード
10). 以下の「キー」:「値」を入力し、「保存」を押します。
11). 環境変数が設定できていることを確認してLambda関数の作成は完了です。
テスト
データテーブルの確認
1). Lambda関数の作成章からテストまで同様に以下のコードでLambda関数を作成、実行すると、データテーブルの中身を見ることができます。または、今作成したLambda関数のコードソース部分を以下のコードに上書きし直してテストしてもいいと思います。
import os
import pymysql
import json
def lambda_handler(event, context):
# 環境変数から接続情報を取得(Lambdaの環境変数に設定済み)
host = os.environ['DB_HOST']
user = os.environ['DB_USER']
password = os.environ['DB_PASS']
database = 'test_db' # ここでは固定値
# RDS(MySQL)に接続。cursorclassをDictCursorにして、結果を辞書形式で取得
connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# test_table の全レコードを取得するSQLを実行
sql = "SELECT * FROM test_table"
cursor.execute(sql)
results = cursor.fetchall() # 全結果を取得
# 取得したデータをJSON形式で返す
return {
'statusCode': 200,
'body': json.dumps({
'message': "Query executed successfully.",
'data': results
})
}
except Exception as e:
# エラーが発生した場合はエラーメッセージを返す
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
finally:
connection.close()
これで今回の手順は終了です!
削除手順
このまま運用する予定がない方は必要以上に料金が発生しないようにリソースを削除します。
特に、RDSはそこそこの料金が取られたりしますので、今回は無料枠での利用ですがちゃんと消すようにしましょう。
RDSの削除
RDSの削除をします。
削除手順がわからない場合、以下を参照ください。
【AWS】RDS DBの削除手順
VPCの削除
VPCの削除をします。
削除手順がわからない場合、以下を参照ください。
【AWS】VPCの削除手順
Lambda関数の削除
Lambda関数の削除をします。
削除手順がわからない場合、以下を参照ください。
Lambda関数削除手順
終わりに
今回はLambda から RDS に接続してデータベースを作成するまでの一連の手順と、その構成の概略についてご紹介しました。
結構簡単にLambdaでRDSデータベースを扱えることがわかってもらえたのではないでしょうか?
この記事が参考になりましたらぜひ「いいね」「フォロー」など励みになるのでよろしくお願いします!