Fetching WAF logs aggregated with CloudWatch Logs Insights from Python

Posted at 2022-02-16

Overview

As announced at the link below, AWS WAF logs can now be sent directly to CloudWatch Logs, so I wrote a script that counts the IP addresses blocked by a specific rule.
https://aws.amazon.com/jp/about-aws/whats-new/2021/12/awf-waf-cloudwatch-log-s3-bucket/

Configuring WAF logging

For the steps to send AWS WAF logs to CloudWatch Logs, the following article is a useful reference.
https://dev.classmethod.jp/articles/aws-waf-log-support-s3-and-cloudwatch-logs/#toc-4

Required IAM policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:DescribeQueries",
                "logs:GetQueryResults",
                "logs:StartQuery",
                "logs:StopQuery",
                "logs:PutQueryDefinition",
                "logs:DescribeQueryDefinitions"
            ],
            "Resource": "*"
        }
    ]
}
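
The script in this article calls only `start_query` and `get_query_results`, so a narrower policy may be enough in some environments. The following is a minimal sketch that generates such a policy document. The helper name `build_minimal_policy` is hypothetical, and whether the reduced action list is sufficient for your account is an assumption to verify before use.

```python
import json


def build_minimal_policy(actions=("logs:StartQuery", "logs:GetQueryResults")):
    """Build an IAM policy document limited to the given CloudWatch Logs actions.

    Assumption: only the two API calls made by the script are required.
    Add "logs:StopQuery" if you also want to cancel long-running queries.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(actions),
                "Resource": "*",
            }
        ],
    }


if __name__ == "__main__":
    # Print the policy as JSON so it can be pasted into the IAM console.
    print(json.dumps(build_minimal_policy(), indent=4))
```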

Python script

Note: this runs on Python 3 only.

import boto3
import time
import datetime
import json
import sys

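END_DATE = datetime.datetime.now(datetime.timezone.utc)  # timezone-aware, so the "(UTC)" labels in the output are accurate
START_DATE = END_DATE - datetime.timedelta(hours=3)  # query the most recent 3 hours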

END_TIME = int(END_DATE.timestamp())
START_TIME = int(START_DATE.timestamp())

LOG_GROUP_NAME = 'aws-waf-logs-xxx'  # name of the log group to query
WAIT_TIME = 1
# Replace "xxx" in the filter with the ID of the WAF rule to aggregate on.
QUERY = '''fields httpRequest.clientIp
| filter terminatingRuleId = "xxx"
| stats count(*) as requestCount by httpRequest.clientIp
| sort requestCount desc
| limit 10'''

client = boto3.client('logs')

def start_query(log_group):
    response = client.start_query(
        logGroupName=log_group,
        startTime=START_TIME,
        endTime=END_TIME,
        queryString=QUERY
    )
    return response['queryId']

def get_query_results(query_id):
    response = client.get_query_results(
        queryId=query_id
    )
    return response

query_id = start_query(LOG_GROUP_NAME)
query_result = get_query_results(query_id)
query_status = query_result['status']

while query_status != 'Complete':
    if query_status == 'Scheduled' or query_status == 'Running':
        time.sleep(WAIT_TIME)
        query_result = get_query_results(query_id)
        query_status = query_result['status']
        continue
    else:
        print(query_result)
        sys.exit(1)

cnt = 1
result_data = {}
result_data["FROM"] = str(START_DATE.strftime('%Y/%m/%d %H:%M:%S')) + "(UTC)"
result_data["TO"] = str(END_DATE.strftime('%Y/%m/%d %H:%M:%S')) + "(UTC)"

for blocked_ips in query_result['results']:
    key = str(cnt) + ") " + blocked_ips[0]['value']
    result_data[key] = blocked_ips[1]['value']
    cnt += 1

result = json.dumps(result_data, sort_keys=True, indent=4)
print(result)

Execution result

{
    "1) 14.9.136.64": "85",
    "2) 54.87.249.144": "18",
    "3) 111.225.148.157": "1",
    "FROM": "2022/02/16 07:05:21(UTC)",
    "TO": "2022/02/16 10:05:21(UTC)"
}
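
The script reads each result row by position (`[0]` for the IP, `[1]` for the count) and leaves the counts as strings. A possible alternative, sketched below, looks cells up by field name and converts the counts to integers. The function name `rank_blocked_ips` is hypothetical, and the sample rows are rebuilt from the output shown above rather than taken from a real API response.

```python
def rank_blocked_ips(results, ip_field="httpRequest.clientIp", count_field="requestCount"):
    """Turn Logs Insights result rows into (ip, count) pairs, highest count first.

    Each row is a list of {"field": ..., "value": ...} cells, so the cells are
    first indexed by field name instead of relying on their order.
    """
    ranked = []
    for row in results:
        cells = {cell["field"]: cell["value"] for cell in row}
        ranked.append((cells[ip_field], int(cells[count_field])))
    # Sort numerically; the query already sorts, but this keeps the helper safe on its own.
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked


if __name__ == "__main__":
    # Sample rows reconstructed from the execution result in this article.
    sample = [
        [{"field": "httpRequest.clientIp", "value": "14.9.136.64"},
         {"field": "requestCount", "value": "85"}],
        [{"field": "httpRequest.clientIp", "value": "54.87.249.144"},
         {"field": "requestCount", "value": "18"}],
        [{"field": "httpRequest.clientIp", "value": "111.225.148.157"},
         {"field": "requestCount", "value": "1"}],
    ]
    for rank, (ip, count) in enumerate(rank_blocked_ips(sample), start=1):
        print(f"{rank}) {ip}: {count}")
```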

References

https://dev.classmethod.jp/articles/how-to-count-cloudwatch-logs-log-events/
https://stackoverflow.com/questions/49706693/parsing-boto3-output-json
