はじめに
お客さんより、
”所定のファイルにログがたまっていくから、そのログに応じてメール通知をしてほしい”というご依頼がありました。
AWSのサービスを複数組み合わせて、最終的にメール通知する仕組みを実装してみました。
概要
お客さんからの希望
・ログファイルの出力内容の応じて、メール通知を実施してほしい
・出力先:「Windows server2019」 C:\管理\Log\errorlogs.txt
・メール通知条件:「ERROR」「WARN」という文字列が出力されたとき
以下出力されるログのフォーマットです
YYYY/MM/DD Time ログの種類 拠点 機能: メッセージ
以下希望のメールフォーマットです
件名:$ログの種類(SystemA)
本文:
■拠点:$拠点名
■機能:$機能
■本文:$メッセージ
※ログの種類は「ERROR」「WARN」「INFO」の3種類あります
処理の流れ
処理の流れとしては以下のようになります
- CloudWatchAgentの設定で、対象のログファイルのログをCloudWatchで監視できるようにする
- 対象の文字列が出力された場合、そのLambdaに連携する
- Lamdbda内で、取得した情報を成形し、SNSトピックに渡す
- SNSトピックのサブスクライブエンドポイントにメールを送信する
順番に説明します
CloudWatchAgentの設定
まずはSSMを使用して、対象のインスタンスにCloudWatchAgentをインストールします
ざっくり
①必要なポリシーをEC2にアタッチ
➁SSMから、RunCommandを用いて対象のインスタンスにCloudWatchAgentをインストール
③CloudWatchAgentの設定で、監視するログファイルの指定
という手順を実施します
詳細は以下のリンクをご参考ください
ここでは、
ロググループ名:TestLog
ログストリーム名:$InstanceId
で指定します。
問題なく設定できていれば、AWS CloudWatchコンソール上から、ロググループとログストリームが確認できます。
※対象のログファイルはCloudwatchの仕様で文字コードの指定があります。今回はUTF-8を使用しました。
Lambdaの作成
ロググループとログストリームが確認できれば、次は
フィルタの設定し、設定したフィルタの条件に一致した場合、Lambdaを実行する。
という設定をします
なので、先にLambdaを作成します
以下がコードの全貌です。
※この後細かく解説します
※あとでやること
①コードの全体の修正
➁data_jsonの意味が分かりにくいので、data_pythonに変える??
import base64
import json
import zlib
import os
import boto3
from botocore.exceptions import ClientError
import re
print('Loading function')
def lambda_handler(event, context):
data_json = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16+zlib.MAX_WBITS)
data_python = json.loads(data_json)
log_entire = data_python["logEvents"]
hostname = data_python['logGroup']
logname = data_python['logStream']
print(log_entire)
for log_python in log_entire:
# 配列の要素をそれぞれの変数に格納
text = log_python['message']
words = re.split(r'\s+', text)
date, time, level, branchoffice, functionality, *messagecontent = words
messagecontent = " ".join(messagecontent)
# 件名整形
subjectmsg = level + "(GRAD)" + "伝送処理障害検知"
# 本文整形
messagebranch = f"■拠点:{branchoffice}"
messagefunction = f"■機能:{functionality}"
content = f"■本文:{messagecontent}"
msg = f"{messagebranch}\n\n{messagefunction}\n\n{content}"
try:
sns = boto3.client('sns')
#SNS Publish
publishResponse = sns.publish(
TopicArn = os.environ['SNS_TOPIC_ARN'],
Message = msg,
Subject = subjectmsg
)
except Exception as e:
print(e)
1)サブスクリプションフィルターから渡される情報を整形する
def lambda_handler(event, context):
data_json = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16+zlib.MAX_WBITS)
data_python = json.loads(data_json)
log_entire = data_python["logEvents"]
まずはこちらのコードについて確認します
そもそもCloudWatchLogsから渡される情報は、base64でエンコードされ、かつ圧縮されています
具体的には以下のようになっています
{
"awslogs": {
"data": "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0b・・・省略
}
}
そのため、まずはデコードから開始します
デコードと解凍はこちらのコードで実施されています
data_json = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16+zlib.MAX_WBITS)
まずzlib.decompressとはPythonの標準ライブラリです。最初にimportしていましたね。
こちらは第一引数に解凍するデータを、第二引数には解凍方法を指定します。
解凍するデータ:base64.b64decode(event['awslogs']['data']
解凍方法:16+zlib.MAX_WBITS
ということですね
ちなみにbase64.b64decode()は、Base64エンコードされたデータをデコードするためのPythonの標準ライブラリ関数です。
これをすることで、バイナリデータから人間が読むことができるJSON文字列へと変換がされます。
その後はjson.loads(data_json)でJSON文字列をPythonオブジェクトに変換しています。json.loads()も、Pythonの標準ライブラリで、前もってインポートしてありますね。
実際のdata_pythonの出力結果がこちらです。
{
'messageType': 'DATA_MESSAGE',
'owner': 'アカウントID',
'logGroup': 'ロググループ名',
'logStream': 'ログストリーム名',
'subscriptionFilters': ['サブスクリプションフィルター名'],
'logEvents': [
{
'id': '37486647184891724155561375747897557404224833900580241408',
'timestamp': 1680959396252,
'message': '2023/3/31 10:54:40 ERROR 大阪 DB書き込み: エラーが発生しました。###Data:example test sample'
}
]
}
この中から必要な情報を適宜取得し、通知に盛り込んでいきます。
log_entire = data_python["logEvents"]
こちらのコードで、先ほどのdata_jsonの中からlogEventsのみをlog_entire_jsonという変数に格納します。今回はたまたまlogEventsリストの中は要素が一つしかありませんが、要素が複数ある場合も想定して、ループを使用して、後続の処理を継続します。
2)Python文字列から必要な情報を取り出す
まずはfor分の前半から確認していきます。
for log_python in log_entire:
# 配列の要素をそれぞれの変数に格納
text = log_python['message']
words = re.split(r'\s+', text)
date, time, level, branchoffice, functionality, *messagecontent = words
messagecontent = " ".join(messagecontent)
# 件名整形
subjectmsg = level + "(GRAD)" + "伝送処理障害検知"
# 本文整形
messagebranch = f"■拠点:{branchoffice}"
messagefunction = f"■機能:{functionality}"
content = f"■本文:{messagecontent}"
msg = f"{messagebranch}\n\n{messagefunction}\n\n{content}"
log_entireの中には、以下のような値が格納されてます。
'logEvents': [
{
'id': '37486647184891724155561375747897557404224833900580241408',
'timestamp': 1680959396252,
'message': '2023/3/31 10:54:40 ERROR 大阪 DB書き込み: エラーが発生しました。###Data:example test sample'
}
]
今回のメール通知で必要な情報はすべて、「message」の中にスペース区切りで含まれています。
そのため、以下のコードで必要な情報に分けていきます。
text = log_python['message']
words = re.split(r'\s+', text)
date, time, level, branchoffice, functionality, *messagecontent = words
messagecontent = " ".join(messagecontent)
まず、「message」を「text」という変数に格納します。
次に、re.split関数で、「text」を分割していきます。reは冒頭でインポートしたものの一つでしたね。
①re.split() 関数は、指定した正規表現パターンに一致する部分で文字列を分割し、その結果の部分文字列のリストを返します。この場合、「text」 から空白文字を使って単語を抽出し、それらの単語をリスト words に格納しています。
➁次に「words」の構成要素を決めていきます。「messagecontent*」とすることで、「words」 の残りの要素を 「messagecontent」 という名前のリストにまとめて割り当てます。
③最後に、messagecontent リスト内の文字列要素を、スペース " " を区切り文字としてまとめています。
上記の処理で、「text」から必要な情報をスペース区切りで分割して、それぞれ異なる変数に格納することができました。
3)SNS通知を実施する
まずは、メール本文の内容に整えていきます。
# 件名整形
subjectmsg = level + "(GRAD)" + "伝送処理障害検知"
# 本文整形
messagebranch = f"■拠点:{branchoffice}"
messagefunction = f"■機能:{functionality}"
content = f"■本文:{messagecontent}"
msg = f"{messagebranch}\n\n{messagefunction}\n\n{content}"
その後、以下のコードでSNS通知をする処理を書いていきます。
try:
sns = boto3.client('sns')
#SNS Publish
publishResponse = sns.publish(
TopicArn = os.environ['SNS_TOPIC_ARN'],
Message = msg,
Subject = subjectmsg
)
まず最初に、boto3.client()を使って、Amazon SNSクライアントを呼び出します。
その後、作成した SNS クライアントの publish メソッドを呼び出して、通知を送信します。このメソッドには、次の引数が渡されます。
・TopicArn
→Amazon SNS トピックのARN(Amazonリソースネーム)。あらかじめ仕込んでおいた環境変数 SNS_TOPIC_ARN から取得されます。
・Message
→通知に含めるメッセージ。変数 msg に格納されています。
・Subject
→通知の件名。変数 subjectmsg に格納されています。
ここまでで、Lambdaの解説は終了となります。
テストをする際には、Lambdaに用意されているテストではなく、CloudWatchLogs上で新しくイベントを作成する必要があります。
サブスクリプションフィルターの作成
Lambdaも完成したので、次はCloudwatchLogで、特定の文字列を検知した場合、Lambdaを実行する。
という設定をしていきます。
「サブスクリプションフィルターの作成」→「Lambdaサブスクリプションフィルターの作成」
で設定可能です。
ここでは
①実行するLambda
➁フィルターする文字列の指定
を設定してきます。
以下のような画面で設定できます。
はい、こちらでLambdaを実行する流れが設定できました。
実際にCloudwatchLogsにログファイルにたまるログと同じイベントを作成すると、メールが確認出来ました。
おわりに
実装する際に、文字コードの不一致や、スペースの区切り方で詰まったので、備忘用にまとめてみました。