Help us understand the problem. What is going on with this article?

CloudWatchでエラーログの内容を通知させたい

More than 1 year has passed since last update.

前置き

CloudWatch Logsの収集対象としているログにErrorという文字列が出力されたらSNSで通知したい。
CloudWatch Logsのロググループにメトリクスフィルタを設定し、このメトリクスが所定のしきい値を超えたらSNSへ連携するアクションをCloudWatch Alarmとして設定すれば実現できる。しかしこの方法だとErrorという文字列が出力されたことは認識できるが、ログの内容までは通知されない。

CloudWatch Alarm

まず、CloudWatch AlarmからSNSに渡される情報を見てみよう。

message.json
{
    "AlarmName": "sample-error",
    "AlarmDescription": "sampleでエラーが発生しました。",
    "AWSAccountId": "xxxxxxxxxxxx",
    "NewStateValue": "ALARM",
    "NewStateReason": "Threshold Crossed: 1 datapoint [2.0 (29/11/17 01:09:00)] was greater than or equal to the threshold (1.0).",
    "StateChangeTime": "2017-11-29T01:10:32.907+0000",
    "Region": "Asia Pacific (Tokyo)",
    "OldStateValue": "OK",
    "Trigger": {
        "MetricName": "sample-metric",
        "Namespace": "LogMetrics",
        "StatisticType": "Statistic",
        "Statistic": "SUM",
        "Unit": null,
        "Dimensions": [],
        "Period": 60,
        "EvaluationPeriods": 1,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "Threshold": 1,
        "TreatMissingData": "- TreatMissingData:                    NonBreaching",
        "EvaluateLowSampleCountPercentile": ""
    }
}

たしかに、ログの内容は全く含まれていない。その代わりTrigger内にこのアラームに紐づくメトリクスの情報が含まれている。これを使って欲しい情報を辿っていく。

メトリクスフィルタ

MetricNameNamespaceを指定すればメトリクスフィルタの情報を取得することができる。

sample.py
        logs = boto3.client('logs')

        metricfilters = logs.describe_metric_filters(
            metricName = message['Trigger']['MetricName'] ,
            metricNamespace = message['Trigger']['Namespace']
        )

取得したメトリクスフィルタはこんなかんじ。

metricsfilters.json
{
  "metricFilters": [
    {
      "filterName": "sample-filter",
      "filterPattern": "Error",
      "metricTransformations": [
        {
          "metricName": "sample-metric",
          "metricNamespace": "LogMetrics",
          "metricValue": "1"
        }
      ],
      "creationTime": 1493029160596,
      "logGroupName": "sample-loggroup"
    }
  ],
  "ResponseMetadata": {
    "RequestId": "xxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "xxxxxxxxxxxxxxxxxxxxxxxxxxx",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "210",
      "date": "Wed, 29 Nov 2017 01:10:33 GMT"
    },
    "RetryAttempts": 0
  }
}

filterPatternlogGroupNameが取得できる。これがあればCloudWatch Logsからログイベントを抽出できそうだ。

開始時刻と終了時刻

ログデータを抽出するためにはfilterPatternlogGroupNameのほかに、フィルタリング対象の開始時刻と終了時刻を指定したい。この時刻はUNIX Timeである必要がある。こちらのサイトを参考にさせていただく。

sample.py
        #ログストリームの抽出対象時刻をUNIXタイムに変換(取得期間は TIME_FROM_MIN 分前以降)
        #終了時刻はアラーム発生時刻の1分後
        timeto = datetime.datetime.strptime(message['StateChangeTime'][:19] ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1)
        u_to = calendar.timegm(timeto.utctimetuple()) * 1000
        #開始時刻は終了時刻のTIME_FROM_MIN分前
        timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN)
        u_from = calendar.timegm(timefrom.utctimetuple()) * 1000

ログイベントの取得

これで材料が揃った。CloudWatch Logsからログイベントを取得する。

sample.py
        response = logs.filter_log_events(
            logGroupName = loggroupname ,
            filterPattern = filterpattern,
            startTime = u_from,
            endTime = u_to,
            limit = OUTPUT_LIMIT
        )

responseはこんなかんじ。

response.json
{
  "events": [
    {
      "logStreamName": "sample-stream",
      "timestamp": 1511942974313,
      "message": "Errorが発生しました。sample messageです。",
      "ingestionTime": 1510943004111,
      "eventId": "11111111111111111111111111111111111111111111111111111112"
    },
    {
      "logStreamName": "sample-stream",
      "timestamp": 1511942974443,
      "message": "またまたErrorが発生しました。sample messageです。",
      "ingestionTime": 1510943004111,
      "eventId": "11111111111111111111111111111111111111111111111111111112"
    }
  ],
  "searchedLogStreams": [
    {
      "logStreamName": "sample-stream",
      "searchedCompletely": true
    }
  ],
  "ResponseMetadata": {
    "RequestId": "xxxxxxxxxxxxxxxxxxxxxx",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "x-amzn-requestid": "xxxxxxxxxxxxxxxxxxxxxx",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "1000",
      "date": "Wed, 29 Nov 2017 01:10:33 GMT"
    },
    "RetryAttempts": 0
  }
}

フィルタ条件にマッチする複数件のログイベントを取得できる。

メッセージの整形

仕上げに、通知する文面の整形を行う。またまた先程のサイトを参考にさせていただく。

sample.py
        #メッセージの整形
        log_message = u""
        for e in response['events']:
            #UNIX時刻をUTCへ変換後、日本時間に変更している
            date = datetime.datetime.fromtimestamp(int(str(e['timestamp'])[:10])) + datetime.timedelta(hours=9)
            log_message = log_message + '\n' + str(date) + ' : ' + e['message']

        #SNSのタイトル、本文
        title = message['NewStateValue'] + " : " + message['AlarmName']
        sns_message = message['AlarmDescription'] + '\n' + log_message

最終的な構成

CloudWatch Alarm -> SNS -> Lambda -> SNS
一つ目のSNSはLambdaファンクションを呼び出すためのもの。二つ目のSNSはいい感じに整形した文面を関係者へ通知するためのもの。

スクリプト

サンプルとして拙いスクリプトを挙げておく。例外処理がテキトウなのはご愛嬌。

lambda_function.py
# -*- coding: utf-8 -*-
import boto3
import json
import datetime
import calendar

#通知先SNSトピックのARN
TOPIC_ARN = "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:topic-name"
#抽出するログデータの最大件数
OUTPUT_LIMIT=5
#何分前までを抽出対象期間とするか
TIME_FROM_MIN=10

sns = boto3.client('sns')

def lambda_handler(event, context):
    message = json.loads(event['Records'][0]['Sns']['Message'])

    #SNSフォーマットの作成
    try:
        #CloudWatchのリージョン情報は message['Region'] に含まれる。
        #もしLambdaのリージョンと異なる場合は考慮が必要だが、ここでは同一リージョンを想定する。
        #(逆に言うと複数リージョンのCloudWatchAlarmを単一リージョンのLambdaが捌くことも可能)
        logs = boto3.client('logs')
        #logs = boto3.client('logs',region_name='xxxxxxxx') #←リージョンを指定する場合

        #MetricNameとNamespaceをキーにメトリクスフィルタの情報を取得する。
        metricfilters = logs.describe_metric_filters(
            metricName = message['Trigger']['MetricName'] ,
            metricNamespace = message['Trigger']['Namespace']
        )

        #ログストリームの抽出対象時刻をUNIXタイムに変換(取得期間は TIME_FROM_MIN 分前以降)
        #終了時刻はアラーム発生時刻の1分後
        timeto = datetime.datetime.strptime(message['StateChangeTime'][:19] ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1)
        u_to = calendar.timegm(timeto.utctimetuple()) * 1000
        #開始時刻は終了時刻のTIME_FROM_MIN分前
        timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN)
        u_from = calendar.timegm(timefrom.utctimetuple()) * 1000

        #ログストリームからログデータを取得
        response = logs.filter_log_events(
            logGroupName = metricfilters['metricFilters'][0]['logGroupName'] ,
            filterPattern = metricfilters['metricFilters'][0]['filterPattern'],
            startTime = u_from,
            endTime = u_to,
            limit = OUTPUT_LIMIT
        )

        #メッセージの整形
        log_message = u""
        for e in response['events']:
            #UNIX時刻をUTCへ変換後、日本時間に変更している
            date = datetime.datetime.fromtimestamp(int(str(e['timestamp'])[:10])) + datetime.timedelta(hours=9)
            log_message = log_message + '\n' + str(date) + ' : ' + e['message']

        #SNSのタイトル、本文整形
        title = message['NewStateValue'] + " : " + message['AlarmName']
        sns_message = message['AlarmDescription'] + '\n' + log_message

    except Exception as e:
        print(e)
        sns_message = message
        title = "error"

    #SNS Publish
    try:
        response = sns.publish(
            TopicArn = TOPIC_ARN,
            Message = sns_message,
            Subject = title
        )

    except Exception as e:
        print(e)
        raise e

参考文献

onooooo
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした