0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ターミナルでかんたんなタイムラインを表示するスクリプト

Last updated at Posted at 2023-10-24

モチベ

SIEM入れるほどしっかり作ってない環境でサーバのログを眺めている際、かんたんなタイムチャートを表示させてログの傾向を見たいなぁ、、というケースがあります。

ちょっと雑ですが、かんたんなスクリプト作ってみました。
数日後にはどっかにやってしまいそうなので、保存がてら。

動作結果

こんなふうになるスクリプトです。

$ sudo cat /var/log/secure* | grep sshd | egrep "(Accepted|Failed)" | awk '{print $1,$2,$3","$11}' | ./log2timeline.py
State from: 1900-10-20 22:45:43
State to:   1900-10-22 10:41:00
10.0.0.1   : lii....l.i....i..........i..i.il.......i....................................ii
10.0.0.128 : ............................................................................il

時刻が1900年になってしまっているのは年数表示がないrsyslogの標準形式のログを食わせているせいなんですが、まぁ、何をしたいかはおわかりただけるかなと。

コード

こんなです。
かなり雑に作っちゃているので、使いながらちょこちょこ手直ししていこうかと思ってます。

今回の学びは頭の処理。
以下を使わせていただきました。
bashとpythonでのヒアドキュメントの仕様の違いをつかう、bash的にはエラーが出るけどexecした先で処理を終えてしまうからエラー表示が出るまでに処理がでない。
なるほどーと目から鱗が落ちる処理でした。
https://stackoverflow.com/questions/69759506/is-it-possible-to-construct-a-shebang-that-works-for-both-python-2-and-3

まぁ、そもそもCentos 7用にpython2でも動作するように調整するのもめんどくさかったのですが。。。
Centos7、早くEOLしてしまってほしい。。。

$ cat log2timeline.py
#!/usr/bin/env bash
_=''''
command -v python3 &> /dev/null && exec python3 $0 "$@" || exec python $0 "$@"
'''

import sys
import argparse


def timeformat_detecter(date_str, format=format):
    import datetime
    import time


    formats = [
            "%Y/%m/%d %Y:%M:%S.%f",
            "%Y/%m/%d %Y:%M:%S",
            "%b %d %H:%M:%S",
            ]

    if format:
        try:
            utime = time.mktime(datetime.datetime.strptime(date_str, format).timetuple())
            return utime
        except:
            return None

    for format in formats:
        try:
            utime = time.mktime(datetime.datetime.strptime(date_str, format).timetuple())
            return utime
        except:
            pass

    return None


def load_file(logfile=None, format=None):
    logs = []

    fd = open(logfile, "r")
    for line in fd.readlines():
        try:
            timestr, desc = line.strip().split(",")
            utime = timeformat_detecter(timestr, format)
            logs.append({'utime': utime, 'desc': desc})
        except:
            print("Decode error, {}".format(line))

    return logs


def decode_pipe(format=None):
    logs = []

    for line in sys.stdin:
        try:
            timestr, desc = line.strip().split(",")
            utime = timeformat_detecter(timestr, format)
            logs.append({'utime': utime, 'desc': desc})
        except:
            print("Decode error, {}".format(line))

    return logs


def logs2stats(logs, utime_from=None, utime_to=None):
    import shutil
    import math

    # Get from, to
    logs_from = min([l['utime'] for l in logs])
    logs_to = max([l['utime'] for l in logs])

    utime_from = utime_from if utime_from else logs_from
    utime_to = utime_to if utime_to else logs_to

    # Get Width
    try:
        terminal_size = shutil.get_terminal_size((80, 20))
        width_max = terminal_size.columns
    except:
        width_max = 80

    desc_max = max(len(l['desc']) for l in logs)
    width = width_max - desc_max - 5

    if width <= 10:
        raise Exception("Terminal width is too small.")

    # Calcurate stats
    stats = {}
    unit = (utime_to - utime_from) / width

    for log in logs:
        desc = log['desc']
        if len(desc) < desc_max:
            desc += ' ' * (desc_max - len(desc))

        utime = log['utime']
        timeslot = int(math.floor((utime - utime_from) / unit))
        if timeslot == width:
            timeslot -= 1

        if desc not in stats.keys():
            stats[desc] = [0] * width
        stats[desc][timeslot] += 1

    num_max = max([max(s) for s in stats.values()])
    median = math.floor(num_max / 2)

    return stats, utime_from, utime_to, median


def print_header(utime_from, utime_to):
    import datetime

    date_from = datetime.datetime.fromtimestamp(utime_from)
    date_to = datetime.datetime.fromtimestamp(utime_to)

    print("State from: {}".format(date_from))
    print("State to:   {}".format(date_to))

    return


def print_stats(log_stats, log_median):
    for desc in log_stats.keys():
        timeline = ''
        for count in log_stats[desc]:
            cchar = "."
            if count == 0:
                cchar = '.'
            elif count < log_median:
                cchar = 'i'
            else:
                cchar = 'l'
            timeline += cchar
        print("{} : {}".format(desc, timeline))


def main():
    parser = argparse.ArgumentParser(description='Pipe <date>,<desc> log data.')
    parser.add_argument('--format', dest='date_format', type=str)
    parser.add_argument('--from', dest='date_from', type=str)
    parser.add_argument('--to', dest='date_to', type=str)

    if sys.stdin.isatty():
        parser.add_argument('logfile', type=str)

    args = parser.parse_args()

    dtime_format = args.date_format
    dtime_from = args.date_from
    dtime_to = args.date_to

    if sys.stdin.isatty():
        logfile = args.logfile
        logs = load_file(logfile, dtime_format)
    else:
        logs = decode_pipe(dtime_format)

    utime_from = timeformat_detecter(dtime_from, dtime_format)
    utime_to = timeformat_detecter(dtime_to, dtime_format)
    log_stats, utime_from, utime_to, log_median = logs2stats(logs, utime_from, utime_from)

    print_header(utime_from, utime_to)
    print_stats(log_stats, log_median)


if __name__ == '__main__':
    main()

こんなところで。
。。。こういうツール思いつかなかったので作ったのですが、標準ではいっているツールであったら切ないなぁと思いつつ。
なんかありそうな気もするんですけどね。。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?