概要
EventBridge -> Lambda -> RDSのストプロ -> Slack通知の流れを構築しました。
具体的にいうと、
「毎日9時に、プロシージャで実施した結果を、指定のSlackチャンネルに通知する」
という流れがゴールです。
構築手順と注意点を解説します。
手順概要
- ストアドプロシージャを作成する
- Lambda用のSecurityGroupを作成する
- Lambda用のIAM Roleを作成する
- Lambda関数を作成する
- LambdaをVPCに設定する
- RDSのSecurity Groupを編集する
- EventBridgeSchedulerを設定する
手順詳細
1. ストアドプロシージャを作成する
まず、今回使うSQLを該当のRDSにプロシージャとして保存します。
今回は、sample_tableのcreate_dateカラムが前日のもので、かつ、sample_noカラムに指定の文字列から始まるレコードが何件あるか、というSQLになります。
CREATE DEFINER=`root`@`%` PROCEDURE `sample`.`count_number`(IN report_date DATE)
BEGIN
SELECT
SUM(CASE WHEN sample_no LIKE 'hoge%' THEN 1 ELSE 0 END) AS count_hoge,
SUM(CASE WHEN sample_no LIKE 'foo%' THEN 1 ELSE 0 END) AS count_foo,
SUM(CASE WHEN sample_no LIKE 'example%' THEN 1 ELSE 0 END) AS count_example,
SUM(CASE WHEN sample_no LIKE 'sample%' THEN 1 ELSE 0 END) AS count_sample,
SUM(CASE WHEN sample_no LIKE 'test%' THEN 1 ELSE 0 END) AS count_test
FROM mysql57.sample_table
WHERE create_date BETWEEN CONCAT(report_date, ' 00:00:00') AND CONCAT(report_date, ' 23:59:59');
END
上記は簡易的なものですが、もっと複雑だったり時間がかかったりする処理を任せる時により役立ちそうですね。
2. Lambda用のSecurityGroupを作成する
EC2 -> SecurityGroupから、セキュリティグループを一つ作成してください。
RDSがあるAWSアカウントのVPCで作成するようにしましょう。
インバウンドルールなどは特に何もなしでOKです。
3. Lambda用のIAM Roleを作成する
以下の権限をLambdaに付与するため、IAMロールを作成します。
・LambdaがCloudwatchlogsにアクセスする権限 = AWSLambdaExecute
・LambdaがVPC(ENI)にアクセスする権限 = AWSLambdaVPCAccessExecutionRole
上記のポリシーが付与されたIAMロールを作成できたら、次のステップでこれをLambdaに設定してあげます。
4. Lambda関数を作成する
作成したストプロを呼び出すLambda関数を作成します。
作成時に、IAMロールの選択が必要になるので、先のステップで作成したものを指定してください。
コードについては以下のPythonコードで作成しました。
ここは適宜用途に沿って変更してください。
import json
import os
from datetime import datetime, timedelta
import boto3
import requests
import pymysql
import logging
from decimal import Decimal
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# RDS設定
rds_host = os.environ['RDS_HOST']
rds_user = os.environ['RDS_USER']
rds_password = os.environ['RDS_PASSWORD']
rds_db = os.environ['RDS_DB']
slack_webhook_url = os.environ['SLACK_WEBHOOK_URL']
def send_slack_notification(webhook_url, message):
payload = {
"text": message
}
response = requests.post(webhook_url, data=json.dumps(payload), headers={'Content-Type': 'application/json'})
if response.status_code != 200:
raise ValueError(f'Request to Slack returned an error {response.status_code}, the response is:\n{response.text}')
def decimal_default(obj):
if isinstance(obj, Decimal):
return float(obj)
raise TypeError
def lambda_handler(event, context):
logger.info("Lambda function started")
# 前日の日付を計算
today = datetime.now()
yesterday = today - timedelta(1)
report_date = yesterday.strftime('%Y-%m-%d')
logger.info(f"Report date: {report_date}")
# RDSに接続
try:
connection = pymysql.connect(host=rds_host, user=rds_user, password=rds_password, db=rds_db)
logger.info("Connected to RDS")
except Exception as e:
logger.error(f"Error connecting to RDS: {e}")
raise
try:
with connection.cursor() as cursor:
# ストアドプロシージャを呼び出し
cursor.callproc('count_number', [report_date])
result = cursor.fetchall()
logger.info(f"Stored procedure result: {result}")
# 結果を整形してSlackに通知
message = f"Report for {report_date}:\n"
for row in result:
message += f"hoge: {row[0]}, foo: {row[1]}, example: {row[2]}, sample: {row[3]}, test: {row[4]}\n"
send_slack_notification(slack_webhook_url, message)
logger.info("Slack notification sent")
finally:
connection.close()
logger.info("RDS connection closed")
return {
'statusCode': 200,
'body': json.dumps(result, default=decimal_default)
}
count_number
はステップ1で作成したストプロの名前です。
以下は環境変数です。
・RDS_DB
・RDS_HOST
・RDS_PASSWORD
・RDS_USER
・SLACK_WEBHOOK_URL
以下のLambdaの設定画面「環境変数」から、上記5つの変数とその値を登録しましょう。
ちなみにlogger
が返すログは、cloudwatchlogsに出力されています。
CW logsは、Lambdaの「モニタリング」タブの「CloudWatch ログを表示」を押すと遷移できます。
何かでエラーが起きている場合は、そこからデバッグしていくと良いと思います。
補足1:SLACK_WEBHOOK_URL
SLACK_WEBHOOK_URLは、Slackの「Incoming Webhook」というアプリから作成が可能です。
https://slack.com/services/new/incoming-webhook
あらかじめSlackチャンネルを作成しておいてある場合、それを以下リンクから選択し、「Incoming Webhookインテグレーションの追加」を押下すればURLが生成されます。
補足2:pymysqlとrequests
上記の設定だけでは、pymysql
とrequests
モジュールを利用するにあたり、import error
が起きてしまいました(cloudwatchlogsで確認できます)。
そんな場合は、Lambda レイヤーを利用すると、追加ライブラリを使用できるようになります。
Layersは自分で作成することもできますが、今回は以下の記事を参考に、Gitでパッケージ化したものをARNとして用意してくださっている方のものを利用させていただきました。該当がない場合は、自分で作成しましょう。
https://qiita.com/rapirapi/items/faf18994fcc69a1136bf
https://github.com/keithrozario/Klayers/tree/master/deployments
今回は、Python3.11を利用していたので、上記Gitでまずそのディレクトリへ。その後、pymysql
とrequests
モジュールのARNをそれぞれコピーしてLambda側に追加してあげます。正常に追加されると、以下のようになります。
ちなみに、ハンドラは、Pythonファイル名.ハンドラーメソッド名
とします。
例えば上記コードのように、「index.py」というPythonファイルを作成し、ハンドラーメソッドとしてlambda_handler
を利用している場合は、index.lambda_handler
というハンドラになります。
デフォルトではlambda_function.lambda_handler
になっていると思います。
5. LambdaをVPCに設定する
(あまりない例だと思いますが、RDSがだれからでもアクセスできる状態の場合はステップ5と6は不要なはず)
今回の構成は、LambdaからRDSのDBに保存してあるストプロへアクセスするという流れ。
RDSは(一般的にも)セキュリティグループのインバウンドルールで特定のアクセスしか許可しないように設定してあります。
そのため、今回LambdaをVPCに設定して、そのLambdaのセキュリティグループからのアクセスをRDS側に許容する必要があります。許容されていない場合、タイムアウトエラーがCloudwatchlogsに出力されます。
Lambdaの設定画面から「VPC」 -> 「編集」を選ぶと、VPC設定が可能です。
セキュリティグループは最初のステップで作成したセキュリティグループを選択します。
※サブネットは最低二つ選択する必要があります
設定が正常に完了すると以下のようになっているはずです。
補足: AWSLambdaVPCAccessExecutionRole
がない場合
IAMロールにAWSLambdaVPCAccessExecutionRole
がない場合、上記のVPC設定時に以下のエラーになります。
The provided execution role does not have permissions to call CreateNetworkInterface on EC2
先のステップでIAMロールを新規作成しこのポリシーを付与したのは、このエラーを回避するためです。
これにより、Lambda関数がVPC内でネットワークインターフェースを作成するための必要な権限を持つようになります。
ちなみに、あくまでこれはLambdaがVPCに設定されるタイミングで「ENIへアクセスできませんよ」というエラーなので、作成時以降は仮にこのRoleを削除してもLambda自体は機能します。ただ、後々LambdaのRoleを見たときに「あれ、これどうやってVPCに設定したんだ?」となってしまうため、特に消したい意図がない限りはつけたままで良いと思います。
参考:
6. RDSのSecurity Groupを編集する
LambdaからRDSへのアクセスの準備はできました。
次に、RDSのセキュリティグループ側で、Lambda側からのアクセスを許容してあげましょう。
RDSのセキュリティグループに移動し、「インバウンドのルールを編集」から、作成したLambdaのセキュリティグループを選択します。
7. EventBridgeSchedulerを設定する
最後に、定期実行の仕組みを作ります。
EventBridgeのスケジューラから「スケジュールを作成」を押下します。
まずは任意のCron式を入力します。
毎日9時に実施したい場合は以下のようなイメージです。
次に、「ターゲット」で「頻繁に使用されるAPI」からAWS Lambdaを選択します。
プルダウンで関数が表示されるので、前述のステップで作成した関数名を指定しましょう。
設定が完了したら、以下の「ターゲット」から該当のLambda関数が選択されているか改めて確認しましょう。
これで設定完了です。
指定したCron式に従って、毎日9時に指定のSlackチャンネルへ通知がいくようになります。
自動化はなかなか楽しいですね。