Splunk Answers でいろいろやったので。
#データ
data.spl
| makeresults
| eval _raw="user,dest_ip,bytes,start_time,end_time
A,10.10.10.1,330,16/01/2020 09:30,16/01/2020 09:31
B,10.10.10.2,4859519,17/01/2020 14:21,17/01/2020 17:19
B,10.10.10.2,3370174,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,238523,17/01/2020 14:21,17/01/2020 16:19
B,10.10.10.2,3166705,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,2783036,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,3174766,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,552,17/01/2020 14:33,17/01/2020 14:34
B,10.10.10.2,24396,17/01/2020 15:56,17/01/2020 15:59
B,10.10.10.2,675660,17/01/2020 15:57,17/01/2020 16:33
B,10.10.10.2,162019,17/01/2020 15:57,17/01/2020 16:05
B,10.10.10.2,6388,17/01/2020 15:57,17/01/2020 15:59
B,10.10.10.2,398887,17/01/2020 16:00,17/01/2020 16:13
B,10.10.10.2,294,17/01/2020 16:00,17/01/2020 16:00
B,10.10.10.2,35324,17/01/2020 16:01,17/01/2020 16:04
B,10.10.10.2,294,17/01/2020 16:06,17/01/2020 16:07
B,10.10.10.2,181888,17/01/2020 16:11,17/01/2020 17:07
B,10.10.10.2,25668,17/01/2020 16:11,17/01/2020 16:14
B,10.10.10.2,517341,17/01/2020 16:19,17/01/2020 16:33
B,10.10.10.2,518877,17/01/2020 16:19,17/01/2020 16:33
B,10.10.10.2,80488,17/01/2020 16:27,17/01/2020 16:33
B,10.10.10.2,294,17/01/2020 16:27,17/01/2020 16:28
B,10.10.10.2,294,17/01/2020 16:27,17/01/2020 16:28
B,10.10.10.2,190050,17/01/2020 16:28,17/01/2020 16:33
B,10.10.10.2,39429,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,158281,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,39731,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,40338,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,100225,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,23526,17/01/2020 16:39,17/01/2020 16:42
B,10.10.10.2,28910,17/01/2020 16:39,17/01/2020 16:42
B,10.10.10.2,40860,17/01/2020 16:39,17/01/2020 16:43
B,10.10.10.2,14456,17/01/2020 16:39,17/01/2020 16:42
B,10.10.10.2,15816,17/01/2020 16:39,17/01/2020 16:42
C,10.10.10.2,6354,24/01/2020 13:36,24/01/2020 13:36
C,10.10.10.2,552,24/01/2020 13:55,24/01/2020 13:55
C,10.10.10.2,82751,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,254943,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,56569,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,104603,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,88522,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,168563,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,11535,27/01/2020 09:07,27/01/2020 09:09
C,10.10.10.2,8114,27/01/2020 09:07,27/01/2020 09:09
C,10.10.10.2,4691,27/01/2020 09:07,27/01/2020 09:09
C,10.10.10.2,25653,27/01/2020 09:07,27/01/2020 09:21
C,10.10.10.2,66339,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,52219,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,26527,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,7349,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,19109,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,20854,27/01/2020 09:09,27/01/2020 09:11
D,10.10.10.3,46783593,20/01/2020 15:27,20/01/2020 15:52
D,10.10.10.4,49106343,20/01/2020 15:27,20/01/2020 16:23
D,10.10.10.5,1033049,20/01/2020 15:38,20/01/2020 16:23
D,10.10.10.6,287151,20/01/2020 16:23,20/01/2020 16:24
D,10.10.10.4,132,20/01/2020 16:24,20/01/2020 16:24
D,10.10.10.4,62,20/01/2020 16:24,20/01/2020 16:24
D,10.10.10.5,4405811,20/01/2020 16:24,20/01/2020 16:59
D,10.10.10.4,18716525,20/01/2020 16:24,20/01/2020 16:44
D,10.10.10.7,40444,21/01/2020 09:21,21/01/2020 09:22
D,10.10.10.7,173988,21/01/2020 09:22,21/01/2020 09:24
D,10.10.10.7,2755,21/01/2020 09:24,21/01/2020 09:24
D,10.10.10.3,1148,21/01/2020 09:45,21/01/2020 09:46
D,10.10.10.3,7131,21/01/2020 09:46,21/01/2020 09:54
D,10.10.10.3,35138614,21/01/2020 09:54,21/01/2020 10:47"
| multikv forceheader=1
| table user,dest_ip,bytes,start_time,end_time
以下このデータを使用する。
#目的
ユーザ、宛先ごとに、経過時間を調べる。
#問題点
- 複数のアクセスログがある中、例えばユーザBの経過時間は
17/01/2020 14:21,17/01/2020 17:19
になる。 - もちろん日付をまたぐこともある。ただし1日程度
- もともとがCSVのデータなので時間のデータが分単位
#解決方法
ToAggregateParHour.spl
....
| table user,dest_ip,bytes,start_time,end_time
| foreach *_time
[ eval <<FIELD>> = round(strptime('<<FIELD>>',"%d/%m/%Y %H:%M"))
| eval end_time=end_time+60]
| sort user start_time
| eval user_dest=user.":".dest_ip
| streamstats current=f min(start_time) as p_start max(end_time) as t_end by user_dest
| eval change_flag= if(t_end <= start_time,"over",NULL)
| streamstats count(eval(change_flag="over")) as session by user_dest
| stats min(start_time) as start_time max(end_time) as end_time by user_dest session
| eval duration = end_time - start_time
| eval date=if(strftime(start_time,"%d")!=strftime(end_time,"%d"),mvappend(strftime(start_time,"%d"),strftime(end_time,"%d")),strftime(start_time,"%d"))
| mvexpand date
| eval time_d =round(relative_time(start_time,"+1d@d"))
| eval duration=if(date=tonumber(strftime(start_time,"%d")), time_d - start_time,start_time + duration - time_d)
| stats sum(duration) as duration by date user_dest
| table user_dest duration date
| eval duration=replace(tostring(duration,"duration"),"(\d+)\:(\d+)\:(\d+)","\1h \2min \3s")
| foreach *_*
[ eval <<MATCHSEG1>>=mvindex(split(user_dest,":"),0)
| eval <<MATCHSEG2>>=mvindex(split(user_dest,":"),1)]
| table user dest duration date
| sort user dest date
#解説
- とりあえず分単位で始まりと終わりの時間が一緒のデータもあるので一律60秒追加
- 複数フィールドを扱うのはしんどいので_user_と_dest_ip_をまとめた。
- _end_time_より_start_time_が大きくなる。つまり一旦通信が途切れるまでを
streamstats
を使ってセッション化 - ユーザおよび宛先で最小_start_time_と最大_end_time_を
streamstats
を使って集計。
epoch時間はmin
やmax
の方が信頼できる気がする。 - 一旦経過時間を計算
- 始まりの時間と終わりの時間で日付が変わっているか確認して、変わっている場合ログを複製している。
- 日付が変わっているユーザ等に対して、計算。
- ユーザ、宛先毎集計
- 経過時間等を読みやすいように変換
- お疲れ様です。
#はじめのバージョン
いろいろやってみた.spl
-----
....
| foreach *_time
[ eval <<FIELD>> = round(strptime('<<FIELD>>',"%d/%m/%Y %H:%M"))
| eval b_<<MATCHSTR>> = strftime('<<FIELD>>',"%d/%m %H:%M")]
| table user dest_ip bytes b_start start_time b_end end_time
| streamstats min(start_time) as start_p max(end_time) as end_p by user
| streamstats min(eval(min(end_p,start_time))) as start_p max(eval(max(end_p,end_time))) as end_p by user
| table user dest_ip bytes b_start start_time start_p b_end end_time end_p
| eval start_check =nullif(start_time >= start_p AND end_time >= end_p,null())
| fields - dest_ip bytes
| streamstats dc(eval(mvzip(start_p,end_p))) as flag
| dedup flag
| eval date=strftime(start_time,"%d")
| streamstats min(start_time) as start_p max(end_time) as end_p current=f by user date
| reverse
| dedup start_time user
| reverse
| eval duration=end_time - start_time + 60
| stats sum(duration) as duration by user
ユーザごと集計してみた、最初のバージョン。
streamstats
とdedup
が一番上の値だけを残すことを利用して色々やっていた。
nullif
のところで一番小さい_start_time_と一番大きい_end_time_を探している。
#まとめ
単純にログを数えるのではなく、ログの状態を数えるとなると難易度が上がります。
基本的に切り替わりを押さえてstreamstats
でセッション化がうまくいけばなんとなると思います。
今回だと通信が途切れるところに_change_flag_を作ってそこで区切りました。
生ログからだと| transaction user_dest session
が使用できると思うので、もっと簡単にできるかもしれません。