Why not log in to Qiita and try out its useful features?

We'll deliver articles that match you.

You can read useful information later.

1
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

CloudWatchLogsから特定のエラーをサブスクリプションフィルターで抽出してSlackに通知する

Posted at

目的

CloudWatchLogsにエラーログが流れた時にサブスクリプションフィルターを使って最終的にSlackに通知したいです。

サブスクリプションフィルターはLambdaにつなげて、データをSQSに保存します。

EventBridgeスケジューラーで10分ごとに別のLambdaを起動してSQSからデータを回収して、集計して、Slackに通知します。

サブスクリプションフィルター.drawio.png

ログは以下のようになります。levelがERRORとWARNのものを集めます。error_codeで集計をします。

{
    "timestamp": "2023-08-18T07:48:33.563814Z",
    "level": "ERROR",
    "fields": {
        "error_code": "parse error",
        "message": "Error(\"expected `,` or `}`\", line: 7, column: 5)"
    }
}

Slackに通知されるメッセージ

parse error: 1

コード

サブスクリプションフィルターからデータを受ける

サブスクリプションフィルターから送られてくるデータはGZIPで圧縮されてBase64エンコードされています。
それを戻してSQSにpushします。

このコードはLambdaのデフォルトのgemで動作します。

subscription_filter/main.rb
require 'json'
require 'base64'
require 'zlib'
require 'aws-sdk-sqs'

# Lambda entry point that receives data from a CloudWatch Logs subscription filter.
#
# The value at event["awslogs"]["data"] is Base64-decoded, gunzipped and parsed
# as JSON. Every element of the resulting "logEvents" array is then sent, as its
# own JSON-encoded message, to the SQS queue whose name is in ENV['LOGS_QUEUE'].
#
# @param event [Hash] invocation payload; event["awslogs"]["data"] must be present
# @param context [Object] not used by this handler
# @return [Array] the decoded "logEvents" array
def lambda_handler(event:, context:)
  @sqs = Aws::SQS::Client.new(region: 'us-east-1')
  @queue_url = @sqs.get_queue_url(queue_name: ENV['LOGS_QUEUE']).queue_url

  # Undo the transport encoding step by step: Base64 -> gzip -> JSON.
  compressed = Base64.decode64(event["awslogs"]["data"])
  raw_json = Zlib::GzipReader.new(StringIO.new(compressed)).read
  payload = JSON.parse(raw_json)

  # One SQS message per log event.
  payload["logEvents"].each do |entry|
    @sqs.send_message(queue_url: @queue_url, message_body: entry.to_json)
  end
end

SQSから取得してSlackに通知する

SQSはキュー内のメッセージ数が少ないと、1回の取得ですべてのメッセージが返ってこないことがあるようです。
ここでは最初だけ5秒待って取得し、以降は待ち時間0秒で、取得できなくなるまで繰り返し取得します。
ログはerror_codeで集計してerror_codeごとに発生した数を数えてSlackに通知します。

このコードはLambdaのデフォルトのgemで動作します。

slack_sender/main.rb
require 'json'
require 'base64'
require 'zlib'
require 'aws-sdk-sqs'
require 'net/http'
require 'uri'

# Deletes the given messages from the SQS queue with one batch call.
#
# A delete_message_batch call can succeed as a whole while individual entries
# are rejected; the rejected ones are listed in the response's `failed`
# collection. Each of them is logged here, because such messages were not
# deleted by this call and would otherwise go unnoticed.
#
# @param entries [Array<Hash>] one hash per message with :id and :receipt_handle
# @return [Object, nil] the delete_message_batch response, or nil when entries is empty
# @raise [StandardError] any error raised by the batch call, re-raised after logging
def execute_delete(entries)
  return if entries.empty?

  begin
    response = @sqs.delete_message_batch(
      queue_url: @queue_url,
      entries: entries
    )
    # Per-entry failures do not raise, so surface them explicitly.
    response.failed.each do |failure|
      puts "warn: delete_message_batch failed id=#{failure.id} code=#{failure.code} message=#{failure.message}"
    end
    response
  rescue => e
    puts "fail: delete_message_batch"
    raise e
  end
end

# Tallies the error carried by one SQS message and builds its delete entry.
#
# The message body is parsed as JSON and the value under its "message" key is
# parsed as JSON a second time. The tally key is that inner document's
# fields.error_code, falling back to fields.message. Any StandardError raised
# while parsing or looking up the key is swallowed: the message is dumped with
# pp and the tally is left untouched.
#
# @param message [#message_id, #receipt_handle, #body] a received SQS message
# @param map [Hash] tally (key => occurrence count), updated in place
# @return [Hash] { id:, receipt_handle: } taken from the message
def retrive_message(message, map)
  delete_entry = { id: message.message_id, receipt_handle: message.receipt_handle }

  begin
    envelope = JSON.parse(message.body)
    fields = JSON.parse(envelope["message"])["fields"]
    key = fields["error_code"] || fields["message"]
    seen = map[key] || 0
    map[key] = seen + 1
  rescue
    # Best-effort on purpose: dump the unparseable message and carry on.
    pp message
  end

  delete_entry
end

# Receives one batch of messages from SQS, tallies each one and deletes the batch.
#
# @param map [Hash] tally handed through to retrive_message
# @param wait_time_seconds [Integer] wait time passed to receive_message
# @return [Boolean] false when no messages came back, true otherwise
# @raise [StandardError] any error from receiving, tallying or deleting, re-raised after logging
def execute_receive(map, wait_time_seconds)
  request = {
    queue_url: @queue_url,
    message_attribute_names: ["All"], # Receive all custom attributes.
    max_number_of_messages: 10, # this is the maximum size
    wait_time_seconds: wait_time_seconds
  }
  batch = @sqs.receive_message(request).messages

  # Nothing received: tell the caller to stop polling.
  return false if !batch || batch.empty?

  execute_delete(batch.map { |message| retrive_message(message, map) })
  true
rescue => error
  puts "fail: receive_message"
  raise error
end

# Renders the error tally as text: one "key: count" line per entry,
# joined with newlines.
#
# @param map [Hash] tally of key => occurrence count
# @return [String] the rendered lines ("" for an empty tally)
def make_message(map)
  map.map { |key, count| "#{key}: #{count}" }.join("\n")
end

# Posts the error tally to Slack through the chat.postMessage endpoint.
#
# Returns immediately when the tally is empty. Otherwise the text produced by
# make_message is form-posted together with the token from
# ENV['SLACK_API_TOKEN'], and the JSON response is checked for a truthy "ok".
#
# @param map [Hash] tally of key => occurrence count
# @return [nil]
# @raise [RuntimeError] carrying the response's "error" value when "ok" is falsy
# @raise [StandardError] any error from the HTTP call or response parsing, re-raised after logging
def execute_slack(map)
  # Nothing to report.
  return if map.empty?

  endpoint = URI.parse('https://slack.com/api/chat.postMessage')
  form = {
    channel: '#logs',
    icon_emoji: ':robot_face:',
    username: 'bot',
    token: ENV['SLACK_API_TOKEN'],
    text: make_message(map),
  }

  begin
    response = Net::HTTP.post_form(endpoint, form)
    payload = JSON.parse(response.body)
    raise payload["error"] unless payload["ok"]
  rescue => error
    puts "fail: slack #{error}"
    raise error
  end
end

# Lambda entry point that drains the log queue and reports the tally to Slack.
#
# Polls the queue named by ENV['LOGS_QUEUE'] through execute_receive until it
# reports an empty batch, giving up after 100 polls. Only the first poll waits
# (5 seconds); every later poll uses a wait time of 0. If the queue is still not
# empty after 100 polls a warning is printed. The collected tally is then handed
# to execute_slack.
#
# @param event [Object] not used by this handler
# @param context [Object] not used by this handler
# @return [Object] whatever execute_slack returns
def lambda_handler(event:, context:)
  # Prepare SQS access.
  @sqs = Aws::SQS::Client.new(region: 'us-east-1')
  @queue_url = @sqs.get_queue_url(queue_name: ENV['LOGS_QUEUE']).queue_url

  # Tally of errors: key is the error text, value is its count.
  tally = {}
  drained = false

  # Hard upper bound of 100 polls.
  100.times do |attempt|
    wait_seconds = attempt.zero? ? 5 : 0
    next if execute_receive(tally, wait_seconds)

    drained = true
    break
  end

  # Every poll returned messages, so the poll limit was hit.
  puts "warn: limit over" unless drained

  execute_slack(tally)
end

CDK

cdk-deploy-subscription-filter.ts
import { Duration, Stack, StackProps } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as scheduler from "aws-cdk-lib/aws-scheduler";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as logs from "aws-cdk-lib/aws-logs";
import * as destinations from "aws-cdk-lib/aws-logs-destinations";

/**
 * Stack that wires CloudWatch Logs ERROR/WARN events to a Slack-notifying Lambda.
 *
 * Resources defined here:
 * - an SQS queue,
 * - a Lambda function (asset `../../apps/subscription_filter`) attached as the
 *   subscription-filter destination of the `${prefix}-ecs` log group,
 * - a Lambda function (asset `../../apps/slack_sender`) targeted by an
 *   EventBridge Scheduler schedule with the expression `cron(0/10 * * * ? *)`,
 * - one IAM role per function and one for the schedule.
 *
 * NOTE: several resource names and construct ids are misspelled
 * ("subcription", "subsciription", "Filetr"). They are kept verbatim on
 * purpose: they are the names/ids this stack is synthesized with, so changing
 * them changes the generated resources.
 */
export class CdkDeploySubscriptionFilterStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const prefix = "sample-app";

    const subsciriptionFilterQueue = new sqs.Queue(this, "SubscriptionFilterQueue", {
      queueName: `${prefix}-subcription-filter-queue`,
      visibilityTimeout: Duration.seconds(300)
    });

    // All three roles carry the same CloudWatch Logs statement; build it in one place.
    const buildLogsPolicy = (): iam.PolicyDocument =>
      iam.PolicyDocument.fromJson({
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "logs:DescribeLogStreams",
              "logs:CreateLogGroup",
              "logs:CreateLogStream",
              "logs:PutLogEvents",
              "logs:FilterLogEvents",
            ],
            "Resource": "*"
          },
        ]
      });

    // Both Lambda roles get the same set of SQS actions, scoped to the queue above.
    const buildQueuePolicy = (): iam.PolicyDocument =>
      iam.PolicyDocument.fromJson({
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "sqs:DeleteMessage",
              "sqs:GetQueueUrl",
              "sqs:ListQueues",
              "sqs:ChangeMessageVisibility",
              "sqs:SendMessageBatch",
              "sqs:ReceiveMessage",
              "sqs:SendMessage",
              "sqs:GetQueueAttributes",
              "sqs:ListQueueTags",
              "sqs:ListDeadLetterSourceQueues",
              "sqs:DeleteMessageBatch",
              "sqs:ChangeMessageVisibilityBatch",
              "sqs:SetQueueAttributes",
            ],
            "Resource": subsciriptionFilterQueue.queueArn,
          },
        ]
      });

    // Role and function for the Lambda that receives subscription-filter data.
    const subscriptionFilterLambdaRole = new iam.Role(this, "SubscriptionFiletrLambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      roleName: `${prefix}-role-lambda-subcription-filter`,
      inlinePolicies: {
        CloudWatch: buildLogsPolicy(),
        Sqs: buildQueuePolicy(),
      }
    });

    const subscriptionFilterFunction = new lambda.Function(this, "SubscriptionFiletrFunction", {
      runtime: lambda.Runtime.RUBY_3_2,
      code: lambda.Code.fromAsset("../../apps/subscription_filter"),
      role: subscriptionFilterLambdaRole,
      handler: "main.lambda_handler",
      timeout: cdk.Duration.seconds(60),
      logRetention: logs.RetentionDays.ONE_MONTH,
      functionName: `${prefix}-subsciription-filter`,
      environment: {
        // Same literal as the queue's queueName above.
        LOGS_QUEUE: `${prefix}-subcription-filter-queue`
      }
    });

    // Role and function for the Lambda that posts to Slack.
    const slackSenderLambdaRole = new iam.Role(this, "SlackSenderLambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      roleName: `${prefix}-role-lambda-slack-sender`,
      inlinePolicies: {
        CloudWatch: buildLogsPolicy(),
        Sqs: buildQueuePolicy(),
      }
    });

    const slackSenderFunction = new lambda.Function(this, "SlackSenderFunction", {
      runtime: lambda.Runtime.RUBY_3_2,
      code: lambda.Code.fromAsset("../../apps/slack_sender"),
      role: slackSenderLambdaRole,
      handler: "main.lambda_handler",
      timeout: cdk.Duration.seconds(600),
      logRetention: logs.RetentionDays.ONE_MONTH,
      functionName: `${prefix}-slack-sender`,
      environment: {
        LOGS_QUEUE: `${prefix}-subcription-filter-queue`,
        // Read from the deploying shell's environment; falls back to an empty string when unset.
        SLACK_API_TOKEN: process.env["SLACK_API_TOKEN"] || "",
      }
    });

    // Role assumed by EventBridge Scheduler; it may invoke only the Slack sender function.
    const slackSenderScheduleRole = new iam.Role(this, "SlackSenderScheduleRole", {
      assumedBy: new iam.ServicePrincipal("scheduler.amazonaws.com"),
      roleName: `${prefix}-role-slack-sender-schedule`,
      inlinePolicies: {
        CloudWatch: buildLogsPolicy(),
        Lambda: iam.PolicyDocument.fromJson({
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": [
                "lambda:InvokeFunction",
              ],
              "Resource": slackSenderFunction.functionArn,
            },
          ]
        })
      }
    });

    // Schedule targeting the Slack sender; retries are disabled (maximumRetryAttempts: 0).
    new scheduler.CfnSchedule(this, "SlackSenderCfnSchedule", {
      name: `${prefix}-slack-sender`,
      flexibleTimeWindow: {
        mode: "OFF",
      },
      scheduleExpression: "cron(0/10 * * * ? *)",
      target: {
        arn: slackSenderFunction.functionArn,
        roleArn: slackSenderScheduleRole.roleArn,
        retryPolicy: {
          maximumEventAgeInSeconds: 60,
          maximumRetryAttempts: 0,
        },
      }
    });

    // Forward log events whose JSON `level` field equals ERROR or WARN.
    const destination = new destinations.LambdaDestination(subscriptionFilterFunction);
    const filterPattern = logs.FilterPattern.any(
      logs.FilterPattern.stringValue("$.level", "=", "ERROR"),
      logs.FilterPattern.stringValue("$.level", "=", "WARN"),
    );

    // The log group is referenced by name; it is not created by this stack.
    const ecsLogGroup = logs.LogGroup.fromLogGroupName(this, "EcsLogGroup", `${prefix}-ecs`);
    ecsLogGroup.addSubscriptionFilter("EcsSubscriptionFilter", {
      destination,
      filterPattern,
    });
  }
}

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?