概要
- 一定回数以上不正アクセスを行ったIPアドレスをAWS WAFのブロックルールに自動的に追加する仕組みをAWS Lambdaで実装する
注意点
- 前提として、不正アクセスは同一IPアドレスで実行されています
- 不正アクセスをしたIPアドレスをWAFでブロックし、以降別のユーザーにそのIPアドレスが割り振られた場合はそのユーザーはシステムを利用できなくなります
- 定期的な解除機能も作ると安心です
- 特定の国からの不正アクセスが多く当該国からの利用を想定していないなら、地理的一致ルールステートメントなどを利用する方が簡単です
- アクセス数が多く、特定のURIに対して行われているのであればレートベースのルールを利用する方が簡単です
-
レートベースのルールステートメント
- 本記事作成時点で設定できる最小レートは100
-
レートベースのルールステートメント
やってみた
システム図
簡単ですが今回実装する仕組みです。
アプリケーションの設定
アプリケーションではアクセスしたIPアドレスと不正アクセスされたことがわかる文言(ここではエラーログ)を記録します。
エラーログにIPアドレスが記録されているとログの抽出が楽になります。
そうなっていなくても、例えばユーザーIDなどのIPアドレスとエラーログを繋ぐ情報が記録されていればログの抽出は可能です。
CloudWatchの設定
アプリケーションログに対してメトリクスフィルターを作成し、それを用いてアラームを作成します。
アラーム状態トリガーを作成し、アラーム発生状態ならSNSでLambda関数を呼び出します。
Lambda関数を呼び出すためのSNSトピックがなければ作成します。
WAFの設定
IPアドレスを追加するIPセットを作成します。
Web ACLsを作成します。
ACLに対しマネージではないルールで、先ほどのIPリストをブロックするように設定します。
Lambda関数
SNSからの呼び出し情報を元に、CloudWatchのログからIPアドレスを抽出します。
Lambda関数の実行ロールにはWAF v2とCloudWatchへのアクセス設定が必要です(WAFではなくてWAF v2)。
今回はエラーログにアクセスIPアドレスが記録されていない前提なので、エラーログとIPアドレスそれぞれに対しログを抽出しています。
エラーログとIPアドレスが記録されるアクセスログを繋ぐ情報は user_id
です。
各IPアドレスによるエラー発生件数を確認することで、アラーム発生期間に同じエラーを起こしてしまった通常ユーザーを巻き込みません。
update_ip_set
は全てのIPアドレスを上書き更新してしまうので注意が必要です。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/wafv2.html#WAFV2.Client.update_ip_set
# -*- coding: utf-8 -*-
import boto3
import json
import datetime
import calendar
import re
import os
# 何分前までを抽出対象期間とするか
TIME_FROM_MIN=5
# INFO: ログをフィルターするのに用いる情報
# Lambda関数の環境変数から固定の値を読み込む、eventから呼び出し元の情報を読み込むこともできる
LOG_GROUP_NAME = '/application-log-group'
FILTER_PATTERN = 'フィルターする文言'
BLACK_IP_ADDRESS_THRESHOLD = 10
IP_SET_ID = 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
IP_SET_NAME = 'blacklist_by_lambda'
def lambda_handler(event, context):
message = json.loads(event['Records'][0]['Sns']['Message'])
try:
time_str = message['StateChangeTime'][:19]
time_range = get_time(time_str)
# エラーログをフィルター
id_dict = {}
next_token = ''
while True:
response = fetch_logs(LOG_GROUP_NAME, FILTER_PATTERN, time_range, next_token)
for event in response['events']:
message = event['message']
id_dict = create_id_dict(message, id_dict)
if response.get("nextToken"):
next_token = response["nextToken"]
else:
break
# IPアドレス一覧を抽出
black_ip_dict = {}
next_token = ''
for uid in id_dict:
while True:
response = fetch_logs(LOG_GROUP_NAME, '{ $.user_id = "' + str(uid) + '" }', time_range, next_token)
for event in response['events']:
message = event['message']
black_ip_dict = append_ip_list(message, black_ip_dict)
if response.get("nextToken"):
next_token = response["nextToken"]
else:
break
update_waf_ip_set(black_ip_dict)
except Exception as e:
print(e)
raise e
def get_time(time_str):
timeto = datetime.datetime.strptime(time_str ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1)
timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN)
u_from = int(timefrom.timestamp()) * 1000
u_to = int(timeto.timestamp()) * 1000
return [u_from, u_to]
def fetch_logs(logGroupName, filterPattern, time_range, next_token):
logs = boto3.client('logs')
if next_token:
response = logs.filter_log_events(
logGroupName = logGroupName,
filterPattern = filterPattern,
startTime = time_range[0],
endTime = time_range[1],
nextToken = next_token
)
else:
response = logs.filter_log_events(
logGroupName = logGroupName,
filterPattern = filterPattern,
startTime = time_range[0],
endTime = time_range[1]
)
return response
def create_id_dict(message, id_dict):
regex = re.compile(r"user_id:(\d+)")
uid = regex.search(message).groups()[0]
if not uid in id_dict.keys():
uid_count = 1
else:
uid_count = id_dict[uid]['count'] + 1
id_dict[uid] = {
'user_id': uid,
'count': uid_count
}
return id_dict
def append_ip_list(message, black_ip_dict):
regex = re.compile(r"\"ip\":\"(.*?)\"")
ip_address = regex.search(message).groups()[0]
if not ip_address in black_ip_dict.keys():
ip_count = 1
else:
ip_count = black_ip_dict[ip_address]['count'] + 1
black_ip_dict[ip_address] = {
'ip_address': f"{ip_address}/32",
'count': ip_count
}
return black_ip_dict
def update_waf_ip_set(black_ip_dict):
waf = boto3.client('wafv2')
update_list = []
for x in black_ip_dict:
if black_ip_dict[x]['count'] < BLACK_IP_ADDRESS_THRESHOLD:
continue
update_list.append(f"{black_ip_dict[x]['ip_address']}")
# 発生件数が一定以上でなければ除外
if (len(update_list) == 0):
return
response = waf.get_ip_set(
Name=IP_SET_NAME,
Scope='REGIONAL',
Id=IP_SET_ID
)
# 全部置き換えてしまうのでマージするように今までのIPアドレスを控えておく
registered_list = response['IPSet']['Addresses']
# INFO: 本来は更新のエラーチェックが必要
response = waf.update_ip_set(
Name=IP_SET_NAME,
Scope='REGIONAL',
Id=IP_SET_ID,
LockToken=response['LockToken'],
Addresses=[*registered_list, *update_list]
)
この関数を実行するとIPアドレスが追加され続けて解除はされません。通常ユーザーに割り振られてしまうとシステムを使うことができなくなります。
不正アクセスには波があるので数時間置きにWAFのIPリストを解除するバッチを作ることを推奨します。
Lambda関数のテストイベントJSON
ロググループの情報とアラーム発生時刻(UCT)を設定することでテストできます。
{
"Records": [
{
"Sns": {
"Message": "{\"Trigger\": {\"MetricName\": \"metric-name\",\"Namespace\": \"namespace\"}, \"StateChangeTime\": \"2023-02-05T00:00:00\"}"
}
}
]
}