0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

alarm lambda

Posted at
import json
import boto3
from datetime import datetime, timezone

# SNS クライアントを初期化(メール通知用)
sns_client = boto3.client('sns')

# SNS トピック ARN をご自身のものに置き換えてください
SNS_TOPIC_ARN = 'arn:aws:sns:your-region:your-account-id:your-topic-name'

def lambda_handler(event, context):
    # デバッグ用にイベントを出力
    print("Received event: " + json.dumps(event))
    
    # CloudWatch Alarm からのイベントを解析
    alarm_data = event.get('alarmData', {})
    alarm_name = alarm_data.get('alarmName', '')
    state = alarm_data.get('state', {})
    reason = state.get('reason', '')
    region = event.get('region', '')
    timestamp = state.get('timestamp', '')
    metrics = extract_metrics(alarm_data.get('configuration', {}))
    
    # 日付と時刻のフォーマットを調整
    occurrence_time = format_timestamp(timestamp)
    
    # 比較のために小文字に変換
    reason_lower = reason.lower()
    alarm_name_lower = alarm_name.lower()
    
    # 第一段階のフィルタリング:reason に特定のキーワードが含まれるかチェック
    if ('no datapoints' in reason_lower) or ('insufficient data' in reason_lower) or ('unknown' in reason_lower):
        # メール A - データ不明
        subject = '【データ不明】アラームが発生しました'
        message = f'''発生時間:{occurrence_time}
発生アラーム名:{alarm_name}
発生メトリクス:{metrics}

理由:
{reason}

詳細はdevops環境のAWS Cloudwatchコンソールでご確認ください。'''
        send_email(message, subject)
    else:
        # 第二段階のフィルタリング:alarmName に特定のキーワードが含まれるかチェック
        if 'warning' in alarm_name_lower:
            # メール B - 警告
            subject = '【警告】アラームが発生しました'
        elif 'danger' in alarm_name_lower:
            # メール C - 危険
            subject = '【危険】重大なアラームが発生しました'
        else:
            # メール D - アラーム状態
            subject = '【アラーム状態】アラームが発生しました'

        message = f'''発生時間:{occurrence_time}
発生アラーム名:{alarm_name}
発生メトリクス:{metrics}

理由:
{reason}

詳細はdevops環境のAWS Cloudwatchコンソールでご確認ください。'''
        send_email(message, subject)

def extract_metrics(configuration):
    """
    メトリクス情報を抽出して文字列にフォーマット
    """
    metrics = configuration.get('metrics', [])
    metric_list = []
    for metric in metrics:
        metric_info = metric.get('metricStat', {}).get('metric', {})
        namespace = metric_info.get('namespace', '')
        metric_name = metric_info.get('name', '')
        dimensions = metric_info.get('dimensions', {})
        dimensions_str = ', '.join([f"{k}={v}" for k, v in dimensions.items()])
        metric_list.append(f"{namespace}/{metric_name} ({dimensions_str})")
    return '; '.join(metric_list)

def format_timestamp(timestamp_str):
    """
    タイムスタンプ文字列を日本時間にフォーマット
    """
    if timestamp_str:
        # タイムスタンプをパース
        timestamp = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%f%z')
        # 日本時間に変換
        jst_time = timestamp.astimezone(timezone.utc).astimezone(timezone(offset=timezone(timedelta(hours=9))))
        return jst_time.strftime('%Y-%m-%d %H:%M:%S %Z')
    else:
        return 'N/A'

def send_email(message, subject):
    response = sns_client.publish(
        TopicArn=SNS_TOPIC_ARN,
        Message=message,
        Subject=subject
    )
    print('Email sent! Message ID:')
    print(response['MessageId'])
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?