0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS】EventBridge -> Lambda -> RDSのストプロ -> Slack通知の流れを構築する

Posted at

概要

EventBridge -> Lambda -> RDSのストプロ -> Slack通知の流れを構築しました。

具体的にいうと、
「毎日9時に、プロシージャで実施した結果を、指定のSlackチャンネルに通知する」
という流れがゴールです。

構築手順と注意点を解説します。

手順概要

  1. ストアドプロシージャを作成する
  2. Lambda用のSecurityGroupを作成する
  3. Lambda用のIAM Roleを作成する
  4. Lambda関数を作成する
  5. LambdaをVPCに設定する
  6. RDSのSecurity Groupを編集する
  7. EventBridgeSchedulerを設定する

手順詳細

1. ストアドプロシージャを作成する

まず、今回使うSQLを該当のRDSにプロシージャとして保存します。
今回は、sample_tableのcreate_dateカラムが前日のもので、かつ、sample_noカラムに指定の文字列から始まるレコードが何件あるか、というSQLになります。

count_number.sql
CREATE DEFINER=`root`@`%` PROCEDURE `sample`.`count_number`(IN report_date DATE)
BEGIN
    SELECT 
        SUM(CASE WHEN sample_no LIKE 'hoge%' THEN 1 ELSE 0 END) AS count_hoge,
        SUM(CASE WHEN sample_no LIKE 'foo%' THEN 1 ELSE 0 END) AS count_foo,
        SUM(CASE WHEN sample_no LIKE 'example%' THEN 1 ELSE 0 END) AS count_example,
        SUM(CASE WHEN sample_no LIKE 'sample%' THEN 1 ELSE 0 END) AS count_sample,
        SUM(CASE WHEN sample_no LIKE 'test%' THEN 1 ELSE 0 END) AS count_test
    FROM mysql57.sample_table  
    WHERE create_date BETWEEN CONCAT(report_date, ' 00:00:00') AND CONCAT(report_date, ' 23:59:59');
END

上記は簡易的なものですが、もっと複雑だったり時間がかかったりする処理を任せる時により役立ちそうですね。

2. Lambda用のSecurityGroupを作成する

EC2 -> SecurityGroupから、セキュリティグループを一つ作成してください。
RDSがあるAWSアカウントのVPCで作成するようにしましょう。
インバウンドルールなどは特に何もなしでOKです。

3. Lambda用のIAM Roleを作成する

以下の権限をLambdaに付与するため、IAMロールを作成します。

・LambdaがCloudwatchlogsにアクセスする権限 = AWSLambdaExecute
・LambdaがVPC(ENI)にアクセスする権限 = AWSLambdaVPCAccessExecutionRole

上記のポリシーが付与されたIAMロールを作成できたら、次のステップでこれをLambdaに設定してあげます。

4. Lambda関数を作成する

作成したストプロを呼び出すLambda関数を作成します。
作成時に、IAMロールの選択が必要になるので、先のステップで作成したものを指定してください。

コードについては以下のPythonコードで作成しました。
ここは適宜用途に沿って変更してください。

index.py
import json
import os
from datetime import datetime, timedelta
import boto3
import requests
import pymysql
import logging
from decimal import Decimal

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# RDS設定
rds_host = os.environ['RDS_HOST']
rds_user = os.environ['RDS_USER']
rds_password = os.environ['RDS_PASSWORD']
rds_db = os.environ['RDS_DB']
slack_webhook_url = os.environ['SLACK_WEBHOOK_URL']

def send_slack_notification(webhook_url, message):
    payload = {
        "text": message
    }
    response = requests.post(webhook_url, data=json.dumps(payload), headers={'Content-Type': 'application/json'})
    if response.status_code != 200:
        raise ValueError(f'Request to Slack returned an error {response.status_code}, the response is:\n{response.text}')

def decimal_default(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError

def lambda_handler(event, context):
    logger.info("Lambda function started")
    
    # 前日の日付を計算
    today = datetime.now()
    yesterday = today - timedelta(1)
    report_date = yesterday.strftime('%Y-%m-%d')
    logger.info(f"Report date: {report_date}")
    
    # RDSに接続
    try:
        connection = pymysql.connect(host=rds_host, user=rds_user, password=rds_password, db=rds_db)
        logger.info("Connected to RDS")
    except Exception as e:
        logger.error(f"Error connecting to RDS: {e}")
        raise
    
    try:
        with connection.cursor() as cursor:
            # ストアドプロシージャを呼び出し
            cursor.callproc('count_number', [report_date])
            result = cursor.fetchall()
            logger.info(f"Stored procedure result: {result}")
            
            # 結果を整形してSlackに通知
            message = f"Report for {report_date}:\n"
            for row in result:
                message += f"hoge: {row[0]}, foo: {row[1]}, example: {row[2]}, sample: {row[3]}, test: {row[4]}\n"
            send_slack_notification(slack_webhook_url, message)
            logger.info("Slack notification sent")
            
    finally:
        connection.close()
        logger.info("RDS connection closed")
    
    return {
        'statusCode': 200,
        'body': json.dumps(result, default=decimal_default)
    }

count_numberはステップ1で作成したストプロの名前です。
以下は環境変数です。

RDS_DB
RDS_HOST
RDS_PASSWORD
RDS_USER
SLACK_WEBHOOK_URL

以下のLambdaの設定画面「環境変数」から、上記5つの変数とその値を登録しましょう。

image.png

ちなみにloggerが返すログは、cloudwatchlogsに出力されています。
CW logsは、Lambdaの「モニタリング」タブの「CloudWatch ログを表示」を押すと遷移できます。
何かでエラーが起きている場合は、そこからデバッグしていくと良いと思います。

補足1:SLACK_WEBHOOK_URL

SLACK_WEBHOOK_URLは、Slackの「Incoming Webhook」というアプリから作成が可能です。
https://slack.com/services/new/incoming-webhook

あらかじめSlackチャンネルを作成しておいてある場合、それを以下リンクから選択し、「Incoming Webhookインテグレーションの追加」を押下すればURLが生成されます。

image.png

補足2:pymysqlとrequests

上記の設定だけでは、pymysqlrequestsモジュールを利用するにあたり、import errorが起きてしまいました(cloudwatchlogsで確認できます)。
そんな場合は、Lambda レイヤーを利用すると、追加ライブラリを使用できるようになります。

Layersは自分で作成することもできますが、今回は以下の記事を参考に、Gitでパッケージ化したものをARNとして用意してくださっている方のものを利用させていただきました。該当がない場合は、自分で作成しましょう。

https://qiita.com/rapirapi/items/faf18994fcc69a1136bf
https://github.com/keithrozario/Klayers/tree/master/deployments

今回は、Python3.11を利用していたので、上記Gitでまずそのディレクトリへ。その後、pymysqlrequestsモジュールのARNをそれぞれコピーしてLambda側に追加してあげます。正常に追加されると、以下のようになります。

image.png

ちなみに、ハンドラは、Pythonファイル名.ハンドラーメソッド名とします。
例えば上記コードのように、「index.py」というPythonファイルを作成し、ハンドラーメソッドとしてlambda_handlerを利用している場合は、index.lambda_handlerというハンドラになります。

デフォルトではlambda_function.lambda_handlerになっていると思います。

5. LambdaをVPCに設定する

(あまりない例だと思いますが、RDSがだれからでもアクセスできる状態の場合はステップ5と6は不要なはず)

今回の構成は、LambdaからRDSのDBに保存してあるストプロへアクセスするという流れ。
RDSは(一般的にも)セキュリティグループのインバウンドルールで特定のアクセスしか許可しないように設定してあります。

そのため、今回LambdaをVPCに設定して、そのLambdaのセキュリティグループからのアクセスをRDS側に許容する必要があります。許容されていない場合、タイムアウトエラーがCloudwatchlogsに出力されます。

Lambdaの設定画面から「VPC」 -> 「編集」を選ぶと、VPC設定が可能です。
セキュリティグループは最初のステップで作成したセキュリティグループを選択します。

image.png

※サブネットは最低二つ選択する必要があります

設定が正常に完了すると以下のようになっているはずです。

image.png

補足: AWSLambdaVPCAccessExecutionRoleがない場合

IAMロールにAWSLambdaVPCAccessExecutionRoleがない場合、上記のVPC設定時に以下のエラーになります。

The provided execution role does not have permissions to call CreateNetworkInterface on EC2

image.png

先のステップでIAMロールを新規作成しこのポリシーを付与したのは、このエラーを回避するためです。
これにより、Lambda関数がVPC内でネットワークインターフェースを作成するための必要な権限を持つようになります。

ちなみに、あくまでこれはLambdaがVPCに設定されるタイミングで「ENIへアクセスできませんよ」というエラーなので、作成時以降は仮にこのRoleを削除してもLambda自体は機能します。ただ、後々LambdaのRoleを見たときに「あれ、これどうやってVPCに設定したんだ?」となってしまうため、特に消したい意図がない限りはつけたままで良いと思います。

参考:

6. RDSのSecurity Groupを編集する

LambdaからRDSへのアクセスの準備はできました。
次に、RDSのセキュリティグループ側で、Lambda側からのアクセスを許容してあげましょう。

RDSのセキュリティグループに移動し、「インバウンドのルールを編集」から、作成したLambdaのセキュリティグループを選択します。

image.png

7. EventBridgeSchedulerを設定する

最後に、定期実行の仕組みを作ります。

EventBridgeのスケジューラから「スケジュールを作成」を押下します。
まずは任意のCron式を入力します。
毎日9時に実施したい場合は以下のようなイメージです。

image.png

次に、「ターゲット」で「頻繁に使用されるAPI」からAWS Lambdaを選択します。
プルダウンで関数が表示されるので、前述のステップで作成した関数名を指定しましょう。

image.png

設定が完了したら、以下の「ターゲット」から該当のLambda関数が選択されているか改めて確認しましょう。

image.png

これで設定完了です。
指定したCron式に従って、毎日9時に指定のSlackチャンネルへ通知がいくようになります。
自動化はなかなか楽しいですね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?