この記事でわかること
- 任意のHTTPステータスをWAFで自動ブロックする構成
- EventBridge Scheduler + S3でシンプルな重複防止を実現する方法
はじめに
ソーイ株式会社 村上です。
運用しているプロジェクトでnginxの499エラーが頻発し、バックエンドのリソースが圧迫される事象が発生しました。ピーク時には1分間に300件以上の499エラーが発生し、アプリケーションサーバーのCPU使用率が90%以上まで上昇しました。手動でのIP遮断では対応が追いつかないため、自動ブロックの仕組みを検討・構築しました。
本記事では、自動ブロックの仕組みについて紹介します。
499エラーとは
499ステータスコードは、クライアントがサーバーのレスポンスを受け取る前に接続を切断した場合に記録されるnginx独自のステータスコードです。
主な発生原因は以下の通りです。
- クライアントのタイムアウト設定がサーバーの処理時間より短い
- ユーザーがページ遷移やリロードを連続で行う
- ボットや不正なクライアントによる大量リクエスト
- モバイルアプリのネットワーク切り替えによる接続断
今回対処したかったのは、短時間に大量の499エラーを発生させるクライアントを自動ブロックすることです。
なぜWAFでブロックするのか
ブロックの方法として、以下を検討しました。
| 方法 | 課題 |
|---|---|
| nginxでIP制限(deny等) | ブロック処理自体がサーバーの負荷になる。複数台構成では設定の同期も必要 |
| アプリケーションでブロック | リクエストがバックエンドまで到達するため処理コストが高い |
| WAFでブロック | ALBの手前で遮断できるため、バックエンドに負荷をかけずに済む |
WAFであればリクエストがアプリケーションサーバーに到達する前に遮断できるため、最も効率的にサーバー負荷を軽減できます。今回はWAFによるブロックを採用しました。
全体アーキテクチャ
自動ブロックのフローとなります。
CloudWatch Logsのアラームをフックとして、EventBridge経由でLambda実行を行います。LambdaからWAFのブロック用IPセットに登録するという流れになります。
ポイント:Lambda関数1つで完結
今回一時的な集中アクセスに対応したいため、任意のIPアドレスのリクエストをブロック後、解除する仕組みをいれています。DynamoDB TTL+Streamsでの管理も検討しましたが、利用サービスが増えすぎるため、よりシンプルなEventBridge Schedulerで同じLambda関数を2分後に再呼び出しする方式を採用しました。
利用AWSサービス一覧
| サービス | 役割 |
|---|---|
| CloudWatch Logs | nginxアクセスログの収集・保存 |
| CloudWatch メトリクスフィルター | 499エラーの検出・IPごとの集計 |
| CloudWatch アラーム | しきい値超過の監視 |
| EventBridge Rule | アラーム状態変化をLambdaにルーティング |
| EventBridge Scheduler | ブロック解除をスケジュール |
| Lambda | IPセットへの追加・削除処理 |
| WAF IPセット + Web ACL | IPベースのアクセス制御 |
| S3 | ブロック通知の重複チェック用 |
構築手順
1. CloudWatch Logsへのnginxログ転送
EC2インスタンス上のnginxアクセスログをCloudWatch Logsに送るため、CloudWatch Agentを導入しました。
CloudWatch Agentのインストール
# Amazon Linux
wget https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm
sudo rpm -U ./amazon-cloudwatch-agent.rpm
エージェント設定ファイル
{
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/var/log/nginx/access.log",
"log_group_name": "/aws/ec2/nginx/access",
"log_stream_name": "{instance_id}",
"timezone": "Local"
}
]
}
}
}
}
sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
-a fetch-config -m ec2 -s \
-c file:/opt/aws/amazon-cloudwatch-agent/etc/config.json
EC2インスタンスのIAMロールには CloudWatchAgentServerPolicy をアタッチしておきます。
2. メトリクスフィルターの作成
CloudWatch Logsのロググループ /aws/ec2/nginx/access に対してメトリクスフィルターを作成します。
今回のnginxログフォーマットでは、X-Forwarded-Forヘッダーに実際のクライアントIPが含まれているため、それをディメンションとして抽出しました。
フィルターパターン例:
[client_ip, dash1, dash2, timestamp, request, status_code=499, body_bytes,
referer, user_agent, real_client_ip, request_time, upstream_response_time]
メトリクス設定:
| 項目 | 値 |
|---|---|
| フィルター名 | Nginx499ByIP |
| メトリクス名前空間 | CustomMetrics/Nginx |
| メトリクス名 | ClientDisconnect499 |
| メトリクス値 | 1 |
| ディメンション(ClientIP) | $real_client_ip |
ポイントは、ディメンションにクライアントIPを設定することです。これにより、IPアドレスごとに499エラーの発生回数を個別にカウントできます。
3. WAF IPセットとWeb ACLルールの作成
先にブロック先となるWAF IPセットを作成しておきます。
IPセット設定:
| 項目 | 値 |
|---|---|
| IP set name | BlockedClients499 |
| Region | ALBと同じリージョン |
| IP version | IPv4 |
| IP addresses | (空) |
作成後、対象のWeb ACLにIPセットルールを追加します。
| 項目 | 値 |
|---|---|
| Rule type | IP set |
| Rule name | Block499Clients |
| IP set | BlockedClients499 |
| Action | Block |
4. CloudWatch アラームの作成
メトリクスフィルターで作成したカスタムメトリクスに対してアラームを設定します。
| 項目 | 値 |
|---|---|
| メトリクス |
CustomMetrics/Nginx → ClientDisconnect499
|
| 統計 | Sum |
| 期間 | 60秒 |
| しきい値 | 10以上 |
| データポイント | 1/1 |
| 欠落データの処理 | 欠落データを正常として処理 |
5. EventBridge Ruleの作成
CloudWatch アラームの状態変化をLambda関数にルーティングするEventBridge Ruleを作成します。SNSを経由せず、EventBridgeで直接Lambdaを呼び出すことで構成をシンプルにしています。
イベントパターン:
{
"source": ["aws.cloudwatch"],
"detail-type": ["CloudWatch Alarm State Change"],
"detail": {
"state": {
"value": ["ALARM"]
},
"alarmName": [{
"prefix": "Nginx-499-High-Rate-"
}]
}
}
ターゲット: Lambda関数を直接指定します。
aws events put-targets \
--rule WAF-Block-On-Nginx-499 \
--targets "Id"="1","Arn"="arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:waf-ip-block-manager"
6. Lambda関数:ブロック追加&解除(1関数で両方担当)
Lambda関数は action パラメータによって処理を分岐させます。EventBridge Ruleからの呼び出し時はブロック追加、EventBridge Schedulerからの呼び出し時はブロック解除を行います。
処理の流れを図にします。
以下、主要な処理のコードです。
エントリーポイントとブロック追加
def lambda_handler(event, context):
if event.get('action') == 'remove':
return handle_remove(event)
return handle_add(event)
def handle_add(event):
"""499エラー検知時:WAFにIPを追加し、2分後の削除をスケジュール"""
detail = event.get('detail', {})
if detail.get('state', {}).get('value', '') != 'ALARM':
return {'statusCode': 200, 'body': 'Not in ALARM state'}
problem_ips = get_problem_ips(detail.get('alarmName', ''))
ipset_id = os.environ['IPSET_ID']
scope = os.environ.get('SCOPE', 'REGIONAL')
for client_ip in problem_ips:
if not should_notify(client_ip): # S3で同日の重複チェック
continue
add_ip_to_waf(client_ip, ipset_id, scope)
schedule_removal(client_ip, ipset_id, scope)
return {'statusCode': 200, 'body': f'Blocked {len(problem_ips)} IPs'}
WAF IPセットの操作
def add_ip_to_waf(client_ip, ipset_id, scope):
response = wafv2.get_ip_set(Scope=scope, Id=ipset_id)
current_ips = response['IPSet']['Addresses']
ip_cidr = f"{client_ip}/32"
if ip_cidr not in current_ips:
current_ips.append(ip_cidr)
wafv2.update_ip_set(
Scope=scope, Id=ipset_id,
Addresses=current_ips, LockToken=response['LockToken']
)
def remove_ip_from_waf(client_ip, ipset_id, scope):
response = wafv2.get_ip_set(Scope=scope, Id=ipset_id)
current_ips = response['IPSet']['Addresses']
ip_cidr = f"{client_ip}/32"
if ip_cidr in current_ips:
current_ips.remove(ip_cidr)
wafv2.update_ip_set(
Scope=scope, Id=ipset_id,
Addresses=current_ips, LockToken=response['LockToken']
)
EventBridge Schedulerで2分後の解除をスケジュール
def schedule_removal(client_ip, ipset_id, scope):
removal_time = datetime.utcnow() + timedelta(minutes=2)
schedule_name = f"remove-{client_ip.replace('.', '-')}-{int(datetime.utcnow().timestamp())}"
scheduler.create_schedule(
Name=schedule_name,
ScheduleExpression=f"at({removal_time.strftime('%Y-%m-%dT%H:%M:%S')})",
Target={
'Arn': os.environ['LAMBDA_FUNCTION_ARN'],
'RoleArn': os.environ['SCHEDULER_ROLE_ARN'],
'Input': json.dumps({
'action': 'remove',
'client_ip': client_ip,
'ipset_id': ipset_id,
'scope': scope,
'schedule_name': schedule_name
})
},
FlexibleTimeWindow={'Mode': 'OFF'}
)
S3による重複チェック
def should_notify(ip_address):
"""S3で同日の通知済みチェック。未通知ならファイルを作成してTrueを返す"""
bucket = os.environ.get('S3_BUCKET', '')
if not bucket:
return True
today_str = datetime.utcnow().strftime('%Y-%m-%d')
key = f"waf-blocked-ips/{today_str}/{ip_address}.json"
try:
s3.head_object(Bucket=bucket, Key=key)
return False # 既に通知済み
except Exception:
s3.put_object(Bucket=bucket, Key=key,
Body=json.dumps({'ip_address': ip_address, 'notification_date': today_str}),
ContentType='application/json')
return True
7. 環境変数の設定
Lambda関数に以下の環境変数を設定します。
| 環境変数 | 説明 | 値の例 |
|---|---|---|
IPSET_ID |
WAF IPセットのID | a1b2c3d4-5678-... |
SCOPE |
WAFのスコープ | REGIONAL |
LAMBDA_FUNCTION_ARN |
自身のLambda ARN | arn:aws:lambda:... |
SCHEDULER_ROLE_ARN |
EventBridge Scheduler用IAMロール | arn:aws:iam::... |
S3_BUCKET |
通知重複チェック用S3バケット | my-waf-logs |
8. IAMロールの設定
Lambda関数用ロール
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["wafv2:GetIPSet", "wafv2:UpdateIPSet"],
"Resource": "arn:aws:wafv2:ap-northeast-1:ACCOUNT_ID:regional/ipset/*"
},
{
"Effect": "Allow",
"Action": ["cloudwatch:ListMetrics", "cloudwatch:GetMetricStatistics"],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": ["scheduler:CreateSchedule", "scheduler:DeleteSchedule"],
"Resource": "arn:aws:scheduler:ap-northeast-1:ACCOUNT_ID:schedule/default/remove-*"
},
{
"Effect": "Allow",
"Action": ["iam:PassRole"],
"Resource": "arn:aws:iam::ACCOUNT_ID:role/EventBridge-Scheduler-Lambda-Role"
},
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:HeadObject"],
"Resource": "arn:aws:s3:::my-waf-logs/waf-blocked-ips/*"
},
{
"Effect": "Allow",
"Action": ["logs:PutLogEvents", "logs:CreateLogGroup", "logs:CreateLogStream"],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
EventBridge Scheduler用ロール
EventBridge SchedulerがLambda関数を呼び出すための専用ロールも必要です。
信頼ポリシー:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": { "Service": "scheduler.amazonaws.com" },
"Action": "sts:AssumeRole"
}
]
}
権限ポリシー:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:waf-ip-block-manager-*"
}
]
}
動作フロー(具体例)
ブロック開始
10:00:00 - クライアント 203.0.113.100 が60秒間に12回の499エラーを発生
10:00:05 - CloudWatch アラーム発火(しきい値10を超過)
10:00:06 - EventBridge Rule → Lambda関数を呼び出し
10:00:07 - Lambda関数実行:
├ S3で重複チェック → 未通知なのでファイル作成
├ WAF IPセットに "203.0.113.100/32" を追加
└ EventBridge Scheduler作成:
名前: "remove-203-0-113-100-1731578407"
実行時刻: 10:02:07
イベント: {"action": "remove", "client_ip": "203.0.113.100", ...}
→ 203.0.113.100 からのアクセスがブロックされる
ブロック解除(2分後)
10:02:07 - EventBridge Scheduler が自動実行
10:02:08 - Lambda関数実行 (action=remove):
├ WAF IPセットから "203.0.113.100/32" を削除
└ スケジュール "remove-203-0-113-100-1731578407" を削除
→ 203.0.113.100 からのアクセスが再び可能に
構築時にハマったポイント
EventBridge RuleのRoleArn問題
EventBridge RuleのターゲットにLambda関数を指定する際、RoleArnを設定してしまうとLambdaが呼び出されません。RoleArnはKinesisやSQSなどLambda以外のターゲット用です。Lambda関数の場合はリソースベースポリシーで権限を付与します。
# Lambda関数にEventBridgeからの呼び出し権限を付与
aws lambda add-permission \
--function-name waf-ip-block-manager \
--statement-id AllowEventBridgeInvoke \
--action lambda:InvokeFunction \
--principal events.amazonaws.com \
--source-arn arn:aws:events:ap-northeast-1:ACCOUNT_ID:rule/WAF-Block-On-Nginx-499
CloudWatchアラームの「1つのメトリクスを選択」
メトリクスフィルターで複数のディメンション(ClientIP)を持つメトリクスを使う場合、CloudWatchアラームの作成画面で特定のIPを選択するよう求められます。すべてのIPを横断的に監視するには、メトリクス数学式(Math Expression)を使用する必要がありました。
運用上の注意点
誤検知への対策
正規ユーザーが一時的にネットワーク不安定な状況で499エラーを発生させる可能性もあります。しきい値(60秒で10回)やブロック期間(2分)は、実際のトラフィックパターンを見ながらチューニングしてください。
WAF IPSetのサイズ制限
WAF IPセットには1つあたり 最大10,000件 のIPアドレスを登録できます。EventBridge Schedulerで確実に2分後に解除されるとはいえ、大量攻撃時にはIPセットが膨らむ可能性があるため注意が必要です。
S3による通知重複の防止
同じIPアドレスに対して同日中に何度もアラームが発火する場合があります。S3に日付+IPごとのファイルを作成しておくことで、同日の重複通知を防止しています。DynamoDBを使わず、S3のシンプルなファイル存在チェック(head_object)で実装できるためお手軽です。
まとめ
nginxの499エラーを起点として、CloudWatch → EventBridge → Lambda → WAF の連携で自動ブロック・自動解除の仕組みを構築しました。
ポイントをまとめると、以下の通りです。
- CloudWatchメトリクスフィルターでIPアドレスごとに任意のHTTPステータスをカウントできる
- WAFレイヤーでのブロックなので、バックエンドに負荷をかけずに不正クライアントを遮断できる
- Lambda関数1つ+環境変数の切り替えで複数環境に対応
参考
- NGINX - src/http/ngx_http_request.h
- AWS WAF IP Set
- Amazon EventBridge Scheduler
- CloudWatch メトリクスフィルター
付録
Lambda関数 全体コード
import json
import boto3
import os
from datetime import datetime, timedelta
wafv2 = boto3.client('wafv2')
cloudwatch = boto3.client('cloudwatch')
scheduler = boto3.client('scheduler')
s3 = boto3.client('s3')
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event, separators=(',', ':'))}")
if event.get('action') == 'remove':
return handle_remove(event)
return handle_add(event)
def handle_add(event):
"""499エラー検知時:WAFにIPを追加し、2分後の削除をスケジュール"""
detail = event.get('detail', {})
alarm_name = detail.get('alarmName', '')
state_value = detail.get('state', {}).get('value', '')
if state_value != 'ALARM':
return {'statusCode': 200, 'body': 'Not in ALARM state'}
problem_ips = get_problem_ips(alarm_name)
if not problem_ips:
return {'statusCode': 200, 'body': 'No problematic IPs found'}
ipset_id = os.environ['IPSET_ID']
scope = os.environ.get('SCOPE', 'REGIONAL')
for client_ip in problem_ips:
if not should_notify(client_ip):
print(f"Already notified for {client_ip} today, skipping")
continue
add_ip_to_waf(client_ip, ipset_id, scope)
schedule_removal(client_ip, ipset_id, scope)
return {'statusCode': 200, 'body': f'Blocked {len(problem_ips)} IPs'}
def handle_remove(event):
"""2分後:WAFからIPを削除し、スケジュールをクリーンアップ"""
client_ip = event['client_ip']
ipset_id = event['ipset_id']
scope = event['scope']
schedule_name = event.get('schedule_name', '')
remove_ip_from_waf(client_ip, ipset_id, scope)
if schedule_name:
try:
scheduler.delete_schedule(Name=schedule_name)
print(f"Deleted schedule: {schedule_name}")
except Exception as e:
print(f"Warning: Could not delete schedule: {str(e)}")
return {'statusCode': 200, 'body': f'Unblocked {client_ip}'}
def should_notify(ip_address):
"""S3で同日の通知済みチェック。未通知ならファイルを作成してTrueを返す"""
bucket = os.environ.get('S3_BUCKET', '')
if not bucket:
return True
today_str = datetime.utcnow().strftime('%Y-%m-%d')
key = f"waf-blocked-ips/{today_str}/{ip_address}.json"
try:
s3.head_object(Bucket=bucket, Key=key)
return False
except Exception:
s3.put_object(
Bucket=bucket, Key=key,
Body=json.dumps({
'ip_address': ip_address,
'notification_date': today_str,
'last_updated': datetime.utcnow().isoformat()
}),
ContentType='application/json'
)
return True
def add_ip_to_waf(client_ip, ipset_id, scope):
"""WAF IPセットにIPを追加"""
response = wafv2.get_ip_set(Scope=scope, Id=ipset_id)
current_ips = response['IPSet']['Addresses']
lock_token = response['LockToken']
ip_cidr = f"{client_ip}/32"
if ip_cidr not in current_ips:
current_ips.append(ip_cidr)
wafv2.update_ip_set(
Scope=scope, Id=ipset_id,
Addresses=current_ips, LockToken=lock_token
)
print(f"SUCCESS: Added {client_ip} to WAF block list")
def remove_ip_from_waf(client_ip, ipset_id, scope):
"""WAF IPセットからIPを削除"""
response = wafv2.get_ip_set(Scope=scope, Id=ipset_id)
current_ips = response['IPSet']['Addresses']
lock_token = response['LockToken']
ip_cidr = f"{client_ip}/32"
if ip_cidr in current_ips:
current_ips.remove(ip_cidr)
wafv2.update_ip_set(
Scope=scope, Id=ipset_id,
Addresses=current_ips, LockToken=lock_token
)
print(f"SUCCESS: Removed {client_ip} from WAF block list")
def schedule_removal(client_ip, ipset_id, scope):
"""EventBridge Schedulerで2分後の削除をスケジュール"""
removal_time = datetime.utcnow() + timedelta(minutes=2)
timestamp = int(datetime.utcnow().timestamp())
schedule_name = f"remove-{client_ip.replace('.', '-')}-{timestamp}"
function_arn = os.environ['LAMBDA_FUNCTION_ARN']
scheduler_role_arn = os.environ['SCHEDULER_ROLE_ARN']
scheduler.create_schedule(
Name=schedule_name,
ScheduleExpression=f"at({removal_time.strftime('%Y-%m-%dT%H:%M:%S')})",
Target={
'Arn': function_arn,
'RoleArn': scheduler_role_arn,
'Input': json.dumps({
'action': 'remove',
'client_ip': client_ip,
'ipset_id': ipset_id,
'scope': scope,
'schedule_name': schedule_name
})
},
FlexibleTimeWindow={'Mode': 'OFF'}
)
print(f"SUCCESS: Scheduled removal at {removal_time.isoformat()} UTC")
def get_problem_ips(alarm_name):
"""CloudWatchメトリクスからしきい値を超えたIPを取得"""
metric_name = 'ClientDisconnect499'
namespace = 'CustomMetrics/Nginx'
response = cloudwatch.list_metrics(
Namespace=namespace,
MetricName=metric_name
)
problem_ips = []
now = datetime.utcnow()
for metric in response.get('Metrics', []):
dimensions = {d['Name']: d['Value'] for d in metric['Dimensions']}
client_ip = dimensions.get('ClientIP')
if not client_ip:
continue
stats = cloudwatch.get_metric_statistics(
Namespace=namespace,
MetricName=metric_name,
Dimensions=metric['Dimensions'],
StartTime=now - timedelta(seconds=120),
EndTime=now,
Period=60,
Statistics=['Sum']
)
for dp in stats.get('Datapoints', []):
if dp['Sum'] >= 10:
problem_ips.append(client_ip)
break
return problem_ips
お知らせ
技術ブログを週1〜2本更新中、ソーイをフォローして最新記事をチェック!
https://qiita.com/organizations/sewii

