1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ジャーナルログからSSHで不正アクセスするホストのIPアドレスを収集する

Last updated at Posted at 2024-09-04

前回、自宅サーバーに不正アクセスしてきたホストについての記事を投稿しましたが、今回はその記事の元になったホストのIPアドレスをジャーナルログから収集したログファイルを生成するシェルスクリプトと、そのログファイルからIPアドレスの不正アクセス回数を集計するpythonスクリプトについて紹介します。

Qiita@pipito-yukio「不正アクセスしてきたホストの国コードを知ってセキュリティ対策に活用する」

最初に今回紹介するスクリプトが出力するファイルの抜粋を以下に示します。

① 自宅サーバー側のシェルスクリプトが保存したファイル
※不正アクセスしてきたホストのIPアドレスを含む行のみを抽出

AuthFail_ssh_2024-09-02.log
2024-09-02T00:00:01+09:00 sshd[20193]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=113.30.191.50  user=root
2024-09-02T00:00:36+09:00 sshd[20195]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=185.254.98.118  user=root
2024-09-02T00:00:46+09:00 sshd[20202]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=212.233.136.201
2024-09-02T00:00:59+09:00 sshd[20204]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=113.30.191.50  user=root
...以下省略...

② クライアントPC側のpythonスクリプトで集計し保存したCSVファイル
※IPアドレスの出現回数のランキング

ssh_auth_error_2024-09-02.csv
"log_date","ip_addr","appear_count"
"2024-09-02","185.254.98.118",142
"2024-09-02","111.44.194.165",48
"2024-09-02","64.227.150.212",45
"2024-09-02","212.233.136.201",43
...以下省略...

参考図書

  • 「CentOS 7 システム管理ガイド」
    発行日が2015年11月と相当古い書籍ですが最新のUbuntuでも問題なく使用できます。

Chapter 1 systemd | 1.6 ジャーナルの制御 (journalctl)
[発行所] 株式会社 秀和システム

  • OSに付属のマニュアル
    スクリプトを実行するOSに付属したマニュアルは必須。
    ※私はファイルに出力し、Desktop PCにコピーして gedit で確認しています。
    ※1行目の内容を短く編集しています。
~$ man journalctl > ~/work/man_journalctl.txt
~$ head -n10 ~/work/man_journalctl.txt 
JOURNALCTL(1)            journalctl            JOURNALCTL(1)

NAME
       journalctl - Print log entries from the systemd journal

SYNOPSIS

       journalctl [OPTIONS...] [MATCHES...]

DESCRIPTION

自宅サーバーの journalctlコマンドのバージョン確認
※使えるコマンドのオプションはバージョンに依存するので確認が必要です。

$ journalctl --version | grep systemd
systemd 255 (255.4-1ubuntu8.4)

1. 環境

1-1. 自宅サーバー

  • OS: Ubuntu server 24.04
    実行ユーザ(ホーム)にクライアントPCの公開キーが設定されていること
    • cron で毎日定時にログ収集スクリプトを実行

1-2. クライアントPC

  • OS: OS: Ubuntu Desktop 22.0
    • python 3.10.12
      スクリプト専用の仮想環境を作成し、仮想環境内でpythonスクリプトを実行

2. スクリプト

2-1. 自宅サーバーのログ収集

指定日のSSHサービスユニットのログを取得する (00:00:00 〜 23:59:59)

2-1-1. journalctlコマンドで出力を確認

  • (1) 期間とサービスユニット名のみ指定した場合
    ※デフォルトでタイムスタンプはOSロケールの日本語でホスト名も出力されます。
$ journalctl --since="2024-09-02" --until="2024-09-02 23:59:59" -u ssh.service
 9月 02 00:00:01 example.hostname sshd[20193]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=113.30.191.50  user=root
 9月 02 00:00:03 example.hostname sshd[20193]: Failed password for root from 113.30.191.50 port 44706 ssh2
 9月 02 00:00:05 example.hostname sshd[20193]: Received disconnect from 113.30.191.50 port 44706:11: Bye Bye [preauth]
 9月 02 00:00:05 example.hostname sshd[20193]: Disconnected from authenticating user root 113.30.191.50 port 44706 [preauth]
 9月 02 00:00:36 example.hostname sshd[20195]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=185.254.98.118  user=root
 9月 02 00:00:38 example.hostname sshd[20195]: Failed password for root from 185.254.98.118 port 48126 ssh2
 9月 02 00:00:40 example.hostname sshd[20195]: Connection closed by authenticating user root 185.254.98.118 port 48126 [preauth]
 9月 02 00:00:46 example.hostname sshd[20202]: Invalid user rr from 212.233.136.201 port 46206
 9月 02 00:00:46 example.hostname sshd[20202]: pam_unix(sshd:auth): check pass; user unknown
 9月 02 00:00:46 example.hostname sshd[20202]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=212.233.136.201
 9月 02 00:00:48 example.hostname sshd[20202]: Failed password for invalid user rr from 212.233.136.201 port 46206 ssh2
 9月 02 00:00:49 example.hostname sshd[20202]: Received disconnect from 212.233.136.201 port 46206:11: Bye Bye [preauth]
 9月 02 00:00:49 example.hostname sshd[20202]: Disconnected from invalid user rr 212.233.136.201 port 46206 [preauth]
...一部省略...
 9月 02 23:58:05 example.hostname sshd[30310]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=185.254.98.118  user=root
 9月 02 23:58:07 example.hostname sshd[30310]: Failed password for root from 185.254.98.118 port 40700 ssh2
 9月 02 23:58:09 example.hostname sshd[30310]: Connection closed by authenticating user root 185.254.98.118 port 40700 [preauth]
  • (2) タイムスタンプをISO形式にしホスト名を省略
    【参考】manの該当するオプションの抜粋を示します
-o, --output=
    Controls the formatting of the journal entries that are shown. Takes one of the following options:

    ...一部省略...

    short-iso
        is very similar, but shows timestamps in the RFC 3339[2] profile of ISO 8601.

        Added in version 206.
...一部省略...
--no-hostname
    Don't show the hostname field of log messages originating from the local host. This switch has an effect only on the
    short family of output modes (see above).

    Note: this option does not remove occurrences of the hostname from log entries themselves, so it does not prevent the
    hostname from being visible in the logs.

    Added in version 230.

追加したオプション: -o short-iso --no-hostname

$ journalctl --since="2024-09-02 00:00:00" --until="2024-09-02 23:59:59" -u ssh.service -o short-iso --no-hostname
2024-09-02T00:00:01+09:00 sshd[20193]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=113.30.191.50  user=root
2024-09-02T00:00:03+09:00 sshd[20193]: Failed password for root from 113.30.191.50 port 44706 ssh2
2024-09-02T00:00:05+09:00 sshd[20193]: Received disconnect from 113.30.191.50 port 44706:11: Bye Bye [preauth]
2024-09-02T00:00:05+09:00 sshd[20193]: Disconnected from authenticating user root 113.30.191.50 port 44706 [preauth]
2024-09-02T00:00:36+09:00 sshd[20195]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=185.254.98.118  user=root
2024-09-02T00:00:38+09:00 sshd[20195]: Failed password for root from 185.254.98.118 port 48126 ssh2
2024-09-02T00:00:40+09:00 sshd[20195]: Connection closed by authenticating user root 185.254.98.118 port 48126 [preauth]
2024-09-02T00:00:46+09:00 sshd[20202]: Invalid user rr from 212.233.136.201 port 46206
2024-09-02T00:00:46+09:00 sshd[20202]: pam_unix(sshd:auth): check pass; user unknown
2024-09-02T00:00:46+09:00 sshd[20202]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=212.233.136.201
2024-09-02T00:00:48+09:00 sshd[20202]: Failed password for invalid user rr from 212.233.136.201 port 46206 ssh2
2024-09-02T00:00:49+09:00 sshd[20202]: Received disconnect from 212.233.136.201 port 46206:11: Bye Bye [preauth]
2024-09-02T00:00:49+09:00 sshd[20202]: Disconnected from invalid user rr 212.233.136.201 port 46206 [preauth]
...以下省略...
  • (3) SSHログインエラー行の抽出
    文字列[": authentication failure;"]で grep して目的のエラー行のみを抽出します。
$ journalctl --since="2024-09-02 00:00:00" --until="2024-09-02 2024-09-02 23:59:59" -u ssh.service -o short-iso --no-hostname \
> | grep ": authentication failure;"
2024-09-02T00:00:01+09:00 sshd[20193]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=113.30.191.50  user=root
2024-09-02T00:00:36+09:00 sshd[20195]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=185.254.98.118  user=root
2024-09-02T00:00:46+09:00 sshd[20202]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=212.233.136.201
# ...一部省略...
2024-09-02T23:51:49+09:00 sshd[30299]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.72.105.48  user=root
2024-09-02T23:58:05+09:00 sshd[30310]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=185.254.98.118  user=root

不正アクセスしてきたホストのIPアドレスが含まれた行のみとなります。

logname= uid=0 euid=0 tty=ssh ruser= rhost=113.30.191.50  user=root

2-1-2. シェルスクリプト

journalctlコマンドで指定日のSSHサービスユニットのログを抽出し指定したファイルに保存する

  • ログ抽出日の算出
    • 引数がない場合 ※cronでの実行を想定
      スクリプト実行日の前日の日付をログ抽出日とする
    • 引数が有る場合 ※アドホックの実行を想定
      引数で指定された日をログ抽出日とする
  • ログ抽出日の最終時刻を設定
  • ログ抽出日をログファイル名に含める
  • journalctlコマンドとgrepを組み合わせて抽出したログを指定先に保存
    (保存ファイル名) ~/work/journal_logs/AuthFail_ssh_[ロク抽出日].log
bin/ssh_auth_error.sh
#!/bin/bash

# sshサービスに関する前日のジャーナルログを所定のディレクトリに出力するスクリプト
# (利用想定) cron で 毎日 0:10 に実行される

next_day() {
   retval=$(date -d "$1 1 day" +'%F')
   echo "$retval"
}

before_day() {
   retval=$(date -d "$1 - 1 day" +'%F')
   echo "$retval"
}

HOME="/home/testuser"
CMD="/usr/bin/journalctl"
GREP_KWD=": authentication failure;"
OUTPUT_DIR="$HOME/work/journal_logs"

# 引数(開始日付のみ)の有無で開始日付を設定する
start_day=
if [ $# -eq 0 ]; then
    # スクリプト実行日 ※cron実行を想定
   today=$(date +'%F')
    # 前日のデータが対象
   start_day=$(before_day "$today")
else
    # 指定日
   start_day="$1"
fi
end_datetime="$start_day 23:59:59"
file_name="AuthFail_ssh_$start_day.log"

$CMD --since="$start_day" --until="$end_datetime" -u ssh.service -o short-iso --no-hostname \
   | grep "$GREP_KWD" > "$OUTPUT_DIR/$file_name"
echo "Saved $file_name" 

実行ユーザのcronを編集し指定した時刻にシェルスクリプトを登録する

$ crontab -e

毎日 00:05 にシェルスクリプトを実行するように追記

# DO NOT EDIT THIS FILE - edit the master and reinstall.
# (/tmp/crontab.h3U0CD/crontab installed on Fri Jun 21 15:34:05 2024)
# (Cron version -- $Id: crontab.c,v 2.13 1994/01/17 03:20:37 vixie Exp $)
# Edit this file to introduce tasks to be run by cron.
# ...一部省略...
# m h  dom mon dow   command
# ここに追記
05 00 * * *	/home/cronuser/bin/ssh_auth_error_log.sh

2-2. クライアントPCでログを集計する

2-2-1. 自宅サーバーのログファイルをローカルPCにコピーする

scpコマンドでクライアントPCの指定したディレクトリにコピーする

$ cd ~/Documents/qiita
$ ~/Documents/qiita$ scp user1@myserver:~/work/journal_logs/AuthFail_ssh_2024-09-02.log .
AuthFail_ssh_2024-09-02.log              100%  341KB  10.6MB/s   00:00.

2-2-2. pythonスクリプト

2-2-2-1. ファイル操作関数

from typing import List, Optional

# ログフアイル読み込み
def read_text(file_name: str) -> List[str]:
    with open(file_name, 'r') as fp:
        lines: List[str] = [ln for ln in fp]
        return lines


# CSV出力
def write_csv(
        file_name: str, save_list: List[str],
        header: Optional[str] = None) -> None:
    with open(file_name, 'w') as fp:
        if header is not None:
            fp.write(f"{header}\n")
        for line in save_list:
            fp.write(f"{line}\n")
        fp.flush()


# 指定した保存先にCSVファイルを保存
def save_csvfile(param_save_path: str, filename: str, csv_list: List[str]) -> str:
    save_path: str
    if param_save_path.find("~") == 0:
        save_path = os.path.join(os.path.expanduser(param_save_path))
    else:
        save_path = param_save_path
    # 保存先ディレクトリ ※存在しなければデイレクトリを作成する
    if not os.path.exists(save_path):
        os.mkdir(save_path)

    save_file: str = os.path.join(save_path, filename)
    write_csv(save_file, csv_list, header='"log_date","ip_addr","appear_count"')
    return save_file

2-2-2-2. IPアドレスの抽出

(1) 正規表現の定義

自宅サーバーで収集したログには下記のような2種類のパターンがあるので両方にマッチする正規表現を定義します。

  • rhost=[IPアドレス]
    authentication failure; ...一部省略... ruser= rhost=216.181.226.86
  • rhost=[IPアドレス] user=[ユーザー名]
    authentication failure; ...一部省略... ruser= rhost=218.92.0.96 user=root
import re

# rhhost の後ろに何もないケースと "user=xxx"があるパターンがある
# authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=216.181.226.86
# authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.96  user=root
re_auth_fail: re.Pattern = re.compile(
    r"^.+?rhost=([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*$"
)

(2) 抽出関数の定義

正規表現に一致した行からIPアドレスのみを抽出します。

def extract_ip_tolist(lines: List[str]) -> List[str]:
    result: List = []
    for line in lines:
        mat: Optional[re.Match] = re_auth_fail.search(line)
        if mat:
            result.append(mat.group(1))
    return result

2-2-2-3. IPアドレスの出現回数を集計する

正規表現で抽出したIPアドレスのリストを collections.Counter のコンストラクタに設定

Counterの使い方については下記公式ドキュメントが参考になります。
Documentation >> Python 標準ライブラリ >> データ型 >> collections --- コンテナデータ型

from collections import Counter

# エラーログファイルの読み込み
f_path: str = "error_logs/AuthFail_ssh_2024-07-09.log"
lines: List[str] = read_text(f_path)
# ipアドレスの抽出
ip_list: List[str] = extract_ip_tolist(lines)
# 抽出した ip の出現数をカウント
counter: Counter = Counter(ip_list)

(1) IPアドレスの上位 N件 を出力する場合

top_N: int = 20
for item in counter.most_common(top_N):
    app_logger.info(item)

以下のようにIPアドレスの出現数が 20件 (Top 20) 出力されます。

('1.92.5.77', 716)
('103.171.134.90', 467)
('220.250.41.11', 57)
('34.175.118.185', 42)
('175.178.237.54', 41)
('141.11.229.140', 41)
('194.67.82.57', 41)
('81.208.169.112', 34)
('141.145.203.77', 34)
('114.205.92.219', 31)
('203.194.106.73', 30)
('120.88.46.226', 30)
('103.100.209.77', 30)
('161.35.18.131', 30)
('124.156.199.148', 30)
('43.134.63.221', 30)
('83.97.73.43', 30)
('91.237.163.36', 30)
('59.36.254.224', 30)
('117.80.234.141', 28)

(2) 全件出力する場合

for item in counter.most_common():
    app_logger.info(item)

以下のように全件(71)出力されます。※一部割愛します。

counter.elements.size: 71
('1.92.5.77', 716)
('103.171.134.90', 467)
('220.250.41.11', 57)
('34.175.118.185', 42)
('175.178.237.54', 41)
('141.11.229.140', 41)
('194.67.82.57', 41)
('81.208.169.112', 34)
('141.145.203.77', 34)
('114.205.92.219', 31)
('203.194.106.73', 30)
('120.88.46.226', 30)
('103.100.209.77', 30)
('161.35.18.131', 30)
('124.156.199.148', 30)
('43.134.63.221', 30)
('83.97.73.43', 30)
('91.237.163.36', 30)
('59.36.254.224', 30)
('117.80.234.141', 28)
#...一部省略...
('5.30.128.238', 1)
('117.70.94.155', 1)
('210.10.221.238', 1)

2-2-3. スクリプトの全てのソース

入力パラメータ

  • エラーログファイル(必須): --log-file [filepath]
  • 出力先: --output [console(default) | csv]
    • console: コンソール出力 (既定値)
      • IPアドレスの出現数の Top N 位: --show-top N
        ※指定なしの場合は0として全件出力する
      • 前ゼロ加工済みIPアドレスの昇順に N 件分出力: sort-ip-addr
        ※指定された場合のみ
    • csv: CSVファイル保存
      • 出力先ディレクトリ(必須): --save-path [directory]
        ※指定なしの場合: "~/Documents/qiita"
      • 出現数の最小値: --appear-threshold 出現数
        ※未指定の場合は30回 (既定値)
ExportCSV_with_autherrorlog.py
import argparse
import logging
import os
import re
from collections import Counter
from datetime import date
from typing import List, Optional, Set, Tuple

"""
自宅サーバー取得したSSH不正ログインエラーログからIPアドレスの出現数を集計するスクリプト
"""

# rhhost の後ろに何もないケースと "user=xxx"があるパターンがある
# authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=216.181.226.86
# authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=218.92.0.96  user=root
re_auth_fail: re.Pattern = re.compile(
    r"^.+?rhost=([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*$"
)
# ファイル名のログ日付抽出
re_log_file: re.Pattern = re.compile(r"^AuthFail_ssh_(\d{4}-\d{2}-\d{2})\.log$")


def read_text(file_name: str) -> List[str]:
    with open(file_name, 'r') as fp:
        lines: List[str] = [ln for ln in fp]
        return lines


def write_csv(
        file_name: str, save_list: List[str],
        header: Optional[str] = None) -> None:
    with open(file_name, 'w') as fp:
        if header is not None:
            fp.write(f"{header}\n")
        for line in save_list:
            fp.write(f"{line}\n")
        fp.flush()


def save_csvfile(param_save_path: str, filename: str, csv_list: List[str]) -> str:
    save_path: str
    if param_save_path.find("~") == 0:
        save_path = os.path.join(os.path.expanduser(param_save_path))
    else:
        save_path = param_save_path
    # 保存先ディレクトリ ※存在しなければデイレクトリを作成する
    if not os.path.exists(save_path):
        os.mkdir(save_path)

    save_file: str = os.path.join(save_path, filename)
    write_csv(save_file, csv_list, header='"log_date","ip_addr","appear_count"')
    return save_file


def extract_log_date(file_path: str) -> str:
    # ファイル名から日付を取得
    b_name: str = os.path.basename(file_path)
    f_mat: Optional[re.Match] = re_log_file.search(b_name)
    if f_mat:
        return f_mat.group(1)
    else:
        return date.today().isoformat()


def extract_ip_tolist(lines: List[str]) -> List[str]:
    result: List = []
    for line in lines:
        mat: Optional[re.Match] = re_auth_fail.search(line)
        if mat:
            result.append(mat.group(1))
    return result


def convert_sorted_ip_list(ip_list: List[str]) -> List[str]:
    # カウンターオブジェクトから重複のないキーを取得
    ip_set: Set[str] = set(ip_list)
    # ソート用の前ゼロ加工したIPアドレスを格納するリスト
    full_ip_list: List[str] = []
    ip_4: List[str]
    for ip_addr in ip_set:
        ip_4 = ip_addr.split(".")
        # IPアドレスの各パーツを前ゼロ加工して結合する
        ip_full: str = (f"{int(ip_4[0]):03}.{int(ip_4[1]):03}."
                        f"{int(ip_4[2]):03}.{int(ip_4[3]):03}")
        full_ip_list.append(ip_full)
    # 前ゼロ加工したIPアドレスリストを数値的にソート
    sorted_list: List[str] = sorted(full_ip_list)
    # IPアドレスを元に戻す
    result: List[str] = []
    for full_ip in sorted_list:
        ip_4 = full_ip.split(".")
        # 各パーツに分解し、パーツごとに数値に戻して元のIPアドレスに戻す
        org_ip: str = f"{int(ip_4[0])}.{int(ip_4[1])}.{int(ip_4[2])}.{int(ip_4[3])}"
        result.append(org_ip)
    return result


def batch_main():
    logging.basicConfig(format="%(message)s")
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.INFO)

    # コマンドラインパラメータ
    # --log-file: エラーログファイル ※必須
    # (A) コンソール出力: --output console ※デフォルト
    #    --show-top: Top N, デフォルト(=0) 全て出力
    #    --sort-ip-addr: 指定された場合、前ゼロ加工したIPアドレスの昇順でソート
    # (B) CSVファイル出力: --output csv
    #    --save-path: CSVの出力先ディレクトリ ※デフォルト: "~/Documents/csv"
    #    --appear-threshold: 出現数の最小値 ※デフォルト: 30回
    parser = argparse.ArgumentParser()
    parser.add_argument("--log-file", required=True, type=str,
                        help="Log File name.")
    parser.add_argument("--output", type=str,
                        choices=["console", "csv"], default="console",
                        help="Output console or csv file, default console.")
    parser.add_argument("--sort-ip-addr", action="store_true",
                        help="Sort ip-addr for output console.")
    parser.add_argument("--show-top", type=int, default=0,
                        help="IPアドレスのランキング(Top N 位), 規定値(=0)なら全て出力.")
    parser.add_argument("--appear-threshold", type=int, default=30,
                        help="出現数の閾値 N回以上。既定値=30回")
    # CSVファイルの保存先 ※未指定ならデフォルト "~/Documents/csv"
    parser.add_argument("--save-path", type=str, default="~/Documents/csv",
                        help="Save csv file path(Directory).")
    args: argparse.Namespace = parser.parse_args()
    log_file: str = args.log_file
    app_logger.info(f"log-file: {log_file}")

    # 出力モード
    output: str = args.output
    p_output: str = f"output: {output}"
    if output == "console":
        app_logger.info(
            f"{p_output}, show-top: {args.show_top}, sort-ip-addr: {args.sort_ip_addr}"
        )
    else:
        app_logger.info(f"{p_output}, appear-threshold: {args.appear_threshold}")
        app_logger.info(f"save_path: {args.save_path}")

    # エラーログファイルの存在チェック
    f_path: str
    if log_file.find("~/") == 0:
        f_path = os.path.expanduser(log_file)
    else:
        f_path = log_file
    if not os.path.exists(f_path):
        app_logger.error(f"FileNotFound: {f_path}")
        exit(1)

    # エラーログファイルの読み込み
    lines: List[str] = read_text(f_path)
    ip_list: List[str] = extract_ip_tolist(lines)
    list_size: int = len(ip_list)
    app_logger.info(f"ip_list.size: {list_size}")

    if list_size > 0:
        # 抽出した ip の出現数をカウント
        counter: Counter = Counter(ip_list)
        app_logger.info(f"counter.elements.size: {len(counter)}")
        if output == "csv":
            # ファイル出力する場合は出現回数の閾値
            appear_threshold: int = args.appear_threshold
            # ファイル名からログ日付を取り出す ※シェルスクリプトでログ日付がファイル名の末尾に付与されている
            log_date: str = extract_log_date(f_path)
            csv_list: List[str] = []
            # CSV出力: 出現回数が指定件数以上
            for (ip_addr, cnt) in counter.most_common():
                if cnt >= appear_threshold:
                    csv_line: str = f'"{log_date}","{ip_addr}",{cnt}'
                    csv_list.append(csv_line)
            app_logger.info(f"output_lines: {len(csv_list)}")
            # 保存ファイル名
            save_name: str = f"ssh_auth_error_{log_date}.csv"
            saved_file: str = save_csvfile(args.save_path, save_name, csv_list)
            app_logger.info(f"Saved: {saved_file}")
        else:
            # コンソール出力の場合: 全件 | Top N (1以上)
            most_common: List[Tuple[str, int]]
            if args.show_top == 0:
                # 全件
                most_common = counter.most_common()
            else:
                # Top N位
                most_common = counter.most_common(args.show_top)
            if args.sort_ip_addr:
                # 前ゼロ加工済みIPアドレスの昇順
                tmp_list: List[str] = [ip for (ip, cnt) in most_common]
                sorted_ip_list = convert_sorted_ip_list(tmp_list)
                for ip in sorted_ip_list:
                    app_logger.info(f"('{ip}', {counter[ip]})")
            else:
                # 出現回数のランキング(降順)
                for item in most_common:
                    app_logger.info(item)
    else:
        app_logger.info(f"不正アクセスに該当するIPアドレス未検出.")


if __name__ == '__main__':
    batch_main()

2-3. pythonスクリプトの実行

python 仮想環境に入る

$ . py_venv/py_psycopg2/bin/activate
(py_psycopg2) $ 

ログフアイルの指定: --log-file error_logs/AuthFail_ssh_2024-07-09.log

2-3-1. コンソールに出力

(1) 出現回数のトップ 30 位まで出力: --show-top 30

(py_psycopg2) $ python ExportCSV_with_autherrorlog.py \
> --log-file error_logs/AuthFail_ssh_2024-07-09.log \
> --show-top 30
log-file: error_logs/AuthFail_ssh_2024-07-09.log
output: console, show-top: 30, sort-ip-addr: False
ip_list.size: 2027
counter.elements.size: 71
('1.92.5.77', 716)
('103.171.134.90', 467)
('220.250.41.11', 57)
('34.175.118.185', 42)
('175.178.237.54', 41)
('141.11.229.140', 41)
('194.67.82.57', 41)
('81.208.169.112', 34)
('141.145.203.77', 34)
('114.205.92.219', 31)
('203.194.106.73', 30)
('120.88.46.226', 30)
('103.100.209.77', 30)
('161.35.18.131', 30)
('124.156.199.148', 30)
('43.134.63.221', 30)
('83.97.73.43', 30)
('91.237.163.36', 30)
('59.36.254.224', 30)
('117.80.234.141', 28)
('139.198.163.221', 27)
('190.153.249.99', 25)
('200.85.58.110', 24)
('49.51.187.152', 24)
('79.10.24.181', 23)
('45.145.4.203', 15)
('141.98.10.125', 15)
('218.156.108.222', 10)
('42.51.41.163', 6)
('170.64.229.180', 5)

(2) 出現回数のトップ 20 位 かつ 前ゼロ加工済みIPアドレスでソート
--show-top 20 --sort-ip-addr

前ゼロ加工済みIPアドレスでソートするメリットは、同一ネットワークに属する思われる複数のIPアドレスを見つけるのが容易になります。

(py_psycopg2) $ python ExportCSV_with_autherrorlog.py \
> --log-file error_logs/AuthFail_ssh_2024-07-09.log \
> --show-top 20 --sort-ip-addr
log-file: error_logs/AuthFail_ssh_2024-07-09.log
output: console, show-top: 20, sort-ip-addr: True
ip_list.size: 2027
counter.elements.size: 71
('1.92.5.77', 716)
('34.175.118.185', 42)
('43.134.63.221', 30)
('59.36.254.224', 30)
('81.208.169.112', 34)
('83.97.73.43', 30)
('91.237.163.36', 30)
('103.100.209.77', 30)
('103.171.134.90', 467)
('114.205.92.219', 31)
('117.80.234.141', 28)
('120.88.46.226', 30)
('124.156.199.148', 30)
('141.11.229.140', 41)
('141.145.203.77', 34)
('161.35.18.131', 30)
('175.178.237.54', 41)
('194.67.82.57', 41)
('203.194.106.73', 30)
('220.250.41.11', 57)

2-3-2. CSVファイル出力

  • 出力タイプ: --output csv
  • CSVファイル保存先: --save-path csv
  • 出力するIPアドスレの最小出現回数: --appear-threshold 30 ※出現回数が30以上
(py_psycopg2) $ python ExportCSV_with_autherrorlog.py \
> --log-file error_logs/AuthFail_ssh_2024-07-09.log \
> --output csv --save-path csv --appear-threshold 30
log-file: error_logs/AuthFail_ssh_2024-07-09.log
output: csv, appear-threshold: 30
save_path: csv
ip_list.size: 2027
counter.elements.size: 71
output_lines: 19
Saved: csv/ssh_auth_error_2024-07-09.csv

ファイルの内容を下記に示します。

csv/ssh_auth_error_2024-07-09.csv
"log_date","ip_addr","appear_count"
"2024-07-09","1.92.5.77",716
"2024-07-09","103.171.134.90",467
"2024-07-09","220.250.41.11",57
"2024-07-09","34.175.118.185",42
"2024-07-09","175.178.237.54",41
"2024-07-09","141.11.229.140",41
"2024-07-09","194.67.82.57",41
"2024-07-09","81.208.169.112",34
"2024-07-09","141.145.203.77",34
"2024-07-09","114.205.92.219",31
"2024-07-09","203.194.106.73",30
"2024-07-09","120.88.46.226",30
"2024-07-09","103.100.209.77",30
"2024-07-09","161.35.18.131",30
"2024-07-09","124.156.199.148",30
"2024-07-09","43.134.63.221",30
"2024-07-09","83.97.73.43",30
"2024-07-09","91.237.163.36",30
"2024-07-09","59.36.254.224",30

最後に

CSVファイルはデータベースのテーブルに登録することを前提にしています。

このCSVファイルをテーブルに登録するスクリプトについては、Qiita投稿サイトの下記コンテンツをご覧ください。
Qiita@pipito-yukio「psycopg2 バッチ処理に適したクエリーを作成する」

今回紹介したスクリプトとサンプルのログファイル下記 GitHub リポジトリで公開しています。

(GitHub) pipito-yukio / qiita-posts / Linux / collect_ip_from_journal_logs

[リソース一覧]

collect_ip_from_journal_logs/
├── README.md
├── bin
│   └── ssh_auth_error.sh                  # 自宅サーバー側のシェルスクリプト
└── python
    ├── ExportCSV_with_autherrorlog.py     # クライアントPCで実行するpythonスクリプト
    ├── csv
    │   └── ssh_auth_error_2024-07-09.csv  # 出力されたCSVファイル
    └── error_logs
        └── AuthFail_ssh_2024-07-09.log    # サンプルのSSHエラーログファイル
1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?