はじめに
S3上のCSVファイルのデータをRDS上のMySQL DBに書き込むLambda関数を作成しました。
ハードコーディングを回避するためにしたことを書いていきます。
システム構成
構成図は次のとおりです。メインの処理にかかわる部分は赤色の矢印にしています。
S3、RDS、Lambda以外のサービスは以下の用途で使います。
なお、今回は使用していませんが、LambdaとRDSを接続するときは、RDSプロキシを使うのがベストプラクティスです。
EC2
RDS上のDBにコマンドを流すために使います。テーブル構築と実行結果の確認に使用しました。
Lambdaのコードで同様のことができるのですが、コマンドを流すのが簡単で実行結果の視認性も高いため、EC2インスタンスを立てました。
Systems Manager(パラメータストア)
DB認証情報を保管するのに使います。
Systems Manager(セッションマネージャー)
EC2に接続するのに使います。
Endpoints
VPC上のリソースをVPC外のリソースと接続させるために使います。
各リソースの設定について
前項のシステム構成図のとおりにするためにはいろいろと気を付けることがあります。箇条書きでリソース構築時に気を付ける点について書いていきます。
<注意>
・リソースを作成すると利用料が発生するものもあるので、使わなくなったら必ず削除してください。(特に、VPCエンドポイントは結構いい値段します)
・番号はこの順番で作っていくと楽という順番で振っています。
・個人での利用を想定したものになっています。本番環境では使用する場合は、セキュリティグループの細かい設定やRDSの高可用性設定が必要になります。
① VPC
・作成するリソースで、「VPCなど」を選択
・AZの数は2つ
・パブリックサブネットの数は0、プライベートサブネットの数は2
・NATゲートウェイはなし
・VPCエンドポイントは「S3ゲートウェイ」を選択
→ この設定により、S3用のVPCエンドポイントが自動的に作られる
・DNSオプションはデフォルトのまま(DNSホスト名を有効化・DNS解決を有効化のどちらもチェック)
② VPCエンドポイント
・com.amazonaws.<リージョン名>.ssm
、com.amazonaws.<リージョン名>.ssmmessages
、com.amazonaws.<リージョン名>.ec2messages
の3つのVPCエンドポイントを作成する
・①で作成したVPCとサブネット2つを選択する
・セキュリティグループはデフォルトのものを選択
※デフォルト以外のセキュリティグループを選択する場合、443ポートのインバウンドアクセス許可が必要
③ RDSサブネットグループ
・①で作成したVPCとサブネット2つを選択する
④ RDS
RDSは作成完了までに時間を要するのではやめに作ります
・「標準作成」を選択
・エンジンのオプションは「MySQL」を選択
・「無料利用枠」を選択
・マスターユーザー名は「admin」のまま、マスターパスワードは手動で生成する
→ Lambdaからの接続に必要となるので忘れないようにすること!
・インスタンスの設定はデフォルトのまま
・ストレージの設定は、「ストレージの自動スケーリングを有効にする」のチェックを外し、後はデフォルトのまま
・接続の設定は、VPCを①で作成したVPCに、DBサブネットグループを③で作成したサブネットグループに、セキュリティグループはデフォルトのものを選択して、後はデフォルトのまま
・その他の設定はデフォルトのまま
⑤ S3
・すべてデフォルトのまま
⑥ IAMロール
■ Lambda用ロール
以下のポリシーをアタッチする
・AWSLambdaBasicExecutionRole
(AWS管理ポリシー)
・AWSLambdaVPCAccessExecutionRole
(AWS管理ポリシー)
・AmazonS3ReadOnlyAccess
(AWS管理ポリシー)
・ssm:GetParameters
を許可するポリシー(自分で作成要)
■ EC2用ロール
以下のポリシーをアタッチする
・AmazonSSMManagedInstanceCore
(AWS管理ポリシー)
⑦ Lambda
<作成前>
・pymysqlモジュールを開発環境でインストールしてzip化
・「一から作成」を選択
・ランタイムはPythonを選択(バージョンはその時によって変わると思います)
・アーキテクチャは「x86_64」
・「デフォルトの実行ロールの変更」を展開
・「既存のロールを使用する」を選択。⑥で作成したLambda用のIAMロールを選択
・「詳細設定」を展開
・「VPCを有効化」にチェックを入れ、①で作成したVPCとサブネットを選択、セキュリティグループはデフォルトのものを選択
<作成後>
・コードソースの右上のアップロード元をクリックし、「.zipファイル」を選択し、関数作成前に作ったzipファイルをアップロード
・トリガーを追加
・ソースは「S3」、バケットは⑤で作成したバケット、イベントタイプは「すべてのオブジェクト作成イベント」、再帰呼び出しの文章にチェック
⑧ EC2
RDSのデータベース作成完了後に作成して下さい。
・OSは「Amazon Linux」、AMIはデフォルトのまま
・インスタンスタイプはデフォルトのまま(無料利用枠の対象のもの)
・キーペアは「キーペアなしで続行」
・ネットワーク設定の右上の「編集」をクリック
・①で作成したVPC、サブネットを選択
・パブリックIPの自動割り当ては「無効化」を選択
・セキュリティグループは、「既存のセキュリティグループを選択する」を選択し、デフォルトのものを選択
・ストレージを設定はデフォルトのまま
・高度な詳細を展開
・IAMインスタンスプロフィールで、⑥で作成したEC2用のIAMロールを選択
・その他はデフォルトのまま
Lambdaのハードコーディングを改善
前置きがだいぶ長くなりましたが、本題に入ります。
今回直していく点は、S3のバケット名とオブジェクトキー名の記述、DB認証情報の記述2点です。
S3のバケット名とオブジェクトキー名
S3オブジェクトを取得するコードとして以下のようなものが考えられます。
これでも問題ありませんが、S3のオブジェクト名が決まっていない場合(AWSサービスからのログをS3に保管する場合など)に困りますし、保守性も低いです。
def lambda_handler(event, context):
# バケット名・オブジェクト名を取得
BUCKET_NAME = <バケット名>
KEY_NAME = <オブジェクト名>
# PUTされたオブジェクトの取得
s3Client = boto3.client('s3')
response = s3Client.get_object(
Bucket=BUCKET_NAME,
Key=KEY_NAME
)
今回作成したLambdaはS3のオブジェクト作成で駆動するので、S3オブジェクト作成イベント情報を利用します。
S3オブジェクト作成イベントは以下のようなJSON形式で、Lambda関数に渡されます。(一部マスキングしています)
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "ap-northeast-1",
"eventTime": "2023-02-23T15: 07: 31.188Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "XXXXXXXXXXXXX"
},
"requestParameters": {
"sourceIPAddress": "XXXXXXXXXXXXX"
},
"responseElements": {
"x-amz-request-id": "KR5VTP6F445DKXWR",
"x-amz-id-2": "XXXXXXXXXXXX"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "a30bf28a-ecbe-47fd-bacb-f570a5f60771",
"bucket": {
"name": "test-20230224-lambda-trigger",
"ownerIdentity": {
"principalId": "XXXXXXXXXXX"
},
"arn": "arn:aws:s3:::test-20230224-lambda-trigger"
},
"object": {
"key": "sampledata.csv",
"size": 155,
"eTag": "9ee8099a5c115b1dcd37745ad62d665a",
"sequencer": "0063F7813329449888"
}
}
}
]
}
イベント情報は、lambda_handler(event, context)
の引数event
に渡されているので、下記のようなコードで、S3オブジェクトを取得できます。
def lambda_handler(event, context):
# オブジェクトがPUTされたバケット名・オブジェクト名を取得
BUCKET_NAME = event['Records'][0]['s3']['bucket']['name']
KEY_NAME = event['Records'][0]['s3']['object']['key']
# PUTされたオブジェクトの取得
s3Client = boto3.client('s3')
response = s3Client.get_object(
Bucket=BUCKET_NAME,
Key=KEY_NAME
)
DB認証情報
RDS上のDBに接続するためには、RDSのエンドポイント、DBのマスターユーザー名、DBのマスターパスワード、使用するDB名が必要になります。
これらの情報は機密情報にあたるので、以下のようなハードコーディングは避けるべきです。
def lambda_handler(event, context):
# DB認証情報を変数に格納
HOST = <RDSのエンドポイント>
USER = <マスターユーザー名>
PASSWORD = <マスターパスワード>
DB_NAME = <使用するDB名>
DB認証情報を保管するために、AWS Systems Managerのパラメータストアを利用します。
パラメータストアの設定方法や、設定したパラメータを利用するコードについて説明します。
パラメータストアの設定
AWS Systems Managerから「パラメータストア」を選択
「マイパラメータ」タブを選択し、「パラメータの作成」をクリック
<平文でパラメータを作成する場合>
・利用枠は「標準」
・タイプは「文字列」
・データ型は「text」
画像のとおりに設定すると、「test」というパラメータに「test_value」という値が入ります。
<パラメータの値を暗号化する場合>
・利用枠は「標準」
・タイプは「安全な文字列」
・KMSキーソースは「現在のアカウント」
・KMSキーIDはデフォルトのまま(「alias/aws/ssm」)
「test_secret」というパラメータに「test_secret_value」という値が入っていますが、「安全な文字列」を選択しているためマスキングされています。
以下の3つのパラメータを設定したという前提で、次項に進みます。
パラメータ名:test、値:test_value
パラメータ名:test_secret、値:test_secret_value(暗号化処理実施)
パラメータ名:sample、値:sample_value
パラメータを取得するコード
コードはこちらの記事を参考にさせてもらいました。
【AWS】パラメータストアの値をLambdaから取得する
# パラメータストアからパラメータを取得
def get_ssm_params(*keys):
result = {}
ssm = boto3.client('ssm')
response = ssm.get_parameters(
Names=keys,
WithDecryption=True,
)
for param in response['Parameters']:
result[param['Name']] = param['Value']
return result
# メイン処理
def lambda_handler(event, context):
# パラメータの値を変数に格納
parameters = get_ssm_params('test', 'test_secret', 'sample')
test = parameters['test']
test_secret = parameters['test_secret']
sample = parameters['sample']
boto3.client('ssm').get_parameters
の返り値は、引数の順番通りに返るわけではありません。
例えば、get_ssm_params
関数中のresponse['Parameters']
は以下のようになっています。
そのため、辞書型の変数result
を用いて、パラメータとそれに対応する値を格納しています。
[
{
'Name': 'sample',
'Type': 'String',
'Value': 'sample_value',
'Version': 1,
'LastModifiedDate': datetime.datetime(2023, 2, 23, 15, 49, 34, 471000, tzinfo=tzlocal()),
'ARN': 'arn:aws:ssm:ap-northeast-1:XXXXXXXXXXXXX:parameter/sample', 'DataType': 'text'
}, {
'Name': 'test', 'Type': 'String', 'Value': 'test_value', 'Version': 1,
'LastModifiedDate': datetime.datetime(2023, 2, 23, 15, 46, 49, 744000, tzinfo=tzlocal()),
'ARN': 'arn:aws:ssm:ap-northeast-1:XXXXXXXXXXXXX:parameter/test',
'DataType': 'text'
}, {
'Name': 'test_secret',
'Type': 'SecureString',
'Value': 'test_secret_value',
'Version': 1,
'LastModifiedDate': datetime.datetime(2023, 2, 23, 15, 47, 6, 708000, tzinfo=tzlocal()),
'ARN': 'arn:aws:ssm:ap-northeast-1:XXXXXXXXXXXXX:parameter/test_secret',
'DataType': 'text'
}
]
修正後コードの全体像
エラーハンドリングなしの簡単なお試しコードです。
動作としては、S3上にアップロードされたCSVファイルの情報をそのままRDSにINSERTしていくものとなっています。
import boto3
import io
import os
import csv
import python.pymysql
# パラメータストアからパラメータを取得
def get_ssm_params(*keys):
result = {}
ssm = boto3.client('ssm')
response = ssm.get_parameters(
Names=keys,
WithDecryption=True,
)
for paran in response['Parameters']:
result[param['Name']] = param['Value']
return result
# メイン処理
def lambda_handler(event, context):
# DB認証情報を変数に格納
parameters = get_ssm_params('HOST', 'USER', 'PASSWORD', 'DB_NAME')
HOST = parameters['HOST']
USER = parameters['USER']
PASSWORD = parameters['PASSWORD']
DB_NAME = parameters['DB_NAME']
# オブジェクトがPUTされたバケット名・オブジェクト名を取得
BUCKET_NAME = event['Records'][0]['s3']['bucket']['name']
KEY_NAME = event['Records'][0]['s3']['object']['key']
# PUTされたオブジェクトの取得
s3Client = boto3.client('s3')
response = s3Client.get_object(
Bucket=BUCKET_NAME,
Key=KEY_NAME
)
# CSVファイルの読み取り
csv_file = io.TextIOWrapper(io.BytesIO(response['Body'].read()))
rows = []
for row in csv.reader(csv_file):
rows.append(row)
del rows[0]
data_list = []
for data in rows:
data_list.append(data)
# 連携先DBのuserテーブルへのINSERTクエリ
connection = python.pymysql.connect(host=HOST, user=USER, passwd=PASSWORD, db=DB_NAME)
with connection:
with connection.cursor() as cursor:
# レコードを挿入
sql_insert_Query = "INSERT INTO user (UserId, UserName) VALUES (%s, %s)"
cursor.executemany(sql_insert_Query, data_list)
# コミットしてトランザクション実行
connection.commit()
動作確認
① RDSと接続しているEC2インスタンスに、セッションマネージャーを使用して接続
② 接続後、以下のコマンドを叩く
EC2にMySQLをインストール
sudo yum install mysql
RDSに接続
mysql -h <RDSのエンドポイント> -u <RDSのマスターユーザー名> -p
使用するDBを作成し選択(例では、test
という名称)
CREATE DATABASE test;
USE test;
使用するテーブルを作成
(特にNOT NULL制限等を指定しません)
CREATE TABLE user (
UserId VARCHAR(20),
UserName VARCHAR(50)
);
初期データ投入
INSERT INTO user (UserId, UserName) VALUES
('サンプル001','サンプルユーザー001'),
('サンプル002','サンプルユーザー002');
③ S3にファイルをアップロード
挿入するCSVファイルを作成し、アップロード
④ 実行結果を確認
S3へアップロード後のテーブルの状態はこちら
想定通りの動作になりました!
うまくいかない場合は、CloudWatch LogsでLambdaのログを確認してください。セキュリティグループやIAMロール等の権限まわりでエラーになっているケースが多いかと思います。(タイムアウトになっている場合もだいたい権限まわりを見直せばうまくいくはずです)
おわりに
ハードコーディング対策にはパラメータストアが有効打であることは知っていましたが、いざ実装してみるといろいろと考えることがありました。(主に権限まわり)
座額だけでなく、手を動かして勉強していくのは大事ですね。
ちなみに、DBエンジンがPostgreSQLの場合は、S3の拡張機能を使って簡単に、S3→RDSができるみたいです。
Amazon S3 から RDS for PostgreSQL DB インスタンスにデータをインポートする