AWS
CloudWatch
lambda

Lambdaファンクションのエラーを通知したい(ファンクション名も込みで)

要件

  • Lambdaファンクションのエラーが発生したら関係者へ通知したい
  • どのファンクションがエラーになったのかを通知内容に含めたい
  • Lambdaファンクションは頻繁に追加したり削除したりするので、ファンクションごとに設定を追加/変更するのは避けたい
  • 既存のLambdaファンクションの設定はなるべくいじりたくない

対応案の検討

  1. CloudWatchのメトリクス(Lambda>全ての関数>エラー(Errors)) にアラームをセットしてSNS経由で通知
    • メリット:設定が最初の一回ですむ。ファンクションの追加や削除の影響を受けない
    • デメリット:どのファンクションがエラーになったかがわからない
  2. CloudWatchのメトリクス(Lambda>関数の名称別>[ファンクション名]>エラー(Errors))にアラームをセットしてSNS経由で通知
    • メリット:ファンクション名がわかる
    • デメリット:ファンクションごとに設定が必要。CloudWatch Alarmが増えるとコストもかかる(誤差だけど…)
  3. Lambdaのデッドレターキュー(DLQ)からSNSで通知
    • メリット:ファンクション名がわかる
    • デメリット
      • すでに別の用途でDLQを使っている場合の対応が難しい。
      • ファンクションごとに設定が必要。
      • Lambdaファンクションの実行ロールにDLQへのアクセス権が必要。
      • リトライが全て失敗した場合のみ検出可能。
      • DLQを使えるのはイベント駆動のファンクションのみ。

いずれも要件にピタリとフィットしてくれない。案1が最も簡単だが肝心のファンクション名が通知されないため採用できない。
案2は考え方はシンプルだがすでに大量のLambdaファンクションを抱えている状況では対応が面倒。ファンクション追加ごとにアラームの設定も必要になるため、設定作業漏れのリスクも。(サーバーレスアプリのデプロイ自動化に組み込めるなら別だが…。)
案3もなかなか魅力的。やりたいことはこちらで紹介されている→【新機能】AWS LambdaがDead Letter Queueをサポートしました だがデメリットを許容できない場合は採用できない。
つまり、「他にもっといい案は…?」となる。

もっといい(と思われる)案

CloudWatchのメトリクスデータ群から一発でエラーの発生しているファンクション名を取得できればいいのだが、サクっとはできそうにない。
仕方がないので以下の処理を実行するLambdaファンクションを作成する。

  1. Lambdaファンクションのリストを取得
  2. Lambdaファンクションごとに、CloudWatchのメトリクスをサーチして直近のエラー発生有無を確認
  3. 全ファンクションのサーチが終わったら結果をSNSへPublish

「CloudWatchのメトリクス(Lambda>全ての関数>エラー(Errors))」のメトリクスにセットしたアラームによって何らかのファンクションのエラー発生を検知し、これをトリガーにしてこのファンクションを起動させる。

1. Lambdaファンクションのリストを取得

まずは特定のリージョンに所属するLambdaのファンクション名をリストで取得する。

sample.py
#Lambdaのファンクション名のリストを返す。
def get_function_names(region_name):
    lmd = boto3.client('lambda',region_name=region_name)
    response = lmd.list_functions()
    function_names = [d['FunctionName'] for d in response['Functions']]
    return function_names

2. Lambdaファンクションごとに、CloudWatchのメトリクスをサーチして直近のエラー発生有無を確認

sample.py
    cloudwatch = boto3.client('cloudwatch',region_name = region)

    #FunctionNameごとに直近時間帯のメトリクスデータを取得
    errorpoints = []
    for function_name in function_names:
        datapoints = cloudwatch.get_metric_statistics(
            Namespace  = namespace,
            MetricName = metric,
            StartTime  = timefrom,
            EndTime    = timeto,
            Period     = 300,
            Dimensions = [function_name],
            Statistics = ['Sum']
        )
        #SUM > 0の場合はエラーが発生している。
        #エラー発生のメトリクスデータを抽出し、errorpointsリストに追加する。
        errorpoint = list(filter(lambda x: x['Sum'] > 0, datapoints['Datapoints']))
        if len(errorpoint) > 0:
            errorpoints.append({"FunctionName": function_name['Value'], "Datapoints": errorpoint})

    #Timestampの逆順(=新しい順)でソート
    sortedList = sorted(errorpoints, key=lambda x:x['Datapoints'][0]['Timestamp'],reverse=True)

function_names は前述の get_function_names 関数の戻り値で、Lambdaファンクション名のリスト。namespace,metric,timefrom,timetoはいずれもCloudWatchメトリクスを抽出するためのフィルタ条件だが、このLambdaファンクションがトリガーとして受信するSNSのメッセージ内容から導出できる。

結果としてsortedListには直近時間帯に発生したLambdaファンクション名とDatapointsが含まれる。Datapointsには

Datapoints.json
[
    {
        "FunctionName": "Test-LambdaFunction-Name",
        "Datapoints": [
            {
                "Timestamp": datetime.datetime(2017, 12, 20, 12, 56, tzinfo=tzlocal()),
                "Sum": 1.0,
                "Unit": "Count"
            }
        ]
    }
]

というふうに、ファンクション名のほかに時刻やメトリクスデータ(エラー発生件数)が含まれる。

3. 全ファンクションのサーチが終わったら結果をSNSへPublish

ここは好みで実装すれば良いと思うが、SNSのトピックARN、タイトル、メッセージ本文をセットしてSNS Publishを実行する。

sample.py
    #件名:直近でエラーになったFunctionNameを件名に含める
    title = alarm_name + " : " + new_state + " : " + sortedList[0]['FunctionName']

    #本文:直近時間帯でエラーになったすべてのFunctionNameとエラー件数を本文に含める
    message = description+ "\n\n"
    for e in sortedList:
        #タイムスタンプは日本時間に変換する
        date = e['Datapoints'][0]['Timestamp'] + datetime.timedelta(hours=9)
        message = message + '\n' + str(date)[:19] + " : " + str(int(e['Datapoints'][0]['Sum'])) + e['Datapoints'][0]['Unit'] + ' : ' + e['FunctionName']

    #SNS Publish
    try:
        response = sns.publish(
            TopicArn = topicarn,
            Message  = message,
            Subject  = title
        )

    except Exception as e:
        print(e)
        raise e

最終的な構成

CloudWatch Alarm -> SNS -> Lambda -> SNS
CloudWatch Alarmは「CloudWatchのメトリクス(Lambda>全ての関数>エラー(Errors))」のメトリクスに紐付け、何らかのLambdaファンクションでエラーが発生したら発動する。一つ目のSNSはLambdaファンクションを呼び出すためのもの。二つ目のSNSはCloudWatchメトリクスから直近時間帯にエラーになったLambdaファンクションのメトリクスを抽出し、いい感じに整形した文面を関係者へ通知するためのもの。

スクリプト

lambda_function.py
# -*- coding: utf-8 -*-
import boto3
import json
import datetime

sts = boto3.client('sts')
sns = boto3.client('sns')

#SNSトピックのARN
topicarn = "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:topicname"

#何分前からのメトリクスを検索対象とするか
TIME_FROM_MIN=10

#Lambdaファンクションのファンクション名のリストを返す。
def get_function_names(region_name):
    lmd = boto3.client('lambda',region_name=region_name)
    response = lmd.list_functions()
    function_names = [d['FunctionName'] for d in response['Functions']]
    return function_names


def lambda_handler(event, context):
    message = event['Records'][0]['Sns']['Message']
    print("SNS Message: " + message)
    message = json.loads(message)

    alarm_name  = message['AlarmName']             #任意に指定したアラーム名
    new_state   = message['NewStateValue']         #普通は"ALARM"
    description = message['AlarmDescription']      #"Lambdaファンクションでエラーが発生しました"など、任意に指定したアラームの説明文
    metric      = message['Trigger']['MetricName'] #"Errors"
    namespace   = message['Trigger']['Namespace']  #"AWS/Lambda"

    #CloudWatchのリージョン判定
    #どのリージョンでエラーが発生したかを判定する。ここではus-east-1 or ap-northeast-1のみを判定しているが、実際の稼働環境にあわせて変更すること。
    region = 'us-east-1' if "Virginia" in message['Region'] else "ap-northeast-1"
    cloudwatch = boto3.client('cloudwatch',region_name = region)

    #Lambdaファンクション名のリストを取得する
    function_names = get_function_names(region)

    #TIME_FROM_MIN分前からStateChangeTimeの一分後までのメトリクスを対象とする
    timeto = datetime.datetime.strptime(message['StateChangeTime'][:19] ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1)
    timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN)

    #FunctionNameごとに直近時間帯のメトリクスデータを取得
    errorpoints = []
    for function_name in function_names:
        datapoints = cloudwatch.get_metric_statistics(
            Namespace  = namespace,
            MetricName = metric,
            StartTime  = timefrom,
            EndTime    = timeto,
            Period     = 300,
            Dimensions = [
                {
                    'Name': 'FunctionName',
                    'Value': function_name
                }
            ],
            Statistics = ['Sum']
        )
        #SUM > 0の場合はエラーが発生している。
        #エラー発生のメトリクスデータを抽出し、errorpointsリストに追加する。
        errorpoint = list(filter(lambda x: x['Sum'] > 0, datapoints['Datapoints']))
        if len(errorpoint) > 0:
            errorpoints.append({"FunctionName": function_name, "Datapoints": errorpoint})

    #Timestampの逆順で(=新しい順に)ソート
    sortedList = sorted(errorpoints, key=lambda x:x['Datapoints'][0]['Timestamp'],reverse=True)

    #件名:直近でエラーになったFunctionNameを件名に含める
    title = alarm_name + " : " + new_state + " : " + sortedList[0]['FunctionName']

    #本文:直近時間帯でエラーになったすべてのFunctionNameとエラー件数を本文に含める
    message = description + "\n\n"
    for e in sortedList:
        #タイムスタンプは日本時刻に変換する
        date = e['Datapoints'][0]['Timestamp'] + datetime.timedelta(hours=9)
        message = message + '\n' + str(date)[:19] + " : " + str(int(e['Datapoints'][0]['Sum'])) + e['Datapoints'][0]['Unit'] + ' : ' + e['FunctionName']

    #SNS Publish
    try:
        response = sns.publish(
            TopicArn = topicarn,
            Message = message,
            Subject = title
        )

    except Exception as e:
        print(e)
        raise e

補足

  • Lambdaファンクションの数だけループを回してCloudWatchメトリクスをサーチしている。ファンクション数が10や20くらいなら問題ないが、100や200になってくるとどうなるか不明。
  • このファンクション自体がバグって正常に動作しない場合を考慮して、このファンクションだけはCloudWatchメトリクスのLambda>関数の名称別>[このファンクションの名前]>エラー(Errors)にアラームを紐付けてSNS→メールといった通知を確保しておくと安心。

参考