6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS WAFv2のレートベースルールでIPブロック時にメール通知する仕組みを構築する

6
Last updated at Posted at 2026-02-10

はじめに

ソーイ村上です。

開発しているシステムでAWS上でAWS WAFを設定したWebアプリケーションを運用しています。WAFのレートベースルールを導入した際、「特定ルールによるブロックが発生した場合に通知したい」という要望がありました。本記事はその解決策として構築した通知システムの実装記録です。

前提条件

  • AWS WAFv2でレートベースルール設定済み
  • WAFログをCloudWatch Logsに出力設定済み

レートベースルールとは

レートベースルールは、AWS WAFv2が提供するルールタイプの1つで、指定した時間内に特定の集約キー(IPアドレス、ヘッダー、Cookie等)からのリクエスト数が閾値を超えた場合に、自動的にブロックする機能です。

構成図

waf.drawio.png

この構成を選んだ理由

  • CloudWatch Logsサブスクリプションフィルター: 必要なログのみをLambdaに送信でき、コスト効率が良い
  • S3での履歴管理: DynamoDBより安価で、日付ごとのファイル構造で管理しやすい
  • SNS: メール以外にもSlack連携など拡張しやすい

CloudWatchアラーム + SNSだけでも通知は可能ですが、「どのIPがブロックされたか」の詳細情報を含めたかったため、Lambda経由の構成を採用しました。

実装手順

WAFによる同一IPアドレスのアクセス制限

AWS WAFv2でレートベースルールを作成します。

レートベースルールの設定例

今回のプロジェクトでは、以下の設定でルールを作成しました。

設定項目 説明
ルール名 IPAddressRateLimitRule 任意の名前(この名前でフィルタリングします)
レート制限 600 評価期間内の最大リクエスト数
評価期間 60秒 リクエストをカウントする時間枠
IPアドレスの集約キー Source IP address クライアントのIPアドレスでカウント
アクション Block 閾値超過時にリクエストをブロック

AWSコンソールでの作成手順

  1. AWS WAFコンソール → Web ACLs → 対象のWeb ACLを選択
  2. 「Rules」タブ → 「Add rules」 → 「Add my own rules and rule groups」
  3. 「Rule type」で「Rate-based rule」を選択
  4. 以下を設定:
    • Rule name: IPAddressRateLimitRule
    • Rate limit: 600
    • Evaluation window: 1 minute (60 seconds)
    • Request aggregation: Source IP address
    • Action: Block
  5. 「Add rule」をクリック

CloudWatch メトリクスの有効化

ウェブACLのログ記録送信先を有効化してください、ブロック数などのメトリクスが取得できます。これにより、CloudWatchアラームでの監視も可能になります。
CleanShot 2026-02-09 at 19.37.48@2x.png

1. S3バケットの作成

履歴管理用のS3バケットを作成します。今回は環境(dev/staging/production)で共有し、パスで区別します。

{バケット名}/{環境}/{日付}/{IPアドレス}.json
例: my-waf-logs/dev/2025-04-18/58.95.184.73.json

2. Lambda関数の作成

ランタイム: Python 3.9以降(zoneinfoモジュール使用のため)

環境変数:

変数名 説明
SNS_TOPIC_ARN 通知先SNSトピックのARN arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:waf-alerts
S3_BUCKET 履歴保存用S3バケット my-waf-notification-bucket
APP_ENV 環境識別子 dev

Lambda関数コード:

import json
import boto3
import os
import base64
import zlib
from botocore.exceptions import ClientError
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# 環境変数
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')
APP_ENV = os.environ.get('APP_ENV', 'dev')
S3_BUCKET = os.environ.get('S3_BUCKET')
REGION = "ap-northeast-1"

def send_notification(topic_arn, subject, message):
    """SNS通知を送信"""
    client = boto3.client('sns', region_name=REGION)
    response = client.publish(
        TopicArn=topic_arn,
        Subject=subject,
        Message=message
    )
    return response

def get_tokyo_date():
    """日本時間の現在日付を取得"""
    tokyo_tz = ZoneInfo('Asia/Tokyo')
    tokyo_now = datetime.now(tokyo_tz)
    return tokyo_now.strftime('%Y-%m-%d')

def should_send_notification(ip_address):
    """同一IPが本日既に通知済みかチェック"""
    s3 = boto3.client('s3', region_name=REGION)
    today_str = get_tokyo_date()
    s3_key = f"{APP_ENV}/{today_str}/{ip_address}.json"
    
    try:
        s3.head_object(Bucket=S3_BUCKET, Key=s3_key)
        return False  # ファイルが存在 = 通知済み
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return True
        raise e

def record_notification(ip_address, log_data):
    """通知履歴をS3に記録"""
    s3 = boto3.client('s3', region_name=REGION)
    today_str = get_tokyo_date()
    s3_key = f"{APP_ENV}/{today_str}/{ip_address}.json"
    
    record = {
        "ip_address": ip_address,
        "notified_at": datetime.now(timezone.utc).isoformat(),
        "log_data": log_data
    }
    
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=s3_key,
        Body=json.dumps(record, ensure_ascii=False),
        ContentType='application/json'
    )

def lambda_handler(event, context):
    # CloudWatch Logsからのデータをデコード
    compressed_payload = base64.b64decode(event['awslogs']['data'])
    uncompressed_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)
    log_data = json.loads(uncompressed_payload)
    
    for log_event in log_data['logEvents']:
        log_message = json.loads(log_event['message'])
        
        # IPアドレス取得
        client_ip = log_message.get("httpRequest", {}).get("clientIp", "不明")
        
        # 本日通知済みかチェック
        if not should_send_notification(client_ip):
            print(f"IPアドレス {client_ip} は本日既に通知済みです")
            continue
        
        # 国情報取得
        country = log_message.get("httpRequest", {}).get("country", "不明")
        
        # User-Agent取得
        user_agent = "不明"
        headers = log_message.get("httpRequest", {}).get("headers", [])
        for header in headers:
            if header.get("name", "").lower() == "user-agent":
                user_agent = header.get("value", "不明")
                break
        
        # レートベースルール情報取得
        rate_rule = {}
        rate_list = log_message.get("rateBasedRuleList", [])
        if rate_list:
            rate_rule = rate_list[0]
        
        # 通知メッセージ作成
        subject = f"[{APP_ENV}] WAF IPAddressRateLimitRule でアクセスをブロックしました"
        message = f'''WAFがレートベースルールによりアクセスをブロックしました。

■ ブロック情報
IPアドレス: {client_ip}
国: {country}
User-Agent: {user_agent}

■ ルール詳細
ルール名: {log_message.get("terminatingRuleId", "不明")}
ルールタイプ: {log_message.get("terminatingRuleType", "不明")}
最大リクエスト数: {rate_rule.get("maxRateAllowed", "不明")}
評価期間: {rate_rule.get("evaluationWindowSec", "不明")}秒

■ 環境
環境: {APP_ENV}
タイムスタンプ: {datetime.now(ZoneInfo('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S')} (JST)

※同一IPアドレスへの通知は1日1回です。
'''
        
        # SNS通知送信
        try:
            send_notification(SNS_TOPIC_ARN, subject, message)
            record_notification(client_ip, log_message)
            print(f"通知送信完了: {client_ip}")
        except ClientError as e:
            print(f"通知送信エラー: {e}")
            raise e
    
    return {'statusCode': 200, 'body': 'Success'}

3. IAMポリシーの設定

Lambda実行ロールに以下のポリシーを付与します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["sns:Publish"],
            "Resource": "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:waf-alerts"
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:HeadObject"],
            "Resource": "arn:aws:s3:::my-waf-notification-bucket/*"
        }
    ]
}

実装時にハマったポイント

当初、以下のようにS3権限をまとめて記述していました。

{
    "Effect": "Allow",
    "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
    "Resource": "arn:aws:s3:::my-bucket/dev/*"
}

これだとAccessDeniedエラーが発生します。理由は、s3:ListBucketはバケットレベルの権限であり、オブジェクトパス(/*)を指定したリソースには適用されないためです。

正しくは以下のように分離する必要があります。

{
    "Statement": [
        {
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::my-bucket/dev/*"
        },
        {
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::my-bucket"
        }
    ]
}

ただし今回はhead_objectで存在確認しているため、s3:ListBucketは不要で、s3:HeadObjects3:GetObjectに含まれる)があれば動作します。

4. サブスクリプションフィルターの作成

CloudWatch Logsコンソールまたは以下のCLIで設定します。

aws logs put-subscription-filter \
  --log-group-name "aws-waf-logs-your-webacl-name" \
  --filter-name "IPAddressRateLimitRule-Block-Filter" \
  --filter-pattern '{ $.action = "BLOCK" && $.terminatingRuleId = "IPAddressRateLimitRule" }' \
  --destination-arn "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:waf-block-notification"

注意: Lambdaに対してCloudWatch Logsからの呼び出し権限を付与する必要があります。

aws lambda add-permission \
  --function-name waf-block-notification \
  --statement-id AllowCloudWatchLogs \
  --action lambda:InvokeFunction \
  --principal logs.ap-northeast-1.amazonaws.com \
  --source-arn "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:aws-waf-logs-*:*"

通知メールの例

件名: [dev] WAF IPAddressRateLimitRule でアクセスをブロックしました

WAFがレートベースルールによりアクセスをブロックしました。

■ ブロック情報
IPアドレス: x.x.x.x
国: JP
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...

■ ルール詳細
ルール名: IPAddressRateLimitRule
ルールタイプ: RATE_BASED
最大リクエスト数: 600
評価期間: 60秒

■ 環境
環境: dev
タイムスタンプ: 2026-02-09 15:30:45 (JST)

運用上の注意点

  1. S3ライフサイクル設定: 履歴ファイルが蓄積されるため、30日などでの自動削除ルールを設定することをおすすめします

  2. タイムゾーン: 通知の重複判定は日本時間(Asia/Tokyo)基準です。Python 3.9以降のzoneinfoを使用しているため、3.8以前の場合はpytzをLayerで追加してください

参考リンク

まとめ

AWS WAFv2のレートベースルールによるIPブロックをリアルタイムで把握できる通知システムを構築しました。Lambda側でフィルタリング条件を変更すれば、特定のルールのみ通知するなど柔軟な対応が可能です。

次回は一時的に大量のアクセスが来た場合に、アクセスブロックを行うWAF設定をしたときの内容を記事にします。

お知らせ

技術ブログを週1〜2本更新中、ソーイをフォローして最新記事をチェック!
https://qiita.com/organizations/sewii

6
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?