3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

簡易IPアドレス自動ブロック機能の実装

Posted at

概要

  • 一定回数以上不正アクセスを行ったIPアドレスをAWS WAFのブロックルールに自動的に追加する仕組みをAWS Lambdaで実装する

注意点

  • 前提として、不正アクセスは同一IPアドレスで実行されています
  • 不正アクセスをしたIPアドレスをWAFでブロックし、以降別のユーザーにそのIPアドレスが割り振られた場合はそのユーザーはシステムを利用できなくなります
    • 定期的な解除機能も作ると安心です
  • 特定の国からの不正アクセスが多く当該国からの利用を想定していないなら、地理的一致ルールステートメントなどを利用する方が簡単です
  • アクセス数が多く、特定のURIに対して行われているのであればレートベースのルールを利用する方が簡単です

やってみた

システム図

簡単ですが今回実装する仕組みです。

image.png

アプリケーションの設定

アプリケーションではアクセスしたIPアドレスと不正アクセスされたことがわかる文言(ここではエラーログ)を記録します。

エラーログにIPアドレスが記録されているとログの抽出が楽になります。
そうなっていなくても、例えばユーザーIDなどのIPアドレスとエラーログを繋ぐ情報が記録されていればログの抽出は可能です。

CloudWatchの設定

アプリケーションログに対してメトリクスフィルターを作成し、それを用いてアラームを作成します。

アラーム状態トリガーを作成し、アラーム発生状態ならSNSでLambda関数を呼び出します。
Lambda関数を呼び出すためのSNSトピックがなければ作成します。

WAFの設定

IPアドレスを追加するIPセットを作成します。

Web ACLsを作成します。
ACLに対しマネージではないルールで、先ほどのIPリストをブロックするように設定します。

Lambda関数

SNSからの呼び出し情報を元に、CloudWatchのログからIPアドレスを抽出します。
Lambda関数の実行ロールにはWAF v2とCloudWatchへのアクセス設定が必要です(WAFではなくてWAF v2)。

今回はエラーログにアクセスIPアドレスが記録されていない前提なので、エラーログとIPアドレスそれぞれに対しログを抽出しています。
エラーログとIPアドレスが記録されるアクセスログを繋ぐ情報は user_id です。

各IPアドレスによるエラー発生件数を確認することで、アラーム発生期間に同じエラーを起こしてしまった通常ユーザーを巻き込みません。

update_ip_set は全てのIPアドレスを上書き更新してしまうので注意が必要です。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/wafv2.html#WAFV2.Client.update_ip_set

lambda_function.py
# -*- coding: utf-8 -*-
import boto3
import json
import datetime
import calendar
import re
import os

# 何分前までを抽出対象期間とするか
TIME_FROM_MIN=5

# INFO: ログをフィルターするのに用いる情報
# Lambda関数の環境変数から固定の値を読み込む、eventから呼び出し元の情報を読み込むこともできる
LOG_GROUP_NAME = '/application-log-group'
FILTER_PATTERN = 'フィルターする文言'

BLACK_IP_ADDRESS_THRESHOLD = 10

IP_SET_ID = 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
IP_SET_NAME = 'blacklist_by_lambda'

def lambda_handler(event, context):
    message = json.loads(event['Records'][0]['Sns']['Message'])   

    try:
        time_str = message['StateChangeTime'][:19]
        time_range = get_time(time_str)
    
        # エラーログをフィルター
        id_dict = {}
    
        next_token = ''
        while True:
            response = fetch_logs(LOG_GROUP_NAME, FILTER_PATTERN, time_range, next_token)
    
            for event in response['events']:
                message = event['message']
                id_dict = create_id_dict(message, id_dict)
       
            if response.get("nextToken"):
                next_token = response["nextToken"]
            else:
                break
    
        # IPアドレス一覧を抽出
        black_ip_dict = {}
    
        next_token = ''
        for uid in id_dict:
            while True:
                response = fetch_logs(LOG_GROUP_NAME, '{ $.user_id = "' + str(uid) + '" }', time_range, next_token)
    
                for event in response['events']:
                    message = event['message']
    
                    black_ip_dict = append_ip_list(message, black_ip_dict)
    
                if response.get("nextToken"):
                    next_token = response["nextToken"]
                else:
                    break

        update_waf_ip_set(black_ip_dict)
    
    except Exception as e:
        print(e)
        raise e
    

def get_time(time_str):

    timeto = datetime.datetime.strptime(time_str ,'%Y-%m-%dT%H:%M:%S') + datetime.timedelta(minutes=1)
    timefrom = timeto - datetime.timedelta(minutes=TIME_FROM_MIN)

    u_from = int(timefrom.timestamp()) * 1000
    u_to = int(timeto.timestamp()) * 1000

    return [u_from, u_to]


def fetch_logs(logGroupName, filterPattern, time_range, next_token):
    logs = boto3.client('logs')
 
    if next_token:
        response = logs.filter_log_events(
            logGroupName = logGroupName,
            filterPattern = filterPattern,
            startTime = time_range[0],
            endTime = time_range[1],
            nextToken = next_token
        )
    else:
        response = logs.filter_log_events(
            logGroupName = logGroupName,
            filterPattern = filterPattern,
            startTime = time_range[0],
            endTime = time_range[1]
        )

    return response


def create_id_dict(message, id_dict): 
    regex = re.compile(r"user_id:(\d+)")
    uid = regex.search(message).groups()[0]

    if not uid in id_dict.keys():
        uid_count = 1
    else:
        uid_count = id_dict[uid]['count'] + 1

    id_dict[uid] = {
        'user_id': uid,
        'count': uid_count
    }

    return id_dict


def append_ip_list(message, black_ip_dict):
    regex = re.compile(r"\"ip\":\"(.*?)\"")
    ip_address = regex.search(message).groups()[0]

    if not ip_address in black_ip_dict.keys():
        ip_count = 1
    else:
        ip_count = black_ip_dict[ip_address]['count'] + 1

    black_ip_dict[ip_address] = {
        'ip_address': f"{ip_address}/32",
        'count': ip_count
    }

    return black_ip_dict


def update_waf_ip_set(black_ip_dict):
    waf = boto3.client('wafv2')

    update_list = []
    for x in black_ip_dict:
        if black_ip_dict[x]['count'] < BLACK_IP_ADDRESS_THRESHOLD:
            continue

        update_list.append(f"{black_ip_dict[x]['ip_address']}")

    # 発生件数が一定以上でなければ除外
    if (len(update_list) == 0):
        return

    response = waf.get_ip_set(
       Name=IP_SET_NAME,
       Scope='REGIONAL',
       Id=IP_SET_ID
    )

    # 全部置き換えてしまうのでマージするように今までのIPアドレスを控えておく
    registered_list = response['IPSet']['Addresses']

    # INFO: 本来は更新のエラーチェックが必要
    response = waf.update_ip_set(
        Name=IP_SET_NAME,
        Scope='REGIONAL',
        Id=IP_SET_ID,
        LockToken=response['LockToken'],
        Addresses=[*registered_list, *update_list]
    )

この関数を実行するとIPアドレスが追加され続けて解除はされません。通常ユーザーに割り振られてしまうとシステムを使うことができなくなります。
不正アクセスには波があるので数時間置きにWAFのIPリストを解除するバッチを作ることを推奨します。

Lambda関数のテストイベントJSON

ロググループの情報とアラーム発生時刻(UCT)を設定することでテストできます。

{
  "Records": [
    {
      "Sns": {
        "Message": "{\"Trigger\": {\"MetricName\": \"metric-name\",\"Namespace\": \"namespace\"}, \"StateChangeTime\": \"2023-02-05T00:00:00\"}"
      }
    }
  ]
}
3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?