LoginSignup
2
6

More than 1 year has passed since last update.

ipblock - 超小型fail2ban

Last updated at Posted at 2023-03-17

はじめに

/etc/log/syslog を見ていると夥しい量の不法パケットが流れてきます。

statistics: max connection rate 1/60s for (smtp:192.168.1.1) at Mar 14 12:34:56

の類ですね。出現頻度も高く、postfixに負荷がかかるし、第一気持ち悪いので、自動的にblockする方法を考えました。

これは、Linuxサーバーで不正なパケットを検知し、自動的にブロックするPythonスクリプトです。syslogのログファイルを監視し、指定された正規表現パターンにマッチする不正なパケットをブロックするために、iptablesを使用します。また、特定のIPアドレスがブロックされている期間を追跡するために、sqlite3を使用します。

このスクリプトを使用することで、不正なパケットを自動的にブロックし、サーバーの負荷を軽減できます。また、手動でIPアドレスをブロックする必要がなく、セキュリティの向上に役立ちます。

fail2banという類似するシステムがあります。ipblock.pyはfail2banのように複数のlogを監視しません。何回か、その攻撃があったら、処断するという指定はなく1回目で遮断します。
例えばsshにloginを失敗して直ちに60秒遮断されるのと、3回間違えたら10分止められるのでは、そんなに遜色なくどちらもよく防御できいると思います。たいていのエラーはsyslogに入ってきますし、そうでないdeamonも設定を変えてsyslogにエラーを吐けるものも多いです。ですからsyslogだけしか見ないことは大きなハンデではないと思います。
何しろipblock.pyは軽い小さなスクリプトで、簡単にカスティマイズできるし、容易に設定できます。

互いの利点

ChatGPTに聞いてみました。
Q:
互いの利点欠点を①ipblock.pyにできてfail2banにはできないこと。②fail2banにできてipblock.pyにはできないことという観点でまとめてください。

A:
以下に、ipblock.pyとfail2banの利点と欠点を互いに比較した結果をまとめました。

ipblock.pyにできてfail2banにはできないこと:

① システムリソースの消費が少ないこと。
② 最長でも10秒でパケットをブロックする高速性能。
③ 簡単に設定ができること。
④ ブロック対象のIPアドレスの範囲を柔軟に指定できること。
⑤ ブロックしたIPアドレスのログを出力することができること。

fail2banにできてipblock.pyにはできないこと:

① 攻撃回数の指定でブロックを発動する機能があること。
② 著名なシステムで導入実績が多いこと。
③ 様々なログファイルを自動で解析し、攻撃パターンを検出できること。
④ 複数のアクションを実行できること(メール通知、iptablesルールの追加など)。
⑤ 簡単に拡張できるプラグインアーキテクチャを備えていること。

以上のように、ipblock.pyとfail2banはそれぞれに独自の利点を持っています。ipblock.pyはシンプルで高速なブロックを実現し、fail2banはより多機能で拡張性に富んでいます。利用環境や要件に合わせて適切な選択をすることが大切です。

方針検討

packetは頻繁にやってきます。LogWatchで抽出して手は間に合いません。

  1. Logを連続監視する。
  2. deamonとして監視するpython scriptをかく。
  3. iptablesを使って管理する
  4. メインのFireWallはISPのものを使い、漏れてきた不法パケットを落とす2段構えにする。
  5. 関連管理ツールも作る。

ipblock.py

ipblock.pyの本体です。

mkdir -p /etc/local/bin/

してから/etc/local/bin/ipblock.pyに置きます。

iptablesを操作して不法パケットを落とします。

/etc/local/bin/ipblock.py
#! /usr/bin/python3

# MIT License
#
# Lisence (c) 2023 Shigeru Makino <mac@agri-hitech.com>

# ipblock.py: LOGの監視とIPブロック
import subprocess
import syslog
import time
import re
import sys
import sqlite3

CONFIG_FILE = "/etc/ipblock/ipblock.conf"
BLOCKED_IPS_DB = "/var/ipblock/blocked_ips.db"
SYS_LOG = "/var/log/syslog"
global conn
global cur


# config fileの読み込み
def load_filters():
    with open(CONFIG_FILE, "r") as file:
        lines = file.readlines()
    filters = []
    for line in lines:
        if not line or line.startswith("#"):
            continue
        parts = line.strip().split(",")
        release = int(parts[0])
        proto, port_n = parts[1].split("/")
        port_n = int(port_n)
        regex = re.compile(parts[2])
        filters.append((release, proto, port_n, regex))
    return filters


# iptablesの削除
def del_filter(ip_address, proto, port_n):
    cmd = f"iptables -D INPUT -p {proto} --dport {port_n} -s {ip_address} -j DROP"
    subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    syslog.syslog(f"Unblocked {ip_address}({proto}/{port_n})")
    sql = f'DELETE FROM blocked_ips WHERE ip_address = "{ip_address}" and proto = "{proto}" and port_n = {port_n}'
    cur.execute(sql)
    conn.commit()


# iptablesの追加
def add_filter(ip_address, proto, port_n, expire):
    sql = f'INSERT INTO blocked_ips (ip_address,proto, port_n, unblock_time) VALUES ("{ip_address}","{proto}",{port_n},{expire})'
    cur.execute(sql)
    cmd = f"iptables -I INPUT -p {proto} --dport {port_n} -s {ip_address} -j DROP"
    subprocess.run(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    syslog.syslog(f"Blocked {ip_address}({proto}/{port_n})")
    conn.commit()


# ブロック期間が終了したすべてのIPアドレスのブロックを解除
def unblock_ips():
    global conn
    global cur
    now = int(time.time())
    sql = f"SELECT ip_address,proto,port_n FROM blocked_ips WHERE unblock_time < {now}"
    cur.execute(sql)
    blocked = cur.fetchall()
    for ip_address, proto, port_n in blocked:
        del_filter(ip_address, proto, port_n)


# Logを1行ずつチャック
def packet_check(line, filters):
    global conn
    global cur
    now = int(time.time())
    for release_time, proto, port_n, regex in filters:
        match = regex.search(line)
        if match:
            groups = match.groups()
            if len(groups) > 0:
                ip_address = match.group(1)
                sql = f'SELECT ip_address,proto,port_n FROM blocked_ips WHERE ip_address = "{ip_address}" and proto = "{proto}" and port_n = {port_n}'
                cur.execute(sql)
                lines = cur.fetchall()
                if lines:
                    for cip, cproto, cport_n in lines:
                        del_filter(cip, cproto, cport_n)
                expire = now + release_time
                add_filter(ip_address, proto, port_n, expire)


if __name__ == "__main__":
    syslog.syslog("IP blocking service started")
    try:
        sqlite_version = sqlite3.version
    except ModuleNotFoundError:
        syslog.syslog(
            "Error: sqlite3 module not found. Please install it and try again."
        )
        sys.exit(1)

    conn = sqlite3.connect(BLOCKED_IPS_DB)
    cur = conn.cursor()
    sql = """CREATE TABLE IF NOT EXISTS
        blocked_ips (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip_address,
                proto TEXT,
                port_n INTEGER,
                unblock_time INTEGER)
        """
    cur.execute(sql)
    filters = load_filters()
    filter_index = 0
    with open(SYS_LOG, "r") as f:
        f.seek(0, 2)
        current = f.tell()
        f.close()

    while True:
        with open(SYS_LOG, "r") as f:
            f.seek(current, 0)
            lines = f.readlines()
            current = f.tell()
            if lines:
                for line in lines:
                    packet_check(line, filters)
        f.close()
        unblock_ips()
        time.sleep(10)
# End of ipblock.py
chmod +x /etc/local/bin/ipblock.py
apt update
apt install sqlite3
mkdir /etc/ipblock

設定ファイルを作ります。
リリースタイムは不正パケットが届き、締め出してから再度開くまでの時間です。
regexはサーチパターンの正規表現を書きます。ip_addressを抽出しますのでその部分は()でくくります。

持続時間,protocol/port,フィルターの正規表現
持続時間はパケットを阻止する時間を秒で表したものです。
正規表現はipaddressを取り出せるように書きます。
パケットは無数に来ます。
永久に阻止してはフィルター項数も膨大になり却ってCPU負荷が増えます。
短めを設定して、適当な項数になるように調整します。
下の例は構文チェッカーlintipb.pyの試験につかいので、故意に入れたバグがあります。

/etc/ipblock/ipblock.conf
# release_time[sec],port,regex
10800,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)
error,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)
600,tcp/error,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)
E,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)600,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:[0-9.]+\)600,tcp/error,postfix.*: statistics: max connection rate 1/60s for \(smtp:[0-9.]+\)
10800,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:[0-9.]+)\)
10800,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:[0-9.]+\)
10800,tcp/25,postfix.*: lost connection after AUTH from unknown\[([0-9.]+)\]
60,tcp/8080,sshd.*: Failed password for .+ from ([0-9.]+)
60,tcp/8080,sshd.*: Disconnecting authenticating user .+ from ([0-9.]+)
60,tcp/8080,sshd.*: User .+ from ([0-9.]+) not allowed because not listed in AllowUsers
60,tcp/8080,sshd.*: Failed publickey for .+ from ([0-9.]+)
60,tcp/8080,sshd.*: Invalid user .+ from ([0-9.]+)

lintipb.py

/etc/ipblock/ipblock.conf が正しく動作するかの試験スクリプトです。
/etc/local/bin/lintipb.py においてください。
指定したフィルターを/var/log/syslog に適用しマッチした行を表示します。

lintipb.py --help
usage: lintipb.py [-h] [-l LANGUAGE] [-c CONFIG] [-g LOG]

optional arguments:
  -h, --help            show this help message and exit
  -l LANGUAGE, --language LANGUAGE
                        specify language (default: en), ja, zh
  -c CONFIG, --config CONFIG
                        specif config file name (default:
                        /etc/ipblock/ipblock.conf)
  -g LOG, --log LOG     specify log file name (default: /var/log/syslog)

プログラムは以下の通りです。
現在、英日中西露韓、アラビア語の7か国語に対応してます。

/etc/local/bin/lintipb.py
#! /usr/bin/python3
# -*- encoding: utf-8
# MIT License
#
# License (c) 2023 Shigeru Makino
#

import sys
import re
import argparse


def load_filters(config_file, message):
    split_pattern = re.compile("(.*),([tcpud]+)/(.+),(.+)")
    maxlen = 100
    filters = []
    lineno = 0
    with open(config_file, "r") as f:
        lines = f.readlines()
    for line in lines:
        lineno = lineno + 1
        line = line.strip()
        if not line:
            # print(message["empty_line"].format({lineno}))
            continue
        elif line.startswith("#"):
            # print(message["comment"].format(lineno, line))
            continue
        parts = split_pattern.search(line)
        if not parts:
            print(message["invalid_setting"].format(lineno, line[0:maxlen]))
            continue
        try:
            regex = re.compile(parts.group(4))
        except re.error:
            print(message["regex"].format(lineno, line[0:maxlen]))
            continue
        try:
            release = int(parts.group(1))
        except ValueError:
            print(message["release"].format(lineno, line[0:maxlen]))
            continue
        ports = parts[1].split("/")
        protocol = parts.group(2)
        try:
            port_n = int(parts.group(3))
        except ValueError:
            print(message["invalid_port"].format(lineno, line[0:maxlen]))
            continue
        #        print(message["valid_line"].format(lineno, line[0:maxlen]))
        filters.append((release, protocol, port_n, regex, lineno))
    return filters


def packet_check(line, filters):
    for filter in filters:
        release_time, proto, port_n, regex, filterno = filter
        match = regex.search(line)
        if match:
            groups = match.groups()
            if len(groups) <= 0:
                print(message["filter"].format(filterno, regex))
                filters.remove(filter)


def defence_check(line, filters, defence_rec):
    for filter in filters:
        release_time, proto, port_n, regex, filterno = filter
        match = regex.search(line)
        if match:
            groups = match.groups()
            if len(groups) <= 0:
                defence_rec[filterno - 1] += 1


def lintipb(config_file, sys_log, message):
    print(message["hello"])
    print(message["conf_test"].format(config_file))
    filters = load_filters(config_file, message)
    print(message["defence"].format(sys_log))
    defence_rec = [0 for x in range(0, len(filters))]
    with open(sys_log, "r") as f:
        lines = f.readlines()
        f.close()
    for line in lines:
        defence_check(line, filters, defence_rec)
    i = 0
    for rec in defence_rec:
        release_time, proto, port_n, regex, filterno = filters[i]
        print(message["attack"].format(filterno, rec, regex))
        i += 1
    print(message["syslog_test"].format(sys_log))
    for line in lines:
        packet_check(line, filters)
    filters = load_filters(config_file, message)
    print(message["finished"])


if __name__ == "__main__":
    # Message in English
    en_message = {
        "hello": "#### {Hello, I'm linter for ipblock conf file",
        "conf_test": "#### Testing Configure file ({})",
        "syslog_test": "#### Testing Log file ({})",
        "filter": "{}: IP extraction failed:\n\t{}",
        "empty_line": "  {}:\n\tEmpty line",
        "comment": "  {}: Comment:\n\t{}",
        "invalid_setting": "E {}: Invalid setting in config file: \t{}",
        "regex": "E {}: Invalid regex or unbalanced parentheses:\n\t{}",
        "release": "E {}: Invalid release time format:\n\t{}",
        "invalid_port": "E {}: Invalid port number format:\n\t{}",
        "valid_line": "  {}: Valid line:\n\t{}",
        "unmatched_regex": "{}: Unable to get IP address with regex:\n\t{}",
        "defence": "#### Defence test",
        "attack": "Filter #{} defence {} times\n\t{}",
        "finished": "Finished !",
    }


    # 日本語のメッセージ
    ja_message = {
        "hello": "{#### こんにちは。私はipblockの設定ファイルを調べる チェッカーです。",
        "conf_test": "設定ファイル({})の検査",
        "syslog_test": "Log ({})への適用検査",
        "filter": "{}: IP 抽出失敗\n\t{}",
        "empty_line": "  {}: 空白行",
        "comment": "  {}: コメント:\n\t{}",
        "invalid_setting": "E {}: Configの書式がrelease,port,regexになってない:\n\t{}",
        "regex": "E {}: 不正な正規表現、()の不釣り合いなど:\n\t{}",
        "release": "E {}: 不正な持続時間表記:\n\t{}",
        "invalid_port": "E {}: 不正なポート番号表現:\n\t{}",
        "valid_line": "  {}: 正常:\n\t{}",
        "unmatched_regex": "{}: 正規表現でIPアドレスを抽出できません:\n\t",
        "defence": "#### 防御試験",
        "attack": "Filter #{}は{} 回防御",
        "finished": "終了です",
    }

   
# Mensaje en español
    es_message = {
        "hello": "#### Hola, soy el linter para el archivo de configuración de ipblock",
        "conf_test": "#### Probando archivo de configuración ({})",
        "syslog_test": "#### Probando archivo de registro ({})",
        "filter": "{}: Fallo en la extracción de IP:\n\t{}",
        "empty_line": "  {}: Línea vacía",
        "comment": "  {}: Comentario:\n\t{}",
        "invalid_setting": "E {}: Configuración inválida en el archivo de configuración:\t{}",
        "regex": "E {}: Expresión regular no válida o paréntesis desequilibrados:\n\t{}",
        "release": "E {}: Formato de tiempo de lanzamiento no válido:\n\t{}",
        "invalid_port": "E {}: Formato de número de puerto no válido:\n\t{}",
        "valid_line": "  {}: Línea válida:\n\t{}",
        "unmatched_regex": "{}: No se pudo obtener la dirección IP con la expresión regular:\n\t{}",
        "defence": "#### Prueba de defensa",
        "attack": "Filtro #{} defensa {} veces\n\t{}",
        "finished": "¡Terminado!",
    }


    ko_message = {
        "hello": "#### 안녕하세요. IP 블록 설정 파일 린터입니다.",
        "conf_test": "#### 설정 파일 ({}) 테스트 중",
        "syslog_test": "#### 로그 파일 ({}) 테스트 중",
        "filter": "{}: IP 추출 실패:\n\t{}",
        "empty_line": "  {}:\n\t빈 줄",
        "comment": "  {}: 주석:\n\t{}",
        "invalid_setting": "E {}: 설정 파일 형식 오류: release, port, regex가 아닌 설정:\n\t{}",
        "regex": "E {}: 유효하지 않은 정규식 또는 괄호 불균형:\n\t{}",
        "release": "E {}: 유효하지 않은 유지 시간 형식:\n\t{}",
        "invalid_port": "E {}: 유효하지 않은 포트 번호 형식:\n\t{}",
        "valid_line": "  {}: 유효한 줄:\n\t{}",
        "unmatched_regex": "{}: 정규식으로 IP 주소를 추출할 수 없습니다:\n\t{}",
        "defence": "#### 방어 시험",
        "attack": "{} 번 필터, {} 회 방어:",
        "finished": "완료되었습니다!",
    }


 # 中文的信息
    zh_message = {
        "hello": "你好,我是ipblock conf文件的linter",
        "conf_test": "检查配置文件({})",
        "syslog_test": "应用程序对日志文件的检查({})",
        "filter": "{}: IP 提取失败。:\n\t{}",
        "empty_line": "  {}:\n\t空行",
        "comment": "  {}: 注释:\n\t{}",
        "invalid_setting": "E {}: 配置文件行不符合格式release,port,regex:\n\t{}",
        "regex": "E {}: 无效的正则表达式或括号不平衡:\n\t{}",
        "release": "E {}: 不正确的发布版本表达:\n\t{}",
        "invalid_port": "E {}: 不正确的端口号表达:\n\t{}",
        "valid_line": "  {}: 有效行:\n\t{}",
        "unmatched_regex": "{}: 无法用正则表达式提取IP地址:\n\t{}",
        "defence": "#### 防御试验",
        "attack": "{}号过滤器防御{}次",
        "finished": "已完成",
    }

    ar_message = {
        "hello": "#### {مرحبا، أنا برنامج فحص لملفات تكوين ipblock",
        "conf_test": "#### اختبار ملف التكوين ({})",
        "syslog_test": "#### اختبار ملف السجل ({})",
        "filter": "{}: فشل استخراج عناوين IP:\n\t{}",
        "empty_line": "  {}:\n\tخط فارغ",
        "comment": "  {}: تعليق:\n\t{}",
        "invalid_setting": "E {}: إعداد غير صالح في ملف التكوين:\t{}",
        "regex": "E {}: تعبير غير صالح أو أقواس مفردة غير متوازنة:\n\t{}",
        "release": "E {}: تنسيق وقت الإصدار غير صالح:\n\t{}",
        "invalid_port": "E {}: تنسيق رقم المنفذ غير صالح:\n\t{}",
        "valid_line": "  {}: سطر صالح:\n\t{}",
        "unmatched_regex": "{}: تعذر الحصول على عنوان IP باستخدام التعبير العادي:\n\t{}",
        "defence": "#### اختبار الدفاع",
        "attack": "تصفية #{} الدفاع {} مرات\n\t{}",
        "finished": "انتهى !",
    }

    ru_message = {
        "hello": "#### {Привет, я линтер для файла конфигурации ipblock",
        "conf_test": "#### Тестирование файла конфигурации ({})",
        "syslog_test": "#### Тестирование файла логов ({})",
        "filter": "{}: Ошибка извлечения IP-адреса:\n\t{}",
        "empty_line": "  {}:\n\tПустая строка",
        "comment": "  {}: Комментарий:\n\t{}",
        "invalid_setting": "E {}: Неверная настройка в файле конфигурации:\t{}",
        "regex": "E {}: Неверное регулярное выражение или несбалансированные скобки:\n\t{}",
        "release": "E {}: Неверный формат времени выпуска:\n\t{}",
        "invalid_port": "E {}: Неверный формат номера порта:\n\t{}",
        "valid_line": "  {}: Допустимая строка:\n\t{}",
        "unmatched_regex": "{}: Не удалось получить IP-адрес с помощью регулярного выражения:\n\t{}",
        "defence": "#### Тест защиты",
        "attack": "Фильтр №{} защита {} раз\n\t{}",
        "finished": "Закончено!",
    }


    world_mes = {
        "ja": ja_message,
        "zh": zh_message,
        "en": en_message,
        "es": es_message,
        "ko": ko_message,
        "ar": ar_message,
        "ru": ru_message,
    }


    config_file = "/etc/ipblock/ipblock.conf"
    sys_log = "/var/log/syslog"
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-l",
        "--language",
        type=str,
        default="en",
        help="specify language (default: en), ja, zh, es, ko, ar, ru",
    )
    parser.add_argument(
        "-c",
        "--config",
        type=str,
        default=f"{config_file}",
        help=f"specif config file name (default: {config_file})",
    )
    parser.add_argument(
        "-g",
        "--log",
        type=str,
        default=f"{sys_log}",
        help=f"specify log file name (default: {sys_log})",
    )
    args = parser.parse_args()

    # コマンドライン引数から指定された値を使用
    config_file = args.config
    sys_log = args.log
    lang = args.language
    try:
        message = world_mes[lang]
    except KeyError:
        message = world_mes["en"]
        print(f"Sorry, I do't know {lang}, so set en")
    lintipb(config_file, sys_log, message)
    sys.exit(0)
# End of lineipb.py

先ほどのconfig fileを試しに検査します。

こんにちは。私はipblockの設定ファイルを調べる チェッカーです。
設定ファイル(conf)の検査
  1: コメント: # release_time[sec].port,regex
E 3: 不正な持続時間表記:error,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)
E 4: 不正なポート番号表現: 600,tcp/error,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)
E 5: 不正な持続時間表記:E,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:([0-9.]+)\)
E 7: 不正なポート番号表現: 600,tcp/error,postfix.*: statistics: max connection rate 1/60s for \(smtp:[0-9.]+\)
E 8: 不正な正規表現、()の不釣り合いなど: 10800,tcp/25,postfix.*: statistics: max connection rate 1/60s for \(smtp:[0-9.]+)\)
Log (/var/log/syslog)への適用検査
6: IP 抽出失敗:re.compile('postfix.*: statistics: max connection rate 1/60s for \\(smtp:[0-9.]+\\)')
9: IP 抽出失敗:re.compile('postfix.*: statistics: max connection rate 1/60s for \\(smtp:[0-9.]+\\)')
終了です

blocked.py

現在ipblock.pyが阻止しているパケットと有効期限を示す管理ツールです。
/etc/local/bin/blocked.py
においてください。

chmod +x /etc/local/bin/blocked.py
/etc/local/bin/blocked.py
#!/usr/bin/env python3

import sqlite3
import datetime

BLOCKED_IPS_DB = "/var/ipblock/blocked_ips.db"

conn = sqlite3.connect(BLOCKED_IPS_DB)
cur = conn.cursor()
print('ip address(proto/port)\texpire to')
tmfmt = '%Y-%m-%d %H:%M:%S'
cur.execute("SELECT ip_address, proto, port_n, unblock_time FROM blocked_ips")
blocked_ips_list = cur.fetchall()
for ip_address, proto, port_n, unblock_time in blocked_ips_list:
    unblock = datetime.datetime.fromtimestamp(unblock_time).strftime(tmfmt)
    print(f'{ip_address}({proto}/{port_n})\t{unblock}')
# End of blocked.py

fakelog.py

/usr/log/syslogに疑似エラーパケットを書き込む管理ツールです。
/etc/local/bin/fakelog.pyに置きます。

/etc/local/bin/fakelog.py
#! /usr/bin/python3

from logging import handlers, basicConfig, getLogger, DEBUG

basicConfig(level=DEBUG, handlers=[handlers.SysLogHandler(address='/dev/log')])

logger = getLogger('fake')
msg = " poatfix/smtpd[1234]: statistics: max connection rate 1/60s for (smtp:192.168.1.1) at Mar 14 12:34:56"
logger.info(msg)
msg = " postfix/anvil[1234]: lost connection after AUTH from unknown[192.168.1.1]"
logger.info(msg)
msg = " sshd[1234]: Failed password for mac from 192.168.1.1"
logger.info(msg)
# Enf of facklog.py

systemdの設定

次の設定ファイルを/etc/systemd/system/ipblock.serviceに置きます。

/etc/systemd/system/ipblock.service
[Unit]
Description=IP blocking service

[Service]
Type=simple
ExecStart= /etc/local/bin/ipblock.py
ExecReload= /usr/bin/systemctl restart ipblock.service
Restart=always

[Install]
WantedBy=multi-user.target

systemctlを有効にします。

systemctl daemon-reload
systemctl enable ipblock
systemctl start ipblock

運用

ipblockはrebootすれば自動起動します。、
ipblock.py運用中に実行すると次のように表示されます。(一例です)

# /etc/local/bin/blocked.py 
ip address(proto/port)  expire to
192.241.220.24(tcp/25)  2023-03-19 10:35:53
192.168.1.1(tcp/25)     2023-03-19 10:43:12
103.151.125.9(tcp/25)   2023-03-19 10:50:13
193.56.29.118(tcp/25)   2023-03-19 10:56:03
114.234.245.187(tcp/25) 2023-03-19 11:21:15
103.170.254.172(tcp/25) 2023-03-19 12:14:19

あまりにたくさんの項目にならないようipblock.confのリリース時間を調整してください。

/etc/local/bin/fakelog.py

で疑似ログを書き込むと、

# /etc/local/bin/blocked.py 
ip address(proto/port)  expire to
192.241.220.24(tcp/25)  2023-03-19 10:35:53
192.168.1.1(tcp/25)     2023-03-19 10:43:12

が記録され動作確認ができます。

まとめ

fail2banは攻撃回数の指定でブロックを発動する機能や、著名なシステムでの導入実績があるという利点があります。
ipblock.pyはシステムリソースの消費が少なく、高速性能でパケットをブロックすることができるという利点があります。
fail2banはコンフィグレーションファイルによる柔軟性がありますが、構成が複雑で設定に時間がかかることがあります。
ipblock.pyは単純な設定で簡単に導入できる反面、利用可能な機能が少ないため、カスタマイズが必要な場合は自分で実装する必要があります。
どちらも有用なツールであり、使用する環境や目的に応じて適切なものを選択する必要があります。

来歴

v0.1: 初版対応 filterをjsonで管理
v1.0: filterをsqlite3で管理、multi port対応
v2.0: chkipb.pyをlintipb.pyに置き換え機能強化と3か国語対応

2
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
6