- AWS コンソールを使った VPC、RDS、S3、Lambda、サブスクリプションフィルタの作成をまとめました。
- AWS CLI を使った作成はこちら
- システムの構成
Lambda と RSD はVPC、セキュリティグループどちらも同じものに所属させています。
VPCの作成
セキュリティグループの作成
データベースの作成
Lambdaの作成
実行
VPCの作成
セキュリティグループの作成
インバウンドルールを設定します。
アウトバウンドルールを設定します。
データベースの作成
- RDS から MySQL を選択・作成します。
- 「無料利用枠」を選ぶと無料枠内で利用可能なオプションが選択出来ます。
- 今回は「db.t2.micro」を選択しました。
- 「VPC」「セキュリティグループ」は先ほど作成した「myproject-vpc」「my-security-group」を選択します。
- 初期 DB として「testmysql」を作成しました。
- 作成した RDS のエンドポイントが接続時に必要になるのでメモします。
S3 の作成
Lambdaの作成
レイヤーの作成
- ライブラリをレイヤーに含めます。
- ローカル環境でライブラリを zip 化して、レイヤーに登録します。
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/configuration-layers.html
Lambda 実行ロールの作成
- S3 へのアクセス、CloudWatch のログ書き込み、VPC の設定で EC2 権限が必要だったので、以下の権限を付与しました。
Lambda 関数①の作成
- コードを編集します。
RDS へ接続して時刻を取得してログを出します。
import mysql.connector import json import os import logging logger = logging.getLogger() logger.setLevel(logging.INFO) host = os.environ['RDS_HOST_NAME'] user = os.environ['USER_NAME'] password = os.environ['PASSWORD'] db = os.environ['DB_NAME'] def lambda_handler(event, context): conn = None date_time = None message = None try: # MySQL の接続 conn = mysql.connector.connect( user=user, password=password, database=db, host=host ) cur = conn.cursor() # 日時の取得 cur.execute(f"SELECT NOW();") results = cur.fetchall() message = f"{results[0][0]}" logger.info("取得した時刻:" + message) status_code = 200 except Exception as e: logger.info(f"Error Occurred: {e}") status_code = 500 message = "error" finally: if conn is not None and conn.is_connected(): conn.close() return { "statusCode": status_code, "body": json.dumps({ "res": message, "event": event }) }
Lambda 関数①の設定
Lambda 関数①の実行
Lambda 関数①の CloudWatch のログの確認
Lambda 関数②の作成
import json
import base64
import gzip
import re
import boto3
import tempfile
import os
import glob
from datetime import datetime, timedelta, timezone
from boto3.session import Session
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
JST = timezone(timedelta(hours=+9), 'JST')
def lambda_handler(event, context):
try:
# base64 デコード
decoded_data = base64.b64decode(event["awslogs"]["data"])
# gzip 解凍
decompressed_data = gzip.decompress(decoded_data)
json_data = json.loads(decompressed_data)
logger.info(json_data)
# 「取得した時刻:」が書かれているログの一部を抜き出します。
for logEvent in json_data["logEvents"]:
log = re.search(r"取得した時刻:.*", logEvent["message"])
if log != None:
message = log.group()
logger.info("イベントメッセージ:{0}".format(message))
break
# 一時ファイルへの書き込み
with tempfile.TemporaryDirectory() as tmpdir_name:
tmpfile_name = "temp.txt"
tmpfile_path = os.path.join(tmpdir_name, tmpfile_name)
# 一時ファイルの作成
with open(tmpfile_path, "w", encoding='UTF-8') as f:
f.write(message)
logger.info("一時ファイル:{0}".format(glob.glob(tmpfile_path, recursive=True)))
# 一時ファイルの中身を確認
with open(tmpfile_path, encoding='UTF-8') as f:
logger.info("一時ファイルの中身:{0}".format(f.read()))
# 一時ファイルを S3 へアップロード
bucket_name = "xxx"
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
date_time = datetime.now(JST).strftime('%Y%m%d_%H%M%S_%f')
logger.info("現在時刻:{0}".format(date_time))
s3file_path = os.path.join(date_time, tmpfile_name)
bucket.upload_file(tmpfile_path, s3file_path)
# 一時ファイルの削除を確認
logger.info("削除後の一時ファイル:{0}".format(glob.glob(tmpfile_path, recursive=True)))
# s3ファイルの確認
for obj in bucket.objects.all():
logger.info("S3削除対象:{0}".format(obj.key))
# s3ファイルの削除
res = bucket.objects.filter(Prefix=obj.key).delete()
logger.info("削除したS3ファイル:HTTPStatusCode:{0}, Deleted:{1}".format(res[0]["ResponseMetadata"]["HTTPStatusCode"], res[0]["Deleted"][0]["Key"]))
except Exception as e:
logger.info(f"Error Occurred: {e}")
return {
"statusCode": 200,
"body": json.dumps({
"res": "S3TEST",
"event": event
}),
}