前回、下記投稿で自宅サーバーのジャーナルログから収集したエラーログを元にIPアドレスの出現回数のランキングファイル(CSV)を出力するpythonスクリプトを紹介しました。
Qiita@pipito.yukio「ジャーナルログからSSHで不正アクセスするホストのIPアドレスを収集する」
不正IPアドレスの出現回数のランキングファイルの以下の内容でした。
"log_date","ip_addr","appear_count"
"2024-07-09","1.92.5.77",716
"2024-07-09","103.171.134.90",467
"2024-07-09","220.250.41.11",57
"2024-07-09","34.175.118.185",42
"2024-07-09","175.178.237.54",41
"2024-07-09","141.11.229.140",41
"2024-07-09","194.67.82.57",41
"2024-07-09","81.208.169.112",34
"2024-07-09","141.145.203.77",34
"2024-07-09","114.205.92.219",31
"2024-07-09","203.194.106.73",30
"2024-07-09","120.88.46.226",30
"2024-07-09","103.100.209.77",30
"2024-07-09","161.35.18.131",30
"2024-07-09","124.156.199.148",30
"2024-07-09","43.134.63.221",30
"2024-07-09","83.97.73.43",30
"2024-07-09","91.237.163.36",30
"2024-07-09","59.36.254.224",30
今回はランキングファイルをデータベースのテーブルに登録するpythonスクリプトを紹介いたします。IPアドレスをテーブル管理することで以下のような有用な情報を出力することが可能になります。
※1日当たり出現回数30回を超え、かつ3日以上連続してアクセスしたIPアドレスを出力
$ ./dkr_exec_show_unauth_accessed_ip.sh 2024-06-10 2024-06-16
[検索期間] 2024-06-10 〜 2024-06-16
2024-06-12 | 92.118.39.40 | 73
2024-06-13 | 92.118.39.40 | 252
2024-06-14 | 92.118.39.40 | 49
2024-06-12 | 103.77.241.34 | 392
2024-06-13 | 103.77.241.34 | 3269
2024-06-14 | 103.77.241.34 | 3382
2024-06-15 | 103.77.241.34 | 3345
2024-06-16 | 103.77.241.34 | 959
1. 実行環境
- OS: Ubuntu Desktop 22.04
- データベース
Dockerコンテナ内で稼働する PostgreSQL-16 - python 3.10.12
スクリプト専用の仮想環境を作成し下記ライブラリをインストール
pip install psycopg2-binary
- データベース
2. PostgreSQL データベース
この記事用に作成したものを示します。
2-1. Dockerコンテナ用リソース
リソースファイル一覧
.
├── .env
├── Dockerfile
├── docker-compose.yml
└── initdb
├── 10_createdb.sql
└── 11_createtable.sql
テスト環境、運用環境との切り替えを容易にする環境ファイル
# Qiita投稿用データベース
#CONTAINER_NAME=postgres-qiita
HOST_PG_VOLUME=/home/qiita/database
DB_NAME=qiita_exampledb
PG_USER=postgres
PG_PASSWD=xxxxxxxxx
# display locale
LANG=C
# locale
LANGUAGE=ja_JP:ja
LC_ALL=ja_JP.UTF-8
# Time zone
TZ=Asia/Tokyo
# volume mount
HOST_HOME=/home/qiita
CONTAINER_HOME=/home/qiita
Dockerfile
FROM postgres:16-alpine
COPY initdb/*.sql /docker-entrypoint-initdb.d/
services:
postgres:
build: .
container_name: postgres-qiita
env_file: ./.env
ports:
- "5432:5432"
volumes:
- "${HOST_PG_VOLUME}/postgresql-qiita/data:/var/lib/postgresql/data"
- "${HOST_HOME}/data/sql:${CONTAINER_HOME}/data/sql"
environment:
- POSTGRES_USER=${PG_USER}
- POSTGRES_PASSWORD=${PG_PASSWD}
2-2. データベース
docker コンテナビルド時に作成するデータベースとテーブル
(1) データベース定義
CREATE ROLE developer WITH LOGIN PASSWORD 'yourpassword';
ALTER ROLE developer WITH SUPERUSER;
CREATE DATABASE qiita_exampledb WITH OWNER=developer ENCODING='UTF-8' LC_COLLATE='ja_JP.UTF-8' LC_CTYPE='ja_JP.UTF-8' TEMPLATE=template0;
GRANT ALL PRIVILEGES ON DATABASE qiita_exampledb TO developer;
(2) テーブル定義
\connect qiita_exampledb
-- Qiita投稿用テスト用スキーマ
CREATE SCHEMA mainte2;
-- 不正アクセスIPアドレス管理マスタ
CREATE TABLE mainte2.unauth_ip_addr(
id INTEGER NOT NULL,
ip_addr VARCHAR(15) NOT NULL,
reg_date DATE NOT NULL,
country_code CHAR(2)
);
CREATE SEQUENCE mainte2.ip_addr_id OWNED BY mainte2.unauth_ip_addr.id;
ALTER TABLE mainte2.unauth_ip_addr ALTER id SET DEFAULT nextval('mainte2.ip_addr_id');
ALTER TABLE mainte2.unauth_ip_addr ADD CONSTRAINT pk_unauth_ip_addr PRIMARY KEY (id);
-- IPアドレスは重複なし
CREATE UNIQUE INDEX idx_ip_addr ON mainte2.unauth_ip_addr(ip_addr);
-- 不正アクセスカウンターテープル
CREATE TABLE mainte2.ssh_auth_error(
log_date date NOT NULL,
ip_id INTEGER,
appear_count INTEGER NOT NULL
);
ALTER TABLE mainte2.ssh_auth_error ADD CONSTRAINT pk_ssh_auth_error
PRIMARY KEY (log_date, ip_id);
ALTER TABLE mainte2.ssh_auth_error ADD CONSTRAINT fk_ssh_auth_error
FOREIGN KEY (ip_id) REFERENCES mainte2.unauth_ip_addr (id);
ALTER SCHEMA mainte2 OWNER TO developer;
ALTER TABLE mainte2.unauth_ip_addr OWNER TO developer;
ALTER TABLE mainte2.ssh_auth_error OWNER TO developer;
3. pythonスクリプト
データベースへの登録処理のイメージは下記の通りになります。
スクリプトのインポート
import argparse
import csv
import logging
import os
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Tuple
import psycopg2
from psycopg2.extensions import connection, cursor
from psycopg2.extras import execute_values
from db import pgdatabase
3-1. テーブル操作関数
登録済チェック関数と一括登録関数を定義
3-1-1. 不正アクセスIPマスタテーブル
(1) IPアドレスの一括存在チェック関数
def bulk_exists_ip_addr(conn: connection,
ip_list: List[str],
logger: Optional[logging.Logger] = None) -> Dict[str, int]:
# IN ( in_clause )管理
in_clause: Tuple[str, ...] = tuple(ip_list)
if logger is not None:
logger.debug(f"in_clause: {in_clause}")
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT id,ip_addr FROM mainte2.unauth_ip_addr WHERE ip_addr IN %s""",
(in_clause,)
)
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
# IN句で一致したIPアドレスの idとIPアドレスのタプルをすべて取得
rows: List[tuple[Any, ...]] = cur.fetchall()
if logger is not None:
logger.debug(f"rows: {rows}")
# 戻り値: IPアドレスをキーとするIPのIDの辞書
result_dict: Dict[str, int] = {ip_addr: ip_id for (ip_id, ip_addr) in rows}
return result_dict
except (Exception, psycopg2.DatabaseError) as err:
raise err
(2) IPアドレスの一括登録関数
def bulk_insert_unauth_ip_addr(
conn: connection,
qry_params: tuple[Dict[str, Any], ...],
logger: Optional[logging.Logger] = None) -> Dict[str, int]:
if logger is not None:
logger.debug(f"qry_params: \n{qry_params}")
try:
cur: cursor
with conn.cursor() as cur:
rows: List[Tuple[Any, ...]] = execute_values(
cur,
"""
INSERT INTO mainte2.unauth_ip_addr(ip_addr, reg_date)
VALUES %s RETURNING id,ip_addr""",
qry_params,
template="(%(ip_addr)s, %(reg_date)s)",
fetch=True
)
# 実行されたSQLを出力
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"rows: {rows}")
# 戻り値: IPアドレスをキーとするIPのIDの辞書
result_dict: Dict[str, int] = {ip_addr: ip_id for (ip_id, ip_addr) in rows}
return result_dict
except (Exception, psycopg2.DatabaseError) as err:
raise err
3-1-2. 不正アクセスカウンターテーブル
(1) ログ採取日のIPアドレス(ID)一括存在チェック
def bulk_exists_ssh_auth_error(
conn: connection,
log_date: str,
ipid_list: List[int],
logger: Optional[logging.Logger] = None) -> List[int]:
# IN ( in_clause )
in_clause: Tuple[int, ...] = tuple(ipid_list, )
if logger is not None:
logger.debug(f"in_clause: \n{in_clause}")
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT
ip_id
FROM
mainte2.ssh_auth_error
WHERE
log_date = %(log_date)s AND ip_id IN %(in_clause)s""",
{'log_date': log_date, 'in_clause': in_clause}
)
# 実行されたSQLを出力
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"cur.rowcount: {cur.rowcount}")
# 戻り値を取得する
# def fetchall(self) -> list[tuple[Any, ...]]
rows: List[Tuple[Any, ...]] = cur.fetchall()
if logger is not None:
logger.debug(f"rows: {rows}")
# 結果が1カラムだけなのでタプルの先頭[0]をリストに格納
result: List[int] = [row[0] for row in rows]
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
(2) 一括登録関数
def bulk_insert_ssh_auth_error(
conn: connection,
qry_params: tuple[Dict[str, Any], ...],
logger: Optional[logging.Logger] = None) -> None:
if logger is not None:
logger.debug(f"qry_params: \n{qry_params}")
try:
cur: cursor
with conn.cursor() as cur:
# 登録の戻り値不要
execute_values(
cur,
"""
INSERT INTO mainte2.ssh_auth_error(log_date, ip_id, appear_count)
VALUES %s""",
qry_params,
template="(%(log_date)s, %(ip_id)s, %(appear_count)s)",
)
# 実行されたSQLを出力
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
# 登録済み処理件数 ※ログレベルをINFO
logger.info(f"cur.rowcount: {cur.rowcount}")
except (Exception, psycopg2.DatabaseError) as err:
raise err
上記4つの関数の実装については下記投稿で詳しく説明しておりますのでご覧ください。
Qiita@pipito-yukio「psycopg2 バッチ処理に適したクエリーを作成する」
3-2. テーブル登録用のデータリスト作成関数
- データクラス
- 不正アクセスIPマスタ登録用:
RegUnauthIpAddr
※id は登録時に戻り値として生成されるのでIDフィールド無し - 不正アクセスIPマスタ読み込み用:
UnauthIpAddr
- 不正アクセスカウンター登録用:
SshAuthError
- 不正アクセスIPマスタ登録用:
@dataclass(frozen=True)
class RegUnauthIpAddr:
ip_addr: str
reg_date: str
@dataclass(frozen=True)
class UnauthIpAddr:
id: int
ip_addr: str
reg_date: str
@dataclass(frozen=True)
class SshAuthError:
log_date: str
ip_id: int
appear_count: int
3-2-1. 不正アクセスマスタテーブル登録用データリスト作成
- 引数
- 登録済IPアドレスとIDを保持するDict: exists_ip_dict
- CSVファイルの行データ: csv_lines
- 処理内容
-
exists_ip_dictに登録済要素が存在するどうか
- 存在する場合
登録済要素のIPアドレスを除外して登録用データを生成 - 存在しない場合
全ての行データの登録用データ生成
- 存在する場合
-
exists_ip_dictに登録済要素が存在するどうか
def get_register_ip_list(
exists_ip_dict: Dict[str, int],
csv_lines: List[str],
logger: Optional[logging.Logger] = None) -> List[RegUnauthIpAddr]:
result: List[RegUnauthIpAddr] = []
if len(exists_ip_dict) > 0:
registered_cnt: int = 0
for line in csv_lines:
fields: List[str] = line.split(",")
registered_id: Optional[int] = exists_ip_dict.get(fields[1])
if registered_id is None:
result.append(RegUnauthIpAddr(
ip_addr=fields[1], reg_date=fields[0])
)
else:
registered_cnt += 1
if registered_cnt > 0:
if logger is not None:
logger.info(f"Registered_count: {registered_cnt}")
else:
# 登録済みレコードがない場合はすべて登録
for line in csv_lines:
fields = line.split(",")
result.append(RegUnauthIpAddr(
ip_addr=fields[1], reg_date=fields[0])
)
return result
3-2-2. 不正アクセスカウンターテーブル登録用データリスト作成
[前提条件]
この関数の呼び出し前にCSVファイルの行データが全て不正アクセスマスタテーブル登録済み
- 引数
- 登録済IPアドレスとIDを保持するDict: exists_ip_dict
- CSVファイルの行データ: csv_lines
- 処理内容
exists_ip_dict からIPアドレスに対応するIDを取得して登録用データを生成する
※存在しないケースは想定しない ※プログラムのBUG
def get_register_ssh_auth_error_list(
exists_ip_dict: Dict[str, int],
csv_lines: List[str],
logger: Optional[logging.Logger] = None) -> List[SshAuthError]:
result: List[SshAuthError] = []
for line in csv_lines:
fields: List[str] = line.split(",")
ip_id: Optional[int] = exists_ip_dict.get(fields[1])
if ip_id is not None:
# 当該日のIPアドレスは不正アクセスIPアドレステーブルに登録済み
result.append(
SshAuthError(
log_date=fields[0], ip_id=ip_id, appear_count=int(fields[2])
)
)
else:
# このケースはない想定
if logger is not None:
logger.warning(f"{fields[1]} is not regstered!")
return result
3-3. テーブル登録メイン関数
3-3-1. 不正アクセステーブル登録メイン関数
- 引数
- データベース接続オブジェクト: conn
- 登録済IPアドレスとIDを保持するDict: exists_ip_dict
- 登録用データリスト: reg_ip_list
- 処理内容
- 登録用データリストをテーブル登録用のタプルリストに変換する
-
一括登録関数にタプルリストを設定して該当するテーブルにに一括登録する
戻り値として登録されたIPアドレスとIDのDictを返却する - 戻り値を引数の登録済IPアドレスとIDを保持するDictに全部追加する
def insert_unauth_ip_main(
conn: connection,
exists_ip_dict: Dict[str, int],
reg_ip_list: List[RegUnauthIpAddr],
logger: Optional[logging.Logger] = None, enable_debug=False) -> None:
# namedtupleを辞書のタプルに変換
params: Tuple[Dict[str, Any], ...] = tuple([asdict(rec) for rec in reg_ip_list])
registered_ip_ids: Dict[str, int] = bulk_insert_unauth_ip_addr(
conn, params, logger=logger
)
if logger is not None:
logger.info(f"registered_ip_ids.size: {len(registered_ip_ids)}")
if logger is not None and enable_debug:
logger.debug(f"registered_ip_ids: {registered_ip_ids}")
# 新たに登録されたIPアドレスとIDを追加する
exists_ip_dict.update(registered_ip_ids)
if logger is not None and enable_debug:
logger.debug(f"update.exists_ip_dict:\n{exists_ip_dict}")
3-3-2. 不正アクセスカウンターテーブル登録メイン関数
未登録のデータのみを一括登録する
- 引数
- データベース接続オブジェクト: conn
- 登録用データリスト: ssh_auth_error_list
- 処理内容
- 登録用データリストの先頭データからログ日付を取得
- 登録用データリストからIPアドレスのIDリストを生成
-
不正アクセスカウンターテーブルのデータ存在チェック関数を呼び出す
戻り値として登録済のIPアドレスのIDリストを返却する -
戻り値の件数と登録予定のデータリストの件数を比較する
-
登録予定のデータリストの件数が大きい場合
未登録データのタプルリストを生成し一括登録関数を呼びだして登録する - 登録予定のデータリストの件数が等しい場合
登録データ無しのメッセージ出力
-
登録予定のデータリストの件数が大きい場合
def insert_ssh_auth_error_main(
conn: connection,
ssh_auth_error_list: List[SshAuthError],
logger: Optional[logging.Logger] = None, enable_debug=False) -> None:
# 当該日にIP_IDが登録済みかどうかチェックする ※誤って同一CSVを実行した場合を想定
# 先頭レコードから当該日取得
log_date: str = ssh_auth_error_list[0].log_date
# チェック用の ip_id リスト生成
ipid_list: List[int] = [int(reg.ip_id) for reg in ssh_auth_error_list]
exists_ipid_list: List[int] = bulk_exists_ssh_auth_error(
conn, log_date, ipid_list, logger=logger if enable_debug else None
)
# 未登録の ip_id があれば登録レコード用のパラメータを生成
if len(ipid_list) > len(exists_ipid_list):
param_list: List[Any] = []
for rec in ssh_auth_error_list:
if rec.ip_id not in exists_ipid_list:
# 当該日に未登録の ip_id のみのレコードの辞書オブジェクトを追加
param_list.append(asdict(rec))
else:
if logger is not None and enable_debug:
logger.debug(f"Registered: {rec}")
if len(param_list) > 0:
if logger is not None and enable_debug:
logger.debug(f"param_list: \n{param_list}")
bulk_insert_ssh_auth_error(
conn, tuple(param_list),
logger=logger if enable_debug else None
)
else:
if logger is not None:
logger.info("ssh_auth_error テーブルに登録可能データなし.")
3-4. データペース接続モジュール
接続設定ファイル
{
"host": "{hostname}",
"port": "5432",
"database": "qiita_exampledb",
"user": "developer",
"password": "yourpassword"
}
このモジュールの実装に関しては下記 Qiita投稿に説明があるのでご覧ください。
Qiita@pipito-yukio「psycopg2 バッチ処理に適したクエリーを作成する」
import json
import logging
import socket
from typing import Optional
import psycopg2
from psycopg2.extensions import connection
"""
PostgreSQL Database接続生成クラス
"""
class PgDatabase(object):
def __init__(self, configfile,
hostname: Optional[str] = None,
logger: Optional[logging.Logger] = None):
self.logger = logger
with open(configfile, 'r') as fp:
db_conf = json.load(fp)
if hostname is None:
hostname = socket.gethostname()
db_conf["host"] = db_conf["host"].format(hostname=hostname)
# default connection is itarable curosr
self.conn = psycopg2.connect(**db_conf)
if self.logger is not None:
self.logger.debug(self.conn)
def get_connection(self) -> connection:
return self.conn
def rollback(self) -> None:
if self.conn is not None:
self.conn.rollback()
def commit(self) -> None:
if self.conn is not None:
self.conn.commit()
def close(self) -> None:
if self.conn is not None:
if self.logger is not None:
self.logger.debug(f"Close {self.conn}")
self.conn.close()
3-5. CSVファイル読み込み関数
ヘッダーをスキップして文字列の行データを取得する
def read_csv(file_name: str,
skip_header=True, header_cnt=1) -> List[str]:
with open(file_name, 'r') as fp:
reader = csv.reader(fp, dialect='unix')
if skip_header:
for skip in range(header_cnt):
next(reader)
# リストをカンマ区切りで連結する
csv_lines = [",".join(rec) for rec in reader]
return csv_lines
3-6. バッチメイン関数
- 入力パラメータ
- CSVファイルパス:
--csv-file
- CSVファイルパス:
- 処理内容
- CSVファイルからヘッダー以外を読み込む
- PostgreSQLデータペース接続オブジェクトを生成する
- CSVから取得したIPアドレス(2列目)が登録済みかチェック
戻り値の登録済IPアドレスとIDを保持するDictオブジェクトを保持する - 登録済みIPアドレスを除外した追加登録用データリストを作成する
- 不正アクセスIPマスタテーブルに新規登録する
- 不正アクセスカウンターテーブル登録用データリストを作成する
- 不正アクセスカウンターテーブルに新規登録する
- 接続をコミットする
- 接続のクローズ
def batch_main():
logging.basicConfig(format='%(levelname)s %(message)s')
app_logger = logging.getLogger(__name__)
parser: argparse.ArgumentParser = argparse.ArgumentParser()
# レコード登録用CSVファイル: ~/Documents/webriverside/csv/ssh_auth_error_[日付].csv
parser.add_argument("--csv-file", type=str, required=True,
help="Insert CSV file path.")
parser.add_argument("--enable-debug", action="store_true",
help="Enable logger debug out.")
args: argparse.Namespace = parser.parse_args()
enable_debug: bool = args.enable_debug
if enable_debug:
app_logger.setLevel(level=logging.DEBUG)
else:
app_logger.setLevel(level=logging.INFO)
app_logger.info(args)
# CSVファイルを開く
csv_file: str = args.csv_file
csv_path = os.path.join(os.path.expanduser(csv_file))
if not os.path.exists(csv_path):
app_logger.error(f"FileNotFound: {csv_path}")
exit(1)
# CSVレコード: "log_date,ip_addr,appear_count"
csv_lines: List[str] = read_csv(csv_path)
line_cnt: int = len(csv_lines)
# CSVファイル行数
app_logger.info(f"csv: {line_cnt} lines.")
if line_cnt == 0:
app_logger.warning("Empty csv record.")
exit(0)
# database
db: Optional[pgdatabase.PgDatabase] = None
try:
db = pgdatabase.PgDatabase(DB_CONF_FILE)
conn: connection = db.get_connection()
# CSVから取得したIPアドレス(2列目)が登録済みかチェック
ip_list: List[str] = [line.split(",")[1] for line in csv_lines]
exists_ip_dict: Dict[str, int] = bulk_exists_ip_addr(
conn, ip_list, logger=app_logger)
app_logger.info(f"exists_ip_dict.size: {len(exists_ip_dict)}")
if enable_debug:
app_logger.debug(f"exists_ip_dict: {exists_ip_dict}")
# 登録済みIPアドレスを除外した追加登録用のレコードリストを作成
reg_ip_datas: List[RegUnauthIpAddr] = get_register_ip_list(
exists_ip_dict, csv_lines, logger=app_logger
)
# unauth_ip_addrテーブルとssh_auth_errorテーブル登録トランザクション
reg_ip_datas_cnt: int = len(reg_ip_datas)
app_logger.info(f"reg_ip_datas.size: {reg_ip_datas_cnt}")
# 不正アクセスIPアドレステーブルに新規登録
if reg_ip_datas_cnt > 0:
insert_unauth_ip_main(
conn, exists_ip_dict, reg_ip_datas,
logger=app_logger, enable_debug=enable_debug
)
# 不正アクセスカウンターテーブル登録用リスト
ssh_auth_error_list: List[SshAuthError] = get_register_ssh_auth_error_list(
exists_ip_dict, csv_lines, logger=app_logger
)
app_logger.info(
f"Register ssh_auth_error_list.size: {len(ssh_auth_error_list)}"
)
# 不正アクセスカウンターテーブルに新規
if len(ssh_auth_error_list) > 0:
insert_ssh_auth_error_main(
conn, ssh_auth_error_list,
logger=app_logger, enable_debug=enable_debug
)
# 両方のテーブル登録で正常終了したらコミット
db.commit()
except Exception as exp:
if db is not None:
db.rollback()
app_logger.error(exp)
exit(1)
finally:
if db is not None:
db.close()
if __name__ == '__main__':
batch_main()
3-7. スクリプト実行
3-7-1. スクリプトソース
コード全体を以下に示します。
import argparse
import csv
import logging
import os
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Tuple
import psycopg2
from psycopg2.extensions import connection, cursor
from psycopg2.extras import execute_values
from db import pgdatabase
"""
Qiita投稿用スクリプト
不正アクセス集計CSVファイルを読み込み2つのテーブルに一括登録する
[スキーマ] mainte2
[テーブル]
(1) 不正アクセスIPアドレステーブル
unauth_ip_addr
(2) 不正アクセスエラーカウントテーブル
ssh_auth_error
"""
# データベース接続情報
DB_CONF_FILE: str = os.path.join("conf", "db_conn.json")
@dataclass(frozen=True)
class RegUnauthIpAddr:
ip_addr: str
reg_date: str
@dataclass(frozen=True)
class UnauthIpAddr:
id: int
ip_addr: str
reg_date: str
@dataclass(frozen=True)
class SshAuthError:
log_date: str
ip_id: int
appear_count: int
def read_csv(file_name: str,
skip_header=True, header_cnt=1) -> List[str]:
with open(file_name, 'r') as fp:
reader = csv.reader(fp, dialect='unix')
if skip_header:
for skip in range(header_cnt):
next(reader)
# リストをカンマ区切りで連結する
csv_lines = [",".join(rec) for rec in reader]
return csv_lines
def bulk_exists_ip_addr(conn: connection,
ip_list: List[str],
logger: Optional[logging.Logger] = None) -> Dict[str, int]:
# IN ( in_clause )管理
in_clause: Tuple[str, ...] = tuple(ip_list)
if logger is not None:
logger.debug(f"in_clause: {in_clause}")
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT id,ip_addr FROM mainte2.unauth_ip_addr WHERE ip_addr IN %s""",
(in_clause,)
)
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
# IN句で一致したIPアドレスの idとIPアドレスのタプルをすべて取得
rows: List[tuple[Any, ...]] = cur.fetchall()
if logger is not None:
logger.debug(f"rows: {rows}")
# 戻り値: IPアドレスをキーとするIPのIDの辞書
result_dict: Dict[str, int] = {ip_addr: ip_id for (ip_id, ip_addr) in rows}
return result_dict
except (Exception, psycopg2.DatabaseError) as err:
raise err
def bulk_insert_unauth_ip_addr(
conn: connection,
qry_params: tuple[Dict[str, Any], ...],
logger: Optional[logging.Logger] = None) -> Dict[str, int]:
if logger is not None:
logger.debug(f"qry_params: \n{qry_params}")
try:
cur: cursor
with conn.cursor() as cur:
rows: List[Tuple[Any, ...]] = execute_values(
cur,
"""
INSERT INTO mainte2.unauth_ip_addr(ip_addr, reg_date)
VALUES %s RETURNING id,ip_addr""",
qry_params,
template="(%(ip_addr)s, %(reg_date)s)",
fetch=True
)
# 実行されたSQLを出力
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"rows: {rows}")
# 戻り値: IPアドレスをキーとするIPのIDの辞書
result_dict: Dict[str, int] = {ip_addr: ip_id for (ip_id, ip_addr) in rows}
return result_dict
except (Exception, psycopg2.DatabaseError) as err:
raise err
# ログ採取日のIP_IDリストが登録済みかチェックする
def bulk_exists_ssh_auth_error(
conn: connection,
log_date: str,
ipid_list: List[int],
logger: Optional[logging.Logger] = None) -> List[int]:
# IN ( in_clause )
in_clause: Tuple[int, ...] = tuple(ipid_list, )
if logger is not None:
logger.debug(f"in_clause: \n{in_clause}")
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT
ip_id
FROM
mainte2.ssh_auth_error
WHERE
log_date = %(log_date)s AND ip_id IN %(in_clause)s""",
{'log_date': log_date, 'in_clause': in_clause}
)
# 実行されたSQLを出力
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"cur.rowcount: {cur.rowcount}")
# 戻り値を取得する
# def fetchall(self) -> list[tuple[Any, ...]]
rows: List[Tuple[Any, ...]] = cur.fetchall()
if logger is not None:
logger.debug(f"rows: {rows}")
# 結果が1カラムだけなのでタプルの先頭[0]をリストに格納
result: List[int] = [row[0] for row in rows]
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
def bulk_insert_ssh_auth_error(
conn: connection,
qry_params: tuple[Dict[str, Any], ...],
logger: Optional[logging.Logger] = None) -> None:
if logger is not None:
logger.debug(f"qry_params: \n{qry_params}")
try:
cur: cursor
with conn.cursor() as cur:
# 登録の戻り値不要
execute_values(
cur,
"""
INSERT INTO mainte2.ssh_auth_error(log_date, ip_id, appear_count)
VALUES %s""",
qry_params,
template="(%(log_date)s, %(ip_id)s, %(appear_count)s)",
)
# 実行されたSQLを出力
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
# 登録済み処理件数 ※ログレベルをINFO
logger.info(f"cur.rowcount: {cur.rowcount}")
except (Exception, psycopg2.DatabaseError) as err:
raise err
def get_register_ip_list(
exists_ip_dict: Dict[str, int],
csv_lines: List[str],
logger: Optional[logging.Logger] = None) -> List[RegUnauthIpAddr]:
result: List[RegUnauthIpAddr] = []
if len(exists_ip_dict) > 0:
registered_cnt: int = 0
for line in csv_lines:
fields: List[str] = line.split(",")
registered_id: Optional[int] = exists_ip_dict.get(fields[1])
if registered_id is None:
result.append(RegUnauthIpAddr(
ip_addr=fields[1], reg_date=fields[0])
)
else:
registered_cnt += 1
if registered_cnt > 0:
if logger is not None:
logger.info(f"Registered_count: {registered_cnt}")
else:
# 登録済みレコードがない場合はすべて登録
for line in csv_lines:
fields = line.split(",")
result.append(RegUnauthIpAddr(
ip_addr=fields[1], reg_date=fields[0])
)
return result
def get_register_ssh_auth_error_list(
exists_ip_dict: Dict[str, int],
csv_lines: List[str],
logger: Optional[logging.Logger] = None) -> List[SshAuthError]:
result: List[SshAuthError] = []
for line in csv_lines:
fields: List[str] = line.split(",")
ip_id: Optional[int] = exists_ip_dict.get(fields[1])
if ip_id is not None:
# 当該日のIPアドレスは不正アクセスIPアドレステーブルに登録済み
result.append(
SshAuthError(
log_date=fields[0], ip_id=ip_id, appear_count=int(fields[2])
)
)
else:
# このケースはない想定
if logger is not None:
logger.warning(f"{fields[1]} is not regstered!")
return result
def insert_unauth_ip_main(
conn: connection,
exists_ip_dict: Dict[str, int],
reg_ip_list: List[RegUnauthIpAddr],
logger: Optional[logging.Logger] = None, enable_debug=False) -> None:
# namedtupleを辞書のタプルに変換
params: Tuple[Dict[str, Any], ...] = tuple([asdict(rec) for rec in reg_ip_list])
registered_ip_ids: Dict[str, int] = bulk_insert_unauth_ip_addr(
conn, params, logger=logger
)
if logger is not None:
logger.info(f"registered_ip_ids.size: {len(registered_ip_ids)}")
if logger is not None and enable_debug:
logger.debug(f"registered_ip_ids: {registered_ip_ids}")
# 新たに登録されたIPアドレスとIDを追加する
exists_ip_dict.update(registered_ip_ids)
if logger is not None and enable_debug:
logger.debug(f"update.exists_ip_dict:\n{exists_ip_dict}")
def insert_ssh_auth_error_main(
conn: connection,
ssh_auth_error_list: List[SshAuthError],
logger: Optional[logging.Logger] = None, enable_debug=False) -> None:
# 当該日にIP_IDが登録済みかどうかチェックする ※誤って同一CSVを実行した場合を想定
# 先頭レコードから当該日取得
log_date: str = ssh_auth_error_list[0].log_date
# チェック用の ip_id リスト生成
ipid_list: List[int] = [int(reg.ip_id) for reg in ssh_auth_error_list]
exists_ipid_list: List[int] = bulk_exists_ssh_auth_error(
conn, log_date, ipid_list, logger=logger if enable_debug else None
)
# 未登録の ip_id があれば登録レコード用のパラメータを生成
if len(ipid_list) > len(exists_ipid_list):
param_list: List[Any] = []
for rec in ssh_auth_error_list:
if rec.ip_id not in exists_ipid_list:
# 当該日に未登録の ip_id のみのレコードの辞書オブジェクトを追加
param_list.append(asdict(rec))
else:
if logger is not None and enable_debug:
logger.debug(f"Registered: {rec}")
if len(param_list) > 0:
if logger is not None and enable_debug:
logger.debug(f"param_list: \n{param_list}")
bulk_insert_ssh_auth_error(
conn, tuple(param_list),
logger=logger if enable_debug else None
)
else:
if logger is not None:
logger.info("ssh_auth_error テーブルに登録可能データなし.")
def batch_main():
logging.basicConfig(format='%(levelname)s %(message)s')
app_logger = logging.getLogger(__name__)
parser: argparse.ArgumentParser = argparse.ArgumentParser()
# レコード登録用CSVファイル: ~/Documents/webriverside/csv/ssh_auth_error_[日付].csv
parser.add_argument("--csv-file", type=str, required=True,
help="Insert CSV file path.")
parser.add_argument("--enable-debug", action="store_true",
help="Enable logger debug out.")
args: argparse.Namespace = parser.parse_args()
enable_debug: bool = args.enable_debug
if enable_debug:
app_logger.setLevel(level=logging.DEBUG)
else:
app_logger.setLevel(level=logging.INFO)
app_logger.info(args)
# CSVファイルを開く
csv_file: str = args.csv_file
csv_path = os.path.join(os.path.expanduser(csv_file))
if not os.path.exists(csv_path):
app_logger.error(f"FileNotFound: {csv_path}")
exit(1)
# CSVレコード: "log_date,ip_addr,appear_count"
csv_lines: List[str] = read_csv(csv_path)
line_cnt: int = len(csv_lines)
# CSVファイル行数
app_logger.info(f"csv: {line_cnt} lines.")
if line_cnt == 0:
app_logger.warning("Empty csv record.")
exit(0)
# database
db: Optional[pgdatabase.PgDatabase] = None
try:
db = pgdatabase.PgDatabase(DB_CONF_FILE)
conn: connection = db.get_connection()
# CSVから取得したIPアドレス(2列目)が登録済みかチェック
ip_list: List[str] = [line.split(",")[1] for line in csv_lines]
exists_ip_dict: Dict[str, int] = bulk_exists_ip_addr(
conn, ip_list, logger=app_logger)
app_logger.info(f"exists_ip_dict.size: {len(exists_ip_dict)}")
if enable_debug:
app_logger.debug(f"exists_ip_dict: {exists_ip_dict}")
# 登録済みIPアドレスを除外した追加登録用のレコードリストを作成
reg_ip_datas: List[RegUnauthIpAddr] = get_register_ip_list(
exists_ip_dict, csv_lines, logger=app_logger
)
# unauth_ip_addrテーブルとssh_auth_errorテーブル登録トランザクション
reg_ip_datas_cnt: int = len(reg_ip_datas)
app_logger.info(f"reg_ip_datas.size: {reg_ip_datas_cnt}")
# 不正アクセスIPアドレステーブルに新規登録
if reg_ip_datas_cnt > 0:
insert_unauth_ip_main(
conn, exists_ip_dict, reg_ip_datas,
logger=app_logger, enable_debug=enable_debug
)
# 不正アクセスカウンターテーブル登録用リスト
ssh_auth_error_list: List[SshAuthError] = get_register_ssh_auth_error_list(
exists_ip_dict, csv_lines, logger=app_logger
)
app_logger.info(
f"Register ssh_auth_error_list.size: {len(ssh_auth_error_list)}"
)
# 不正アクセスカウンターテーブルに新規
if len(ssh_auth_error_list) > 0:
insert_ssh_auth_error_main(
conn, ssh_auth_error_list,
logger=app_logger, enable_debug=enable_debug
)
# 両方のテーブル登録で正常終了したらコミット
db.commit()
except Exception as exp:
if db is not None:
db.rollback()
app_logger.error(exp)
exit(1)
finally:
if db is not None:
db.close()
if __name__ == '__main__':
batch_main()
3-7-2. スクリプトの実行
CSVファイルの内容
"log_date","ip_addr","appear_count"
"2024-06-10","45.246.204.41",467
"2024-06-10","36.108.171.189",373
"2024-06-10","165.22.85.249",283
"2024-06-10","223.108.114.238",281
"2024-06-10","218.92.0.22",83
"2024-06-10","36.64.232.117",49
"2024-06-10","82.151.65.155",41
"2024-06-10","20.244.134.31",41
"2024-06-10","165.232.115.144",40
"2024-06-10","43.136.95.69",38
"2024-06-10","122.46.163.188",33
"2024-06-10","60.188.49.52",31
(1) 未登録のCSVファイルを指定した場合
(py_psycopg2) $ python BatchInsert_with_csv.py \
> --csv-file csv/ssh_auth_error_2024-06-10.csv
INFO csv-file: csv/ssh_auth_error_2024-06-10.csv
INFO csv: 12 lines.
INFO exists_ip_dict.size: 0
INFO reg_ip_datas.size: 12
INFO registered_ip_ids.size: 12
INFO Register ssh_auth_error_list.size: 12
(2) 登録済みのCSVファイルを指定した場合
※エラーにならないことを確認
(py_psycopg2) $ python BatchInsert_with_csv.py \
> --csv-file csv/ssh_auth_error_2024-06-10.csv
INFO csv-file: csv/ssh_auth_error_2024-06-10.csv
INFO csv: 12 lines.
INFO exists_ip_dict.size: 12
INFO Registered_count: 12
INFO reg_ip_datas.size: 0
INFO Register ssh_auth_error_list.size: 12
INFO ssh_auth_error テーブルに登録可能データなし.
4. データベースの活用
検索期間に1日当たり30回を超え、かつ3日以上連続して不正アクセスしたIPアドレスを表示するクエリーを作成。
4-1. dockerコンテナ内で実行するシェルスクリプト
#!/bin/bash
# dockerコンテナ内の psqlでクエリを実行するシェルスクリプト
# ※dockerコンテナ内で実行する必要がある
# 検索期間で出現回数が30回を超え、かつ連続して3日以上布施アフクセスしたIPアドレスを出力
# (例) 2024-06-01 2024-06-30
echo "[検索期間] $1 〜 $2"
cat<<-EOF | psql -Udeveloper qiita_exampledb --tuples-only
-- unauth_ip_addr と ssh_auth_errorの結合
WITH t_ip_joined AS(
SELECT
log_date, ip_addr, appear_count
FROM
mainte2.ssh_auth_error sae
INNER JOIN mainte2.unauth_ip_addr ip_t
ON sae.ip_id = ip_t.id
),
-- 1日あたりの不正アクセス回数が30回を超えるIPアドレス
ip_appear_count_over30 AS(
SELECT ip_addr,count(ip_addr) as ip_cnt_in_days
FROM
(SELECT log_date, ip_addr, appear_count FROM t_ip_joined WHERE appear_count > 30)
WHERE
log_date BETWEEN '${1}' AND '${2}'
GROUP BY ip_addr
),
-- 3日以上連続して不正アクセスしてきたIPアドレス
access_morethan_3days_ip AS(
SELECT ip_addr FROM ip_appear_count_over30 WHERE ip_cnt_in_days >= 3
)
-- 検索期間で3日以上連続して不正アクセスしてきたIPアドレスをログ収集日を含めて出力
-- ※IPアドレスをソート用に加工した文字列でソートして出力
SELECT
log_date, ip_addr, appear_count
FROM
t_ip_joined
WHERE
log_date BETWEEN '${1}' AND '${2}'
AND
ip_addr in (SELECT * FROM access_morethan_3days_ip)
-- IPアドレス(ドットで分割したそれぞれの数値が3桁未満なら3桁になるまで'0'を付加)順, ログ日付順
-- ソートキー用の加工例: '79.110.62.14' -> '079.110.062.014'
ORDER BY
lpad(split_part(ip_addr,'.',1),3,'0')
|| lpad(split_part(ip_addr,'.',2),3,'0')
|| lpad(split_part(ip_addr,'.',3),3,'0')
|| lpad(split_part(ip_addr,'.',4),3,'0'), log_date;
EOF
dockerコンテナ内で実行
※コンテナ環境依存するパスを省略しています。
$ docker exec -it postgres-qiita bin/bash
ea487fa93fc2:# ./show_unauth_accessed_ip_for_morethan_3days.sh \
> 2024-06-10 2024-06-16
[検索期間] 2024-06-10 〜 2024-06-16
2024-06-12 | 92.118.39.40 | 73
2024-06-13 | 92.118.39.40 | 252
2024-06-14 | 92.118.39.40 | 49
2024-06-12 | 103.77.241.34 | 392
2024-06-13 | 103.77.241.34 | 3269
2024-06-14 | 103.77.241.34 | 3382
2024-06-15 | 103.77.241.34 | 3345
2024-06-16 | 103.77.241.34 | 959
4-2. ホストOS上で実行するシェルスクリプト
dockerコンテナ内のシェルスクリプト(絶対パス指定)を呼び出すスクリプト
#!/bin/bash
# ホストOSから dockerコンテナ内のシェルスクリプトを実行するシェルスクリプト
# 検索期間で出現回数が30回を超え、かつ連続して3日以上不正アフクセスしたIPアドレスを出力
# (例) ./dkr_exec_show_unauth_accessed_ip.sh 2024-06-01 2024-06-30
script_name_in_container="show_unauth_accessed_ip_for_morethan_3days.sh"
# コンテナ内で実行する場合は絶対パス
scrpit_path_in_container="/home/qiita/data/sql/qiita_exampledb/${script_name_in_container}"
# 実行中のコンテナ内で、新しいコマンドを実行
# https://docs.docker.jp/engine/reference/commandline/exec.html
# Docs » コマンドライン リファレンス » Docker CLI (docker) » docker exec
# docker exec
# [補足説明] ...が、docker exec -ti my_container sh -c "echo a && echo b" は動作します
docker exec -it postgres-qiita sh -c "${scrpit_path_in_container} ${1} ${2}"
ホストOS上でシェルスクリプトを実行
※ホストのスクリプトパスを省略しています。
$ ./dkr_exec_show_unauth_accessed_ip.sh 2024-06-10 2024-06-16
[検索期間] 2024-06-10 〜 2024-06-16
2024-06-12 | 92.118.39.40 | 73
2024-06-13 | 92.118.39.40 | 252
2024-06-14 | 92.118.39.40 | 49
2024-06-12 | 103.77.241.34 | 392
2024-06-13 | 103.77.241.34 | 3269
2024-06-14 | 103.77.241.34 | 3382
2024-06-15 | 103.77.241.34 | 3345
2024-06-16 | 103.77.241.34 | 959
4-3. 出力結果の解析とセキュリティ対応
この記事の投稿日は9月上旬、自宅サーバーのエラーログ採取を開始してから約3ヶ月が経過しました。
- 判明したこと
- (A) 3日続けて不正アクセスするIPアドレスは頻繁にある
- (B) 4日連続 1000回 (1日当たり) を超えるIPアドレスはそれ程多くない
- 運用方針
- (A) 監視を継続する
※ 多くのホストが3日程度で不正アクセスしなくなるケースが多い - (B) パケットフィルタリングの対象とする
IPアドレスが属する国コードとネットワークアドレスを取得してネットワーク単位でパケットフィルタリングを実行する
- (A) 監視を継続する
4-3-1. 国コードとネットワークアドレスの取得
RIR公開データからIPアドレスの国コードとネットワークアドレスを取得する。
(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 103.77.241.34
INFO Find 103.77.241.34 in (network: "103.77.240.0/23", country_code: "VN")
下記投稿で紹介しました。
Qiita@pipito-yukio「不正アクセスしてきたホストの国コードを知ってセキュリティ対策に活用する」
4-3-2. パケットフィルタリング実行
自宅サーバー側で、指定したネットワーク範囲からの通信を drop ゾーン設定しアクセスを破棄する。
※1 Ubuntu には firewall-cmd はインストールされていないので手動でインストール
$ sudo apt-get install firewalld
※2 ネットワーク範囲が不明の場合はホストのIPアドレスを設定する。
$ sudo firewall-cmd --zone=drop --add-source=103.77.240.0/23
success
$ sudo firewall-cmd --runtime-to-permanent
success
$ sudo systemctl restart firewalld.service
5. 最後に
過去 CentOS(5, 6, 7, stream 8) で自宅サーバーを運用していたときは、/var/log 配下の secure ログを直接 grep し SSH ログインエラーのログを取得。
それをクライアント PC の Python スクリプトで処理して出現回数が1000回を超えるIPアドレスをみつけそのIPアドレスを単独でパケットフィルタリングの対象としていました。
フィルタリングしたIPアドレスの管理もメモ帳アプリ(gedit)に実行日とIPアドレスを記録するという原始的な方法でとても管理とはいえない状態でした。
2024年05月末で CentOS Stream 8 が EOL になったことで、思い切って Ubuntu Server 24.04 に移行しセキュリティ管理もシステム化したことでセキュリティ対策も少しだけ効率的に実施することができるようになりました。
Ubuntu Server 24.04 はGUIのないヘッドレスOSなので、クライアントPCでデータベース管理することで不正アクセスしたIPアドレスの可視化も容易になります。
記事で紹介したスクリプトとサンプルを下記GihtHubで公開しています。
GitHub pipito-yuko/ qiita-posts / python / BatchInsert_from_ssh_auth_error_csv
BatchInsert_from_ssh_auth_error_csv/
├── README.md
├── docker
│ ├── Dockerfile
│ ├── docker-compose.yml
│ └── initdb
│ ├── 10_createdb.sql
│ └── 11_createtable.sql
├── scripts
│ ├── dkr_exec_show_unauth_accessed_ip.sh
│ └── show_unauth_accessed_ip_for_morethan_3days.sh
└── src
├── BatchInsert_with_csv.py
├── conf
│ └── db_conn.json
├── csv
│ ├── ssh_auth_error_2024-06-10.csv
│ ├── ssh_auth_error_2024-06-11.csv
│ ├── ssh_auth_error_2024-06-12.csv
│ ├── ssh_auth_error_2024-06-13.csv
│ ├── ssh_auth_error_2024-06-14.csv
│ ├── ssh_auth_error_2024-06-15.csv
│ └── ssh_auth_error_2024-06-16.csv
└── db
└── pgdatabase.py