2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ログの中身に応じたメール通知する設定をLambdaとSNSで実装してみた

Posted at

はじめに

お客さんより、
”所定のファイルにログがたまっていくから、そのログに応じてメール通知をしてほしい”というご依頼がありました。

AWSのサービスを複数組み合わせて、最終的にメール通知する仕組みを実装してみました。

概要

お客さんからの希望

・ログファイルの出力内容の応じて、メール通知を実施してほしい
・出力先:「Windows server2019」 C:\管理\Log\errorlogs.txt
・メール通知条件:「ERROR」「WARN」という文字列が出力されたとき
以下出力されるログのフォーマットです

YYYY/MM/DD Time ログの種類 拠点 機能: メッセージ

以下希望のメールフォーマットです

件名:$ログの種類(SystemA)
本文:
■拠点:$拠点名

■機能:$機能

■本文:$メッセージ

※ログの種類は「ERROR」「WARN」「INFO」の3種類あります

処理の流れ

処理の流れとしては以下のようになります

  • CloudWatchAgentの設定で、対象のログファイルのログをCloudWatchで監視できるようにする
  • 対象の文字列が出力された場合、そのLambdaに連携する
  • Lamdbda内で、取得した情報を成形し、SNSトピックに渡す
  • SNSトピックのサブスクライブエンドポイントにメールを送信する

順番に説明します

CloudWatchAgentの設定

まずはSSMを使用して、対象のインスタンスにCloudWatchAgentをインストールします
ざっくり
①必要なポリシーをEC2にアタッチ
➁SSMから、RunCommandを用いて対象のインスタンスにCloudWatchAgentをインストール
③CloudWatchAgentの設定で、監視するログファイルの指定

という手順を実施します
詳細は以下のリンクをご参考ください

ここでは、
ロググループ名:TestLog
ログストリーム名:$InstanceId

で指定します。
問題なく設定できていれば、AWS CloudWatchコンソール上から、ロググループとログストリームが確認できます。
※対象のログファイルはCloudwatchの仕様で文字コードの指定があります。今回はUTF-8を使用しました。

image.png

Lambdaの作成

ロググループとログストリームが確認できれば、次は
フィルタの設定し、設定したフィルタの条件に一致した場合、Lambdaを実行する。
という設定をします

なので、先にLambdaを作成します

以下がコードの全貌です。
※この後細かく解説します

※あとでやること
①コードの全体の修正
➁data_jsonの意味が分かりにくいので、data_pythonに変える??

import base64
import json
import zlib
import os
import boto3
from botocore.exceptions import ClientError
import re

print('Loading function')

def lambda_handler(event, context):
    data_json = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16+zlib.MAX_WBITS)
    data_python = json.loads(data_json)
    log_entire = data_python["logEvents"]
    hostname = data_python['logGroup']
    logname = data_python['logStream']

    print(log_entire)

    for log_python in log_entire:  
        
        # 配列の要素をそれぞれの変数に格納
        text = log_python['message']
        words = re.split(r'\s+', text)

        date, time, level, branchoffice, functionality, *messagecontent = words
        messagecontent = " ".join(messagecontent)
        
        # 件名整形
        subjectmsg = level + "(GRAD)" + "伝送処理障害検知"
        
        # 本文整形
        messagebranch = f"■拠点:{branchoffice}"
        messagefunction = f"■機能:{functionality}"
        content = f"■本文:{messagecontent}"
        msg = f"{messagebranch}\n\n{messagefunction}\n\n{content}"

        try:
            sns = boto3.client('sns')
    
            #SNS Publish
            publishResponse = sns.publish(
                TopicArn = os.environ['SNS_TOPIC_ARN'],
                Message = msg,
                Subject = subjectmsg
            )
    
        except Exception as e:
            print(e)
1)サブスクリプションフィルターから渡される情報を整形する
def lambda_handler(event, context):
    data_json = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16+zlib.MAX_WBITS)
    data_python = json.loads(data_json)
    log_entire = data_python["logEvents"]

まずはこちらのコードについて確認します
そもそもCloudWatchLogsから渡される情報は、base64でエンコードされ、かつ圧縮されています
具体的には以下のようになっています

{
  "awslogs": {
    "data": "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0b・・・省略
  }
}

そのため、まずはデコードから開始します
デコードと解凍はこちらのコードで実施されています

    data_json = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16+zlib.MAX_WBITS)

まずzlib.decompressとはPythonの標準ライブラリです。最初にimportしていましたね。
こちらは第一引数に解凍するデータを、第二引数には解凍方法を指定します。

解凍するデータ:base64.b64decode(event['awslogs']['data']
解凍方法:16+zlib.MAX_WBITS

ということですね
ちなみにbase64.b64decode()は、Base64エンコードされたデータをデコードするためのPythonの標準ライブラリ関数です。
これをすることで、バイナリデータから人間が読むことができるJSON文字列へと変換がされます。
その後はjson.loads(data_json)でJSON文字列をPythonオブジェクトに変換しています。json.loads()も、Pythonの標準ライブラリで、前もってインポートしてありますね。

実際のdata_pythonの出力結果がこちらです。

{
    'messageType': 'DATA_MESSAGE',
    'owner': 'アカウントID',
    'logGroup': 'ロググループ名',
    'logStream': 'ログストリーム名',
    'subscriptionFilters': ['サブスクリプションフィルター名'],
    'logEvents': [
        {
            'id': '37486647184891724155561375747897557404224833900580241408',
            'timestamp': 1680959396252,
            'message': '2023/3/31 10:54:40 ERROR 大阪 DB書き込み: エラーが発生しました。###Data:example test sample'
        }
    ]
}

この中から必要な情報を適宜取得し、通知に盛り込んでいきます。

log_entire = data_python["logEvents"]

こちらのコードで、先ほどのdata_jsonの中からlogEventsのみをlog_entire_jsonという変数に格納します。今回はたまたまlogEventsリストの中は要素が一つしかありませんが、要素が複数ある場合も想定して、ループを使用して、後続の処理を継続します。

2)Python文字列から必要な情報を取り出す

まずはfor分の前半から確認していきます。

    for log_python in log_entire:  
        
        # 配列の要素をそれぞれの変数に格納
        text = log_python['message']
        words = re.split(r'\s+', text)

        date, time, level, branchoffice, functionality, *messagecontent = words
        messagecontent = " ".join(messagecontent)
        
        # 件名整形
        subjectmsg = level + "(GRAD)" + "伝送処理障害検知"
        
        # 本文整形
        messagebranch = f"■拠点:{branchoffice}"
        messagefunction = f"■機能:{functionality}"
        content = f"■本文:{messagecontent}"
        msg = f"{messagebranch}\n\n{messagefunction}\n\n{content}"

log_entireの中には、以下のような値が格納されてます。

 'logEvents': [
        {
            'id': '37486647184891724155561375747897557404224833900580241408',
            'timestamp': 1680959396252,
            'message': '2023/3/31 10:54:40 ERROR 大阪 DB書き込み: エラーが発生しました。###Data:example test sample'
        }
    ]

今回のメール通知で必要な情報はすべて、「message」の中にスペース区切りで含まれています。
そのため、以下のコードで必要な情報に分けていきます。

text = log_python['message']
words = re.split(r'\s+', text)
date, time, level, branchoffice, functionality, *messagecontent = words
messagecontent = " ".join(messagecontent)      

まず、「message」を「text」という変数に格納します。
次に、re.split関数で、「text」を分割していきます。reは冒頭でインポートしたものの一つでしたね。

①re.split() 関数は、指定した正規表現パターンに一致する部分で文字列を分割し、その結果の部分文字列のリストを返します。この場合、「text」 から空白文字を使って単語を抽出し、それらの単語をリスト words に格納しています。

➁次に「words」の構成要素を決めていきます。「messagecontent*」とすることで、「words」 の残りの要素を 「messagecontent」 という名前のリストにまとめて割り当てます。

③最後に、messagecontent リスト内の文字列要素を、スペース " " を区切り文字としてまとめています。

上記の処理で、「text」から必要な情報をスペース区切りで分割して、それぞれ異なる変数に格納することができました。

3)SNS通知を実施する

まずは、メール本文の内容に整えていきます。

# 件名整形
subjectmsg = level + "(GRAD)" + "伝送処理障害検知"
        
# 本文整形
messagebranch = f"■拠点:{branchoffice}"
messagefunction = f"■機能:{functionality}"
content = f"■本文:{messagecontent}"
msg = f"{messagebranch}\n\n{messagefunction}\n\n{content}"

その後、以下のコードでSNS通知をする処理を書いていきます。

try:
    sns = boto3.client('sns')
        #SNS Publish
    publishResponse = sns.publish(
         TopicArn = os.environ['SNS_TOPIC_ARN'],
         Message = msg,
         Subject = subjectmsg
            )

まず最初に、boto3.client()を使って、Amazon SNSクライアントを呼び出します。
その後、作成した SNS クライアントの publish メソッドを呼び出して、通知を送信します。このメソッドには、次の引数が渡されます。
・TopicArn
→Amazon SNS トピックのARN(Amazonリソースネーム)。あらかじめ仕込んでおいた環境変数 SNS_TOPIC_ARN から取得されます。
・Message
→通知に含めるメッセージ。変数 msg に格納されています。
・Subject
→通知の件名。変数 subjectmsg に格納されています。

ここまでで、Lambdaの解説は終了となります。
テストをする際には、Lambdaに用意されているテストではなく、CloudWatchLogs上で新しくイベントを作成する必要があります。

サブスクリプションフィルターの作成

Lambdaも完成したので、次はCloudwatchLogで、特定の文字列を検知した場合、Lambdaを実行する。
という設定をしていきます。

「サブスクリプションフィルターの作成」→「Lambdaサブスクリプションフィルターの作成」
で設定可能です。

image.png

ここでは
①実行するLambda
➁フィルターする文字列の指定

を設定してきます。
以下のような画面で設定できます。

image.png

image.png

はい、こちらでLambdaを実行する流れが設定できました。

実際にCloudwatchLogsにログファイルにたまるログと同じイベントを作成すると、メールが確認出来ました。

image.png

おわりに

実装する際に、文字コードの不一致や、スペースの区切り方で詰まったので、備忘用にまとめてみました。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?