目的
CloudWatch Logsにエラーログが出力されたときに、サブスクリプションフィルターを使って最終的にSlackへ通知したいです。
サブスクリプションフィルターはLambdaにつなげて、データをSQSに保存します。
EventBridgeスケジューラーで10分ごとに別のLambdaを起動してSQSからデータを回収して、集計して、Slackに通知します。
ログは以下のようになります。levelがERRORとWARNのものを集めます。error_codeで集計をします。
{
"timestamp": "2023-08-18T07:48:33.563814Z",
"level": "ERROR",
"fields": {
"error_code": "parse error",
"message": "Error(\"expected `,` or `}`\", line: 7, column: 5)"
}
}
Slackに通知されるメッセージ
parse error: 1
コード
サブスクリプションフィルターからデータを受ける
サブスクリプションフィルターから送られてくるデータは、GZIPで圧縮されたうえでBase64エンコードされています。
これをBase64デコードしてGZIPを展開し、ログイベントごとにSQSへ送信します。
このコードはLambdaのデフォルトのgemで動作します。
require 'json'
require 'base64'
require 'zlib'
require 'aws-sdk-sqs'
# Lambda entry point for the CloudWatch Logs subscription filter.
#
# CloudWatch Logs delivers the matched log events as a gzip-compressed,
# Base64-encoded JSON document in event["awslogs"]["data"]. This handler
# decodes it and forwards every log event, serialized back to JSON, as its
# own message to the SQS queue named by the LOGS_QUEUE environment variable.
#
# @param event [Hash] subscription filter payload ({"awslogs" => {"data" => ...}})
# @param context [Object] Lambda context (unused)
# @return [Array<Hash>] the forwarded log events ([] for a control message)
def lambda_handler(event:, context:)
  # Use the region the function runs in (AWS_REGION is set by the Lambda
  # runtime) so the queue is found wherever the stack is deployed. The client
  # and queue URL are kept across warm invocations to skip GetQueueUrl calls.
  @sqs ||= Aws::SQS::Client.new(region: ENV.fetch('AWS_REGION', 'us-east-1'))
  @queue_url ||= @sqs.get_queue_url(queue_name: ENV['LOGS_QUEUE']).queue_url

  # Zlib.gunzip inflates a whole gzip string; no StringIO wrapper needed.
  payload = JSON.parse(Zlib.gunzip(Base64.decode64(event['awslogs']['data'])))

  # CloudWatch Logs sends a CONTROL_MESSAGE (a reachability check, not a log
  # line) when the subscription is set up; only DATA_MESSAGE carries events.
  return [] unless payload['messageType'] == 'DATA_MESSAGE'

  log_events = payload['logEvents'] || []
  log_events.each do |log_event|
    @sqs.send_message(queue_url: @queue_url, message_body: log_event.to_json)
  end
end
SQSから取得してSlackに通知する
SQSからメッセージを受信する際、キュー内のメッセージ数が少ないと、1回の受信ですべてのメッセージが返ってこないことがあるようです（待ち時間0秒のショートポーリングでは、SQSが一部のサーバーしか参照しないため）。
ここでは最初の1回だけ最大5秒待って受信し、以降は待ち時間0秒で、メッセージが取得できなくなるまで受信を繰り返します。
ログはerror_codeで集計してerror_codeごとに発生した数を数えてSlackに通知します。
このコードはLambdaのデフォルトのgemで動作します。
require 'json'
require 'base64'
require 'zlib'
require 'aws-sdk-sqs'
require 'net/http'
require 'uri'
# SQSのメッセージを削除する
# Deletes a batch of already-processed messages from the queue.
#
# @param entries [Array<Hash>] hashes with :id and :receipt_handle (as built
#   by retrive_message); the caller passes at most one receive batch (<= 10)
# @return [Object, nil] the DeleteMessageBatch response, or nil when entries is empty
# @raise re-raises any DeleteMessageBatch error after logging it
def execute_delete(entries)
  return if entries.empty?

  begin
    response = @sqs.delete_message_batch(queue_url: @queue_url, entries: entries)
  rescue StandardError
    puts "fail: delete_message_batch"
    raise
  end
  # DeleteMessageBatch reports per-entry failures in the response instead of
  # raising. Such messages become visible again after the visibility timeout
  # and would be counted a second time, so at least surface them in the logs.
  Array(response.failed).each do |failure|
    puts "fail: delete_message id=#{failure.id} code=#{failure.code} message=#{failure.message}"
  end
  response
end
# SQSのメッセージからエラーの文言を回収する
# Tallies one SQS message into map and returns the entry needed to delete it.
#
# The message body is a JSON log event whose "message" field is itself a JSON
# log line; the line's fields.error_code (falling back to fields.message) is
# the key whose count in map is incremented. Messages that cannot be parsed
# are dumped with pp and otherwise skipped (best effort), but their delete
# entry is still returned so they leave the queue.
#
# @param message [Object] an SQS message (message_id, receipt_handle, body)
# @param map [Hash] error key => count accumulator, mutated in place
# @return [Hash] {id:, receipt_handle:} for DeleteMessageBatch
def retrive_message(message, map)
  delete_entry = { id: message.message_id, receipt_handle: message.receipt_handle }
  begin
    log_event = JSON.parse(message.body)
    fields = JSON.parse(log_event["message"])["fields"]
    key = fields["error_code"] || fields["message"]
    map[key] = (map[key] || 0) + 1
  rescue StandardError
    # Not the expected shape: print it for debugging and move on.
    pp message
  end
  delete_entry
end
# SQSからメッセージを取得
# Receives one batch of messages, tallies them into map and deletes them.
#
# @param map [Hash] error key => count accumulator, mutated in place
# @param wait_time_seconds [Integer] ReceiveMessage wait; 0 means short polling
# @return [Boolean] false when the receive returned no messages, true otherwise
# @raise re-raises any receive/delete error after logging it
def execute_receive(map, wait_time_seconds)
  request = {
    queue_url: @queue_url,
    message_attribute_names: ["All"],
    max_number_of_messages: 10, # the largest batch ReceiveMessage allows
    wait_time_seconds: wait_time_seconds
  }
  begin
    messages = @sqs.receive_message(request).messages || []
    # An empty batch tells the caller to stop polling.
    return false if messages.empty?

    execute_delete(messages.map { |msg| retrive_message(msg, map) })
  rescue StandardError
    puts "fail: receive_message"
    raise
  end
  true
end
# エラーマップからメッセージを作成
# Renders the error counts as Slack message text.
#
# @param map [Hash] error key => occurrence count
# @return [String] one "key: count" line per entry, in insertion order,
#   joined with "\n" ("" for an empty map)
def make_message(map)
  lines = []
  map.each_pair { |key, count| lines << "#{key}: #{count}" }
  lines.join("\n")
end
# スラックに送信
# Posts the aggregated error counts to Slack with chat.postMessage.
# Does nothing when map is empty.
#
# The channel comes from SLACK_CHANNEL (default '#logs'); the bot token comes
# from SLACK_API_TOKEN.
#
# @param map [Hash] error key => occurrence count
# @return [nil]
# @raise [RuntimeError] carrying Slack's error code when the API answers "ok": false
# @raise [StandardError] on network or JSON errors (logged, then re-raised)
def execute_slack(map)
  return if map.empty?

  uri = URI.parse('https://slack.com/api/chat.postMessage')
  params = {
    channel: ENV.fetch('SLACK_CHANNEL', '#logs'),
    icon_emoji: ':robot_face:',
    username: 'bot',
    text: make_message(map),
  }
  headers = {
    # Slack recommends passing the token in the Authorization header rather
    # than as a request parameter.
    'Authorization' => "Bearer #{ENV['SLACK_API_TOKEN']}",
    'Content-Type' => 'application/x-www-form-urlencoded',
  }
  begin
    res = Net::HTTP.post(uri, URI.encode_www_form(params), headers)
    data = JSON.parse(res.body)
    # Slack signals API errors with HTTP 200 and "ok": false; guard against a
    # missing "error" key so we never call `raise nil`.
    raise(data['error'] || 'unknown_error') unless data['ok']
  rescue StandardError => e
    puts "fail: slack #{e}"
    raise
  end
end
# Lambda entry point invoked on a schedule.
#
# Drains the LOGS_QUEUE queue in batches of up to 10 messages, counts the
# occurrences of each error key and posts the totals to Slack.
#
# @param event [Hash] scheduler payload (unused)
# @param context [Object] Lambda context (unused)
# @return whatever execute_slack returns
def lambda_handler(event:, context:)
  # Use the region the function runs in (AWS_REGION is set by the Lambda
  # runtime) so the queue is found wherever the stack is deployed. The client
  # and queue URL are kept across warm invocations to skip GetQueueUrl calls.
  @sqs ||= Aws::SQS::Client.new(region: ENV.fetch('AWS_REGION', 'us-east-1'))
  @queue_url ||= @sqs.get_queue_url(queue_name: ENV['LOGS_QUEUE']).queue_url

  # Error key => number of occurrences in this run.
  map = {}
  continue_flag = true
  # Wait up to 5 seconds for the first batch only; afterwards use a 0-second
  # wait and stop at the first empty receive.
  wait_time_seconds = 5
  # Hard cap on receive rounds so a single run cannot loop indefinitely.
  100.times do
    continue_flag = execute_receive(map, wait_time_seconds)
    wait_time_seconds = 0
    break unless continue_flag
  end
  # The cap was hit: messages may still be in the queue for a later run.
  puts "warn: limit over" if continue_flag

  execute_slack(map)
end
CDK
import { Duration, Stack, StackProps } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as scheduler from "aws-cdk-lib/aws-scheduler";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as logs from "aws-cdk-lib/aws-logs";
import * as destinations from "aws-cdk-lib/aws-logs-destinations";
export class CdkDeploySubscriptionFilterStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
  super(scope, id, props);

  const prefix = "sample-app";
  // Construct IDs and physical names below keep their original spellings
  // (e.g. "Filetr", "subcription"): renaming them would make CloudFormation
  // replace the already-deployed resources.

  // Buffers the matched log events between the two Lambda functions.
  const subsciriptionFilterQueue = new sqs.Queue(this, "SubscriptionFilterQueue", {
    queueName: `${prefix}-subcription-filter-queue`,
    visibilityTimeout: Duration.seconds(300),
  });

  // Permissions to write CloudWatch logs. A fresh document is built for each
  // role so that no PolicyDocument instance is shared between roles.
  const cloudWatchLogsPolicy = () =>
    iam.PolicyDocument.fromJson({
      Version: "2012-10-17",
      Statement: [
        {
          Effect: "Allow",
          Action: [
            "logs:DescribeLogStreams",
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "logs:FilterLogEvents",
          ],
          Resource: "*",
        },
      ],
    });

  // Grants only the listed SQS actions, and only on the log queue.
  const logsQueuePolicy = (actions: string[]) =>
    iam.PolicyDocument.fromJson({
      Version: "2012-10-17",
      Statement: [
        {
          Effect: "Allow",
          Action: actions,
          Resource: subsciriptionFilterQueue.queueArn,
        },
      ],
    });

  // The subscription-filter handler only resolves the queue URL and sends
  // messages (get_queue_url + send_message in its Ruby code).
  const subscriptionFilterLambdaRole = new iam.Role(this, "SubscriptionFiletrLambdaRole", {
    assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
    roleName: `${prefix}-role-lambda-subcription-filter`,
    inlinePolicies: {
      CloudWatch: cloudWatchLogsPolicy(),
      Sqs: logsQueuePolicy(["sqs:GetQueueUrl", "sqs:SendMessage"]),
    },
  });

  const subscriptionFilterFunction = new lambda.Function(this, "SubscriptionFiletrFunction", {
    runtime: lambda.Runtime.RUBY_3_2,
    code: lambda.Code.fromAsset("../../apps/subscription_filter"),
    role: subscriptionFilterLambdaRole,
    handler: "main.lambda_handler",
    timeout: cdk.Duration.seconds(60),
    logRetention: logs.RetentionDays.ONE_MONTH,
    functionName: `${prefix}-subsciription-filter`,
    environment: {
      // Take the name from the queue itself instead of repeating the string.
      LOGS_QUEUE: subsciriptionFilterQueue.queueName,
    },
  });

  // The Slack sender resolves the queue URL, receives and deletes messages
  // (get_queue_url, receive_message, delete_message_batch in its Ruby code).
  const slackSenderLambdaRole = new iam.Role(this, "SlackSenderLambdaRole", {
    assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
    roleName: `${prefix}-role-lambda-slack-sender`,
    inlinePolicies: {
      CloudWatch: cloudWatchLogsPolicy(),
      Sqs: logsQueuePolicy([
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
      ]),
    },
  });

  const slackSenderFunction = new lambda.Function(this, "SlackSenderFunction", {
    runtime: lambda.Runtime.RUBY_3_2,
    code: lambda.Code.fromAsset("../../apps/slack_sender"),
    role: slackSenderLambdaRole,
    handler: "main.lambda_handler",
    timeout: cdk.Duration.seconds(600),
    logRetention: logs.RetentionDays.ONE_MONTH,
    functionName: `${prefix}-slack-sender`,
    environment: {
      LOGS_QUEUE: subsciriptionFilterQueue.queueName,
      // NOTE(review): this token is embedded in plain text in the synthesized
      // template and the function configuration; consider Secrets Manager or
      // an SSM SecureString instead.
      SLACK_API_TOKEN: process.env["SLACK_API_TOKEN"] || "",
    },
  });

  // Role the scheduler assumes to invoke the Slack sender.
  const slackSenderScheduleRole = new iam.Role(this, "SlackSenderScheduleRole", {
    assumedBy: new iam.ServicePrincipal("scheduler.amazonaws.com"),
    roleName: `${prefix}-role-slack-sender-schedule`,
    inlinePolicies: {
      CloudWatch: cloudWatchLogsPolicy(),
      Lambda: iam.PolicyDocument.fromJson({
        Version: "2012-10-17",
        Statement: [
          {
            Effect: "Allow",
            Action: ["lambda:InvokeFunction"],
            Resource: slackSenderFunction.functionArn,
          },
        ],
      }),
    },
  });

  // Run the Slack sender at minutes 0, 10, 20, ... of every hour. A missed
  // invocation is not retried; the next scheduled run drains the queue.
  new scheduler.CfnSchedule(this, "SlackSenderCfnSchedule", {
    name: `${prefix}-slack-sender`,
    flexibleTimeWindow: {
      mode: "OFF",
    },
    scheduleExpression: "cron(0/10 * * * ? *)",
    target: {
      arn: slackSenderFunction.functionArn,
      roleArn: slackSenderScheduleRole.roleArn,
      retryPolicy: {
        maximumEventAgeInSeconds: 60,
        maximumRetryAttempts: 0,
      },
    },
  });

  // Forward only JSON log lines whose level is ERROR or WARN from the
  // existing (imported, not created here) ECS log group.
  const destination = new destinations.LambdaDestination(subscriptionFilterFunction);
  const filterPattern = logs.FilterPattern.any(
    logs.FilterPattern.stringValue("$.level", "=", "ERROR"),
    logs.FilterPattern.stringValue("$.level", "=", "WARN"),
  );
  const ecsLogGroup = logs.LogGroup.fromLogGroupName(this, "EcsLogGroup", `${prefix}-ecs`);
  ecsLogGroup.addSubscriptionFilter("EcsSubscriptionFilter", {
    destination,
    filterPattern,
  });
}