モチベ
SIEM入れるほどしっかり作ってない環境でサーバのログを眺めている際、かんたんなタイムチャートを表示させてログの傾向を見たいなぁ、、というケースがあります。
ちょっと雑ですが、かんたんなスクリプト作ってみました。
数日後にはどっかにやってしまいそうなので、保存がてら。
動作結果
こんなふうになるスクリプトです。
$ sudo cat /var/log/secure* | grep sshd | egrep "(Accepted|Failed)" | awk '{print $1,$2,$3","$11}' | ./log2timeline.py
State from: 1900-10-20 22:45:43
State to: 1900-10-22 10:41:00
10.0.0.1 : lii....l.i....i..........i..i.il.......i....................................ii
10.0.0.128 : ............................................................................il
時刻が1900年になってしまっているのは年数表示がないrsyslogの標準形式のログを食わせているせいなんですが、まぁ、何をしたいかはおわかりただけるかなと。
コード
こんなです。
かなり雑に作っちゃているので、使いながらちょこちょこ手直ししていこうかと思ってます。
今回の学びは頭の処理。
以下を使わせていただきました。
bashとpythonでのヒアドキュメントの仕様の違いをつかう、bash的にはエラーが出るけどexecした先で処理を終えてしまうからエラー表示が出るまでに処理がでない。
なるほどーと目から鱗が落ちる処理でした。
https://stackoverflow.com/questions/69759506/is-it-possible-to-construct-a-shebang-that-works-for-both-python-2-and-3
まぁ、そもそもCentos 7用にpython2でも動作するように調整するのもめんどくさかったのですが。。。
Centos7、早くEOLしてしまってほしい。。。
$ cat log2timeline.py
#!/usr/bin/env bash
_=''''
command -v python3 &> /dev/null && exec python3 $0 "$@" || exec python $0 "$@"
'''
import sys
import argparse
def timeformat_detecter(date_str, format=format):
import datetime
import time
formats = [
"%Y/%m/%d %Y:%M:%S.%f",
"%Y/%m/%d %Y:%M:%S",
"%b %d %H:%M:%S",
]
if format:
try:
utime = time.mktime(datetime.datetime.strptime(date_str, format).timetuple())
return utime
except:
return None
for format in formats:
try:
utime = time.mktime(datetime.datetime.strptime(date_str, format).timetuple())
return utime
except:
pass
return None
def load_file(logfile=None, format=None):
logs = []
fd = open(logfile, "r")
for line in fd.readlines():
try:
timestr, desc = line.strip().split(",")
utime = timeformat_detecter(timestr, format)
logs.append({'utime': utime, 'desc': desc})
except:
print("Decode error, {}".format(line))
return logs
def decode_pipe(format=None):
logs = []
for line in sys.stdin:
try:
timestr, desc = line.strip().split(",")
utime = timeformat_detecter(timestr, format)
logs.append({'utime': utime, 'desc': desc})
except:
print("Decode error, {}".format(line))
return logs
def logs2stats(logs, utime_from=None, utime_to=None):
import shutil
import math
# Get from, to
logs_from = min([l['utime'] for l in logs])
logs_to = max([l['utime'] for l in logs])
utime_from = utime_from if utime_from else logs_from
utime_to = utime_to if utime_to else logs_to
# Get Width
try:
terminal_size = shutil.get_terminal_size((80, 20))
width_max = terminal_size.columns
except:
width_max = 80
desc_max = max(len(l['desc']) for l in logs)
width = width_max - desc_max - 5
if width <= 10:
raise Exception("Terminal width is too small.")
# Calcurate stats
stats = {}
unit = (utime_to - utime_from) / width
for log in logs:
desc = log['desc']
if len(desc) < desc_max:
desc += ' ' * (desc_max - len(desc))
utime = log['utime']
timeslot = int(math.floor((utime - utime_from) / unit))
if timeslot == width:
timeslot -= 1
if desc not in stats.keys():
stats[desc] = [0] * width
stats[desc][timeslot] += 1
num_max = max([max(s) for s in stats.values()])
median = math.floor(num_max / 2)
return stats, utime_from, utime_to, median
def print_header(utime_from, utime_to):
import datetime
date_from = datetime.datetime.fromtimestamp(utime_from)
date_to = datetime.datetime.fromtimestamp(utime_to)
print("State from: {}".format(date_from))
print("State to: {}".format(date_to))
return
def print_stats(log_stats, log_median):
for desc in log_stats.keys():
timeline = ''
for count in log_stats[desc]:
cchar = "."
if count == 0:
cchar = '.'
elif count < log_median:
cchar = 'i'
else:
cchar = 'l'
timeline += cchar
print("{} : {}".format(desc, timeline))
def main():
parser = argparse.ArgumentParser(description='Pipe <date>,<desc> log data.')
parser.add_argument('--format', dest='date_format', type=str)
parser.add_argument('--from', dest='date_from', type=str)
parser.add_argument('--to', dest='date_to', type=str)
if sys.stdin.isatty():
parser.add_argument('logfile', type=str)
args = parser.parse_args()
dtime_format = args.date_format
dtime_from = args.date_from
dtime_to = args.date_to
if sys.stdin.isatty():
logfile = args.logfile
logs = load_file(logfile, dtime_format)
else:
logs = decode_pipe(dtime_format)
utime_from = timeformat_detecter(dtime_from, dtime_format)
utime_to = timeformat_detecter(dtime_to, dtime_format)
log_stats, utime_from, utime_to, log_median = logs2stats(logs, utime_from, utime_from)
print_header(utime_from, utime_to)
print_stats(log_stats, log_median)
if __name__ == '__main__':
main()
こんなところで。
。。。こういうツール思いつかなかったので作ったのですが、標準ではいっているツールであったら切ないなぁと思いつつ。
なんかありそうな気もするんですけどね。。