概要
AWS Lambdaを使ってSlackチャンネルへ通知を行うCloudFormationテンプレートを作成しましたので、紹介いたします。
CFnテンプレート
BatchLambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
FunctionName: sample-function
Runtime: python3.11
Environment:
Variables:
SLACK_WEBHOOK_URL: !Sub ${SlackWebhookURL}
Code:
ZipFile: |
#!/usr/bin/python3.11
import json
import os
import urllib3
def lambda_handler(event, context):
slack_webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
message = {
"text": "HERE IS YOUR MESSAGE!!"
}
http = urllib3.PoolManager()
response = http.request(
"POST",
slack_webhook_url,
body=json.dumps(message).encode('utf-8'),
headers={"Content-Type": "application/json"}
)
return {
"statusCode": response.status,
"body": response.data.decode('utf-8')
}
コードの補足
SlackチャンネルのIncoming webhookは事前に作成し、パラメータSlackWebhookURL
として設定しておいてください。Roleも適切な権限のものを記載してください。
ランタイムのpythonのバージョンは3.11
にしています。
※2024年10月現在、管理コンソールから確認すると3.7
(2023年6月EOL)は当然ですが既にサポート対象外になっていました(3.8
は2024年10月EOLなのでこれももうすぐ)
urllib3.PoolManager()
では、HTTPリクエストを管理するためのプールマネージャを作成しています(urllib3
はHTTP通信を扱うためのPythonライブラリ)。 そして、PoolManager
オブジェクトのrequest
メソッドを呼び出して、HTTPリクエストを送信(=SlackのWebhook URLに対してPOSTリクエストを送信)、という流れです。
今回はPOST
を利用していますが、('GET', url)
といった形で指定すればGETリクエストを送信できます。
最後のreturn
文は、Lambda関数の戻り値です。
上記では特にログ出力していないですが、ログに残したい場合はprint
文を使用すればOKです。
通知の流れ
EventBridgeルールで「xxのステータスがxxになったら」などと条件指定し、ターゲットを上記にすれば特定の条件下のみ動くルールにすることができます。
また、SNSトピックを利用したりアラームで閾値を設定することも可能です。
以前似た記事としてこちらも作成したのでご参考までに。