概要
AWS Batch -> CloudWatch -> SNS -> Lambda -> Slack
AWS BatchがFailed
になった場合にSlackへ通知する流れを作成しました。
本記事では、Lambda関数に記載するコードを紹介します。
※Role含め、これらのリソースをCloudFormationで一括作成する記事も別途投稿します。
通知フロー
以下の流れで処理が行われます。
AWS Batch:ジョブ実行結果(ステータス)がFailed
になる
↓
CloudWatchEvents:Failed
の情報を収集し、トリガーイベントとしてSNSに送信
↓
SNS:CloudWatchから受信したエラー情報をSNSメッセージとしてパブリッシュ
↓
Lambda:SNSから受信したメッセージをトリガーとして起動(Lambda関数がSNSトピックにサブスクライブされており、SNSメッセージがこの関数に送信される)
↓
Lambda関数:ステータスが"FAILED"
である場合、Slack通知メッセージを構築し、指定されたSlackウェブフックURLを使用して通知をSlackに送信する
CloudWatchEventsイベントパターン
AWS Batchからevent
を受け取る場合のCloudWatchEventsイベントパターンは、以下公式ドキュメントをご参考ください。
Ruleのイベントパターンは以下のような形になります。
{
"detail-type": ["Batch Job State Change"],
"source": ["aws.batch"],
"detail": {
"status": ["FAILED"]
}
}
ターゲットにSNSトピック、もしくは直接Lambda関数を指定します。
下記に、SNSの場合とLambdaの場合の関数コードを記載しました。
AWS Batch -> CloudWatchEvents -> Lambda -> Slack
AWS CloudWatchEventsからLambdaに直接連携する場合は以下の通り。
SLACK_WEBHOOK_URL
には、Incoming Webhookであらかじめ作成してあるURLを、Lambdaの環境変数で設定してある前提です。
また、確認のためprint
デバッグでCloudWatchLogsにevent
内容をプリントしておきます。
#!/usr/bin/python3.7
import json
import boto3
import os
import requests
from datetime import datetime, timezone, timedelta
def lambda_handler(event, context):
slack_webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
print (event)
event_detail = event['detail']
print (event_detail)
detail_status = event_detail['status']
print (detail_status)
event_time_utc = event['time']
event_time_utc = datetime.fromisoformat(event_time_utc[:-1])
event_time_local = event_time_utc.astimezone(timezone(timedelta(hours=9)))
print("ローカル時間:", event_time_local)
if detail_status == 'FAILED':
message = f"BatchジョブがFAILEDしました。\n時間: {event_time_local}\nジョブ名: {event_detail['jobName']}\nジョブID: {event_detail['jobId']}"
print(message)
send_slack_notification(slack_webhook_url, message)
def send_slack_notification(webhook_url, message):
payload = {
"text": message
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(webhook_url, data=json.dumps(payload), headers=headers)
if response.status_code != 200:
print(f"Slack通知の送信に失敗しました。ステータスコード: {response.status_code}")
else:
print("Slack通知が正常に送信されました。")
AWS Batch -> CloudWatchEvents -> SNS -> Lambda -> Slack
SNS経由の場合は以下のようになります。
['Records'][0]['Sns']['Message']
は、SNSトピックから送信されたメッセージの中からメッセージ本文を取得する方法です。この中に、event
内容が入っている形ですね。
#!/usr/bin/python3.7
import json
import boto3
import os
import requests
from datetime import datetime, timezone, timedelta
def lambda_handler(event, context):
slack_webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
print (event)
sns_message = json.loads(event['Records'][0]['Sns']['Message'])
event_detail = sns_message['detail']
print (event_detail)
detail_status = event_detail['status']
print (detail_status)
event_time_utc = sns_message['time']
event_time_utc = datetime.fromisoformat(event_time_utc[:-1])
event_time_local = event_time_utc.astimezone(timezone(timedelta(hours=9)))
print("ローカル時間:", event_time_local)
if detail_status == 'FAILED':
message = f"BatchジョブがFAILEDしました。\n時間: {event_time_local}\nジョブ名: {event_detail['jobName']}\nジョブID: {event_detail['jobId']}"
print(message)
send_slack_notification(slack_webhook_url, message)
def send_slack_notification(webhook_url, message):
payload = {
"text": message
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(webhook_url, data=json.dumps(payload), headers=headers)
if response.status_code != 200:
print(f"Slack通知の送信に失敗しました。ステータスコード: {response.status_code}")
else:
print("Slack通知が正常に送信されました。")
SNSを使う場合は、メールに通知させることもできるので、必要な場合は別途設定しましょう。
Slack通知
正常に送信されると、Slackの該当チャンネルに以下の様な通知がきます。
BatchジョブがFAILEDしました。
時間: 2023-09-30 19:47:13+09:00
ジョブ名: test-100
ジョブID: XXXXXXX-XXXX-XXXX-XXXX-XXXXXXX
AWS Batchのジョブ発行もCloudWatchEventRuleを使えば、定期的なスケジュールで実行可能です。
こうやってSlack通知までサーバレス・自動で実現できると面白いですね。