アドカレ案内
- ← 10日目 @taniyam さん BigQueryで配列をヒストグラム用のデータに加工するUDFなど
- → 12日目 @saitotakashi さん GASでChatworkを自由自在に操る方法
ピカピカの新卒1年目のuetashです。
ちょっとAWSをさわる機会があり、ググっても日本語でヒットしなかったりしたので、この機会にナレッジを共有したいと思います。
tl; dr
サブスクリプションフィルタを使いましょう。 以上!
あとこの記事最高です: https://dev.classmethod.jp/cloud/aws/notify-error-cloudwatch-logs-with-lambda/
背景・問題
あるとき、CloudWatch-Logsに特定のログが来た場合に、その内容をSlackに通知をしたいということがありました。
この際、CloudWatch-Logsにメトリクスフィルタを用意して、CloudWatch Alarm経由でそのイベントが起きたタイミングで、Lambdaを発火させる用にすることを考えていました。
このとき「Lambda上でログの内容を取得したい」ということを追加で行う必要があることがわかりました。
となると、Alarm経由では「特定のログが来たこと」は分かるのですが、「特定のログの内容」を知るためには、いい感じにさかのぼってあげる必要があるようでした。
(こちらの記事を参考にしていました https://qiita.com/onooooo/items/f59c69e30dc5b477f9fd )
もっと簡単にできればなぁ、とぼんやり考えているところに天啓を受けました。
神(サブスクリプションフィルタ…サブスクリプションフィルタを使うのです…………)
サブスクリプションフィルタとは
公式にサイトによると
サブスクリプションを使用して CloudWatch Logs からのログイベントのリアルタイムフィードにアクセスし、カスタム処理、分析、他のシステムへのロードを行うために、Amazon Kinesis ストリーム、Amazon Kinesis Data Firehose ストリーム、AWS Lambda などの他のサービスに配信することができます。ログイベントのサブスクリプションを開始するには、Kinesis ストリームなど、イベントを配信する宛先ソースを作成します。サブスクリプションフィルタは、AWS リソースに配信されるログイベントのフィルタリングに使用するフィルタパターンと、一致するログイベントの送信先に関する情報を定義します。
とのこと。なんとなくリアルタイム処理ができそうなことは伺えました。
実際にCloudWatch Logsを見に行くとすごーーーーく、わかりにくいところにサブスクリプションフィルタを設定できる場所がありました。引用ですが図を乗せておきます。ロググループのアクションのところを開くとCloudWatch Logsの上の方のところににありました。
引用元: https://dev.classmethod.jp/cloud/aws/notify-error-cloudwatch-logs-with-lambda/
設定する
次のような世界を実現したいと思います。
注意ですが、サブスクリプションフィルタはロググループに対して1つのみしか設定できないため、複数設定したいという場合は、ロググループをそもそも分けるか、Lambda側で判定したり(これはあまりかっこよくないですね)が必要になると思います。
リソース デフォルトの制限 .. ... サブスクリプションフィルター 1 ロググループあたり 1。この制限は変更できません。 引用元: https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
Filter
今回は例としてErrorのログが流れてきた場合通知するみたいなものを考えようと思います。
そのためFilterは単に"ERRORと"
します。
なんども同じ記事から引用するのは忍びないですが、分かりやすいには越したことがないので引用します。
引用元: https://dev.classmethod.jp/cloud/aws/notify-error-cloudwatch-logs-with-lambda/
またフィルタパターンはメトリクスフィルタを設定するときと同じ文法が使えるようです。
Lambda
Lambda側ではフィルタに引っかかったログの内容をそのまま利用する事ができます。
当初の設計ではログを遡る必要がありましたが、その際の懸念などもなくなりますね。
雰囲気だけ伝えるためにpython3でのLambdaのやつを書いておきます。
# -*- coding: utf-8 -*-
import base64
import zlib
import io
import json
import os
import urllib.request
def lambda_handler(event, context):
ids = parse_log_text(extract_event(event))
send_to_slack(make_messages_blocks([
'次のidで書き込みエラーが発生しています!',
'ids: ' +','.join(messages),
]))
# サブスクリプションフィルタから送られてくるeventはエンコードされているためいい感じにデコードしてあげる必要がある
def extract_event(event):
decoded_data = zlib.decompress(
base64.b64decode(event['awslogs']['data']),
16+zlib.MAX_WBITS
)
json_data = json.loads(decoded_data) # これでdecode終わり
return json_data['logEvents'][0]['message'] # 今回はログの内容を指定して返す
def parse_log_text(log_text):
# 今回の例では次のようなエラーがきたことを考える
# log_text = '[ERROR] 次のIDがなんらかのエラーでDBに書き込まれせんでした#1,123,234'
ids = log_text.split('#')[1].split(',')
return ids
def make_messages_blocks(messages):
return list(map(lambda m: {
"type": "section",
"text": {
"type": "mrkdwn",
"text": m
}
}, messages))
def send_to_slack(blocks):
send_data = {
"attachments": [
{
"color": "#000000",
"blocks": blocks
}
]
}
slack_text = "payload=" + json.dumps(send_data)
request = urllib.request.Request(
os.environ['WebhookURL'],
data = slack_text.encode("UTF-8"),
method = "POST"
)
with urllib.request.urlopen(request) as response:
response_body = response.read().decode('utf-8')
今回の例でいくとSlackに通知をおこなうやつですね。結構そのまんま使えるので有用だと思います!
まとめ
サブスクリプションフィルタすごい!
実は今回サブスクリプションフィルタの存在を教えていただいたのは @ikeisuke さんだったりしました。社内からこういったことをフランクに教えてもらえて最高ですね。(その節はありがとうございます:) )この実装を通して、イベント駆動というかストリーム処理というか、そういった考え方みたいなことをこれを通して学ぶことができたような気がします。
次は @saitotakashi さん の「GASでChatworkを自由自在に操る方法」とのことで明日も楽しみですね(dance)