はじめに
ソーイ村上です。
開発しているシステムでAWS上でAWS WAFを設定したWebアプリケーションを運用しています。WAFのレートベースルールを導入した際、「特定ルールによるブロックが発生した場合に通知したい」という要望がありました。本記事はその解決策として構築した通知システムの実装記録です。
前提条件
- AWS WAFv2でレートベースルール設定済み
- WAFログをCloudWatch Logsに出力設定済み
レートベースルールとは
レートベースルールは、AWS WAFv2が提供するルールタイプの1つで、指定した時間内に特定の集約キー(IPアドレス、ヘッダー、Cookie等)からのリクエスト数が閾値を超えた場合に、自動的にブロックする機能です。
構成図
この構成を選んだ理由
- CloudWatch Logsサブスクリプションフィルター: 必要なログのみをLambdaに送信でき、コスト効率が良い
- S3での履歴管理: DynamoDBより安価で、日付ごとのファイル構造で管理しやすい
- SNS: メール以外にもSlack連携など拡張しやすい
CloudWatchアラーム + SNSだけでも通知は可能ですが、「どのIPがブロックされたか」の詳細情報を含めたかったため、Lambda経由の構成を採用しました。
実装手順
WAFによる同一IPアドレスのアクセス制限
AWS WAFv2でレートベースルールを作成します。
レートベースルールの設定例
今回のプロジェクトでは、以下の設定でルールを作成しました。
| 設定項目 | 値 | 説明 |
|---|---|---|
| ルール名 | IPAddressRateLimitRule | 任意の名前(この名前でフィルタリングします) |
| レート制限 | 600 | 評価期間内の最大リクエスト数 |
| 評価期間 | 60秒 | リクエストをカウントする時間枠 |
| IPアドレスの集約キー | Source IP address | クライアントのIPアドレスでカウント |
| アクション | Block | 閾値超過時にリクエストをブロック |
AWSコンソールでの作成手順
- AWS WAFコンソール → Web ACLs → 対象のWeb ACLを選択
- 「Rules」タブ → 「Add rules」 → 「Add my own rules and rule groups」
- 「Rule type」で「Rate-based rule」を選択
- 以下を設定:
- Rule name:
IPAddressRateLimitRule - Rate limit:
600 - Evaluation window:
1 minute (60 seconds) - Request aggregation:
Source IP address - Action:
Block
- Rule name:
- 「Add rule」をクリック
CloudWatch メトリクスの有効化
ウェブACLのログ記録送信先を有効化してください、ブロック数などのメトリクスが取得できます。これにより、CloudWatchアラームでの監視も可能になります。

1. S3バケットの作成
履歴管理用のS3バケットを作成します。今回は環境(dev/staging/production)で共有し、パスで区別します。
{バケット名}/{環境}/{日付}/{IPアドレス}.json
例: my-waf-logs/dev/2025-04-18/58.95.184.73.json
2. Lambda関数の作成
ランタイム: Python 3.9以降(zoneinfoモジュール使用のため)
環境変数:
| 変数名 | 説明 | 例 |
|---|---|---|
| SNS_TOPIC_ARN | 通知先SNSトピックのARN | arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:waf-alerts |
| S3_BUCKET | 履歴保存用S3バケット | my-waf-notification-bucket |
| APP_ENV | 環境識別子 | dev |
Lambda関数コード:
import json
import boto3
import os
import base64
import zlib
from botocore.exceptions import ClientError
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
# 環境変数
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')
APP_ENV = os.environ.get('APP_ENV', 'dev')
S3_BUCKET = os.environ.get('S3_BUCKET')
REGION = "ap-northeast-1"
def send_notification(topic_arn, subject, message):
"""SNS通知を送信"""
client = boto3.client('sns', region_name=REGION)
response = client.publish(
TopicArn=topic_arn,
Subject=subject,
Message=message
)
return response
def get_tokyo_date():
"""日本時間の現在日付を取得"""
tokyo_tz = ZoneInfo('Asia/Tokyo')
tokyo_now = datetime.now(tokyo_tz)
return tokyo_now.strftime('%Y-%m-%d')
def should_send_notification(ip_address):
"""同一IPが本日既に通知済みかチェック"""
s3 = boto3.client('s3', region_name=REGION)
today_str = get_tokyo_date()
s3_key = f"{APP_ENV}/{today_str}/{ip_address}.json"
try:
s3.head_object(Bucket=S3_BUCKET, Key=s3_key)
return False # ファイルが存在 = 通知済み
except ClientError as e:
if e.response['Error']['Code'] == '404':
return True
raise e
def record_notification(ip_address, log_data):
"""通知履歴をS3に記録"""
s3 = boto3.client('s3', region_name=REGION)
today_str = get_tokyo_date()
s3_key = f"{APP_ENV}/{today_str}/{ip_address}.json"
record = {
"ip_address": ip_address,
"notified_at": datetime.now(timezone.utc).isoformat(),
"log_data": log_data
}
s3.put_object(
Bucket=S3_BUCKET,
Key=s3_key,
Body=json.dumps(record, ensure_ascii=False),
ContentType='application/json'
)
def lambda_handler(event, context):
# CloudWatch Logsからのデータをデコード
compressed_payload = base64.b64decode(event['awslogs']['data'])
uncompressed_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)
log_data = json.loads(uncompressed_payload)
for log_event in log_data['logEvents']:
log_message = json.loads(log_event['message'])
# IPアドレス取得
client_ip = log_message.get("httpRequest", {}).get("clientIp", "不明")
# 本日通知済みかチェック
if not should_send_notification(client_ip):
print(f"IPアドレス {client_ip} は本日既に通知済みです")
continue
# 国情報取得
country = log_message.get("httpRequest", {}).get("country", "不明")
# User-Agent取得
user_agent = "不明"
headers = log_message.get("httpRequest", {}).get("headers", [])
for header in headers:
if header.get("name", "").lower() == "user-agent":
user_agent = header.get("value", "不明")
break
# レートベースルール情報取得
rate_rule = {}
rate_list = log_message.get("rateBasedRuleList", [])
if rate_list:
rate_rule = rate_list[0]
# 通知メッセージ作成
subject = f"[{APP_ENV}] WAF IPAddressRateLimitRule でアクセスをブロックしました"
message = f'''WAFがレートベースルールによりアクセスをブロックしました。
■ ブロック情報
IPアドレス: {client_ip}
国: {country}
User-Agent: {user_agent}
■ ルール詳細
ルール名: {log_message.get("terminatingRuleId", "不明")}
ルールタイプ: {log_message.get("terminatingRuleType", "不明")}
最大リクエスト数: {rate_rule.get("maxRateAllowed", "不明")}
評価期間: {rate_rule.get("evaluationWindowSec", "不明")}秒
■ 環境
環境: {APP_ENV}
タイムスタンプ: {datetime.now(ZoneInfo('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S')} (JST)
※同一IPアドレスへの通知は1日1回です。
'''
# SNS通知送信
try:
send_notification(SNS_TOPIC_ARN, subject, message)
record_notification(client_ip, log_message)
print(f"通知送信完了: {client_ip}")
except ClientError as e:
print(f"通知送信エラー: {e}")
raise e
return {'statusCode': 200, 'body': 'Success'}
3. IAMポリシーの設定
Lambda実行ロールに以下のポリシーを付与します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["sns:Publish"],
"Resource": "arn:aws:sns:ap-northeast-1:XXXXXXXXXXXX:waf-alerts"
},
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:HeadObject"],
"Resource": "arn:aws:s3:::my-waf-notification-bucket/*"
}
]
}
実装時にハマったポイント
当初、以下のようにS3権限をまとめて記述していました。
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
"Resource": "arn:aws:s3:::my-bucket/dev/*"
}
これだとAccessDeniedエラーが発生します。理由は、s3:ListBucketはバケットレベルの権限であり、オブジェクトパス(/*)を指定したリソースには適用されないためです。
正しくは以下のように分離する必要があります。
{
"Statement": [
{
"Action": ["s3:GetObject", "s3:PutObject"],
"Resource": "arn:aws:s3:::my-bucket/dev/*"
},
{
"Action": ["s3:ListBucket"],
"Resource": "arn:aws:s3:::my-bucket"
}
]
}
ただし今回はhead_objectで存在確認しているため、s3:ListBucketは不要で、s3:HeadObject(s3:GetObjectに含まれる)があれば動作します。
4. サブスクリプションフィルターの作成
CloudWatch Logsコンソールまたは以下のCLIで設定します。
aws logs put-subscription-filter \
--log-group-name "aws-waf-logs-your-webacl-name" \
--filter-name "IPAddressRateLimitRule-Block-Filter" \
--filter-pattern '{ $.action = "BLOCK" && $.terminatingRuleId = "IPAddressRateLimitRule" }' \
--destination-arn "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:waf-block-notification"
注意: Lambdaに対してCloudWatch Logsからの呼び出し権限を付与する必要があります。
aws lambda add-permission \
--function-name waf-block-notification \
--statement-id AllowCloudWatchLogs \
--action lambda:InvokeFunction \
--principal logs.ap-northeast-1.amazonaws.com \
--source-arn "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:aws-waf-logs-*:*"
通知メールの例
件名: [dev] WAF IPAddressRateLimitRule でアクセスをブロックしました
WAFがレートベースルールによりアクセスをブロックしました。
■ ブロック情報
IPアドレス: x.x.x.x
国: JP
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...
■ ルール詳細
ルール名: IPAddressRateLimitRule
ルールタイプ: RATE_BASED
最大リクエスト数: 600
評価期間: 60秒
■ 環境
環境: dev
タイムスタンプ: 2026-02-09 15:30:45 (JST)
運用上の注意点
-
S3ライフサイクル設定: 履歴ファイルが蓄積されるため、30日などでの自動削除ルールを設定することをおすすめします
-
タイムゾーン: 通知の重複判定は日本時間(Asia/Tokyo)基準です。Python 3.9以降の
zoneinfoを使用しているため、3.8以前の場合はpytzをLayerで追加してください
参考リンク
- AWS WAF ログの設定 - AWS公式ドキュメント
- CloudWatch Logs サブスクリプションフィルター - AWS公式ドキュメント
- レートベースのルールステートメント - AWS公式ドキュメント
まとめ
AWS WAFv2のレートベースルールによるIPブロックをリアルタイムで把握できる通知システムを構築しました。Lambda側でフィルタリング条件を変更すれば、特定のルールのみ通知するなど柔軟な対応が可能です。
次回は一時的に大量のアクセスが来た場合に、アクセスブロックを行うWAF設定をしたときの内容を記事にします。
お知らせ
技術ブログを週1〜2本更新中、ソーイをフォローして最新記事をチェック!
https://qiita.com/organizations/sewii
