
Aggregating time-series data in Splunk


These are notes from a few things I worked through on Splunk Answers.
#Data

data.spl
| makeresults 
| eval _raw="user,dest_ip,bytes,start_time,end_time
A,10.10.10.1,330,16/01/2020 09:30,16/01/2020 09:31
B,10.10.10.2,4859519,17/01/2020 14:21,17/01/2020 17:19
B,10.10.10.2,3370174,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,238523,17/01/2020 14:21,17/01/2020 16:19
B,10.10.10.2,3166705,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,2783036,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,3174766,17/01/2020 14:21,17/01/2020 15:39
B,10.10.10.2,552,17/01/2020 14:33,17/01/2020 14:34
B,10.10.10.2,24396,17/01/2020 15:56,17/01/2020 15:59
B,10.10.10.2,675660,17/01/2020 15:57,17/01/2020 16:33
B,10.10.10.2,162019,17/01/2020 15:57,17/01/2020 16:05
B,10.10.10.2,6388,17/01/2020 15:57,17/01/2020 15:59
B,10.10.10.2,398887,17/01/2020 16:00,17/01/2020 16:13
B,10.10.10.2,294,17/01/2020 16:00,17/01/2020 16:00
B,10.10.10.2,35324,17/01/2020 16:01,17/01/2020 16:04
B,10.10.10.2,294,17/01/2020 16:06,17/01/2020 16:07
B,10.10.10.2,181888,17/01/2020 16:11,17/01/2020 17:07
B,10.10.10.2,25668,17/01/2020 16:11,17/01/2020 16:14
B,10.10.10.2,517341,17/01/2020 16:19,17/01/2020 16:33
B,10.10.10.2,518877,17/01/2020 16:19,17/01/2020 16:33
B,10.10.10.2,80488,17/01/2020 16:27,17/01/2020 16:33
B,10.10.10.2,294,17/01/2020 16:27,17/01/2020 16:28
B,10.10.10.2,294,17/01/2020 16:27,17/01/2020 16:28
B,10.10.10.2,190050,17/01/2020 16:28,17/01/2020 16:33
B,10.10.10.2,39429,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,158281,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,39731,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,40338,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,100225,17/01/2020 16:36,17/01/2020 16:39
B,10.10.10.2,23526,17/01/2020 16:39,17/01/2020 16:42
B,10.10.10.2,28910,17/01/2020 16:39,17/01/2020 16:42
B,10.10.10.2,40860,17/01/2020 16:39,17/01/2020 16:43
B,10.10.10.2,14456,17/01/2020 16:39,17/01/2020 16:42
B,10.10.10.2,15816,17/01/2020 16:39,17/01/2020 16:42
C,10.10.10.2,6354,24/01/2020 13:36,24/01/2020 13:36
C,10.10.10.2,552,24/01/2020 13:55,24/01/2020 13:55
C,10.10.10.2,82751,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,254943,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,56569,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,104603,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,88522,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,168563,27/01/2020 09:06,27/01/2020 09:09
C,10.10.10.2,11535,27/01/2020 09:07,27/01/2020 09:09
C,10.10.10.2,8114,27/01/2020 09:07,27/01/2020 09:09
C,10.10.10.2,4691,27/01/2020 09:07,27/01/2020 09:09
C,10.10.10.2,25653,27/01/2020 09:07,27/01/2020 09:21
C,10.10.10.2,66339,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,52219,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,26527,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,7349,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,19109,27/01/2020 09:09,27/01/2020 09:11
C,10.10.10.2,20854,27/01/2020 09:09,27/01/2020 09:11
D,10.10.10.3,46783593,20/01/2020 15:27,20/01/2020 15:52
D,10.10.10.4,49106343,20/01/2020 15:27,20/01/2020 16:23
D,10.10.10.5,1033049,20/01/2020 15:38,20/01/2020 16:23
D,10.10.10.6,287151,20/01/2020 16:23,20/01/2020 16:24
D,10.10.10.4,132,20/01/2020 16:24,20/01/2020 16:24
D,10.10.10.4,62,20/01/2020 16:24,20/01/2020 16:24
D,10.10.10.5,4405811,20/01/2020 16:24,20/01/2020 16:59
D,10.10.10.4,18716525,20/01/2020 16:24,20/01/2020 16:44
D,10.10.10.7,40444,21/01/2020 09:21,21/01/2020 09:22
D,10.10.10.7,173988,21/01/2020 09:22,21/01/2020 09:24
D,10.10.10.7,2755,21/01/2020 09:24,21/01/2020 09:24
D,10.10.10.3,1148,21/01/2020 09:45,21/01/2020 09:46
D,10.10.10.3,7131,21/01/2020 09:46,21/01/2020 09:54
D,10.10.10.3,35138614,21/01/2020 09:54,21/01/2020 10:47" 
| multikv forceheader=1
| table user,dest_ip,bytes,start_time,end_time

The rest of this article uses this data.
#Goal
Find the elapsed (connection) time per user and destination.
#Issues

  1. There are many overlapping access logs; for example, user B's elapsed time
    should come out as 17/01/2020 14:21 to 17/01/2020 17:19 (see the quick check right after this list).
  2. A session can of course cross a date boundary, but only by about one day.
  3. The data was originally CSV, so the timestamps only have minute granularity.
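
As a quick check (not part of the article's searches), appending the lines below to the data.spl search above shows the naive first-to-last span per user and destination. For user B this reproduces the 17/01/2020 14:21 to 17:19 span from issue 1, but for user C it lumps the accesses on 24/01 and 27/01 into one multi-day span, which is exactly why the sessionization in the next section is needed.

 | eval start_time=strptime(start_time,"%d/%m/%Y %H:%M")
 | eval end_time=strptime(end_time,"%d/%m/%Y %H:%M")
 | stats min(start_time) as first_seen max(end_time) as last_seen by user dest_ip
 | eval span=tostring(last_seen - first_seen,"duration")
 | fieldformat first_seen=strftime(first_seen,"%d/%m/%Y %H:%M")
 | fieldformat last_seen=strftime(last_seen,"%d/%m/%Y %H:%M")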

#Solution

ToAggregateParHour.spl
    ....
 | table user,dest_ip,bytes,start_time,end_time 
 | foreach *_time 
     [ eval <<FIELD>> = round(strptime('<<FIELD>>',"%d/%m/%Y %H:%M"))] 
 | eval end_time=end_time+60
 | sort user start_time
 | eval user_dest=user.":".dest_ip
 | streamstats current=f min(start_time) as p_start max(end_time) as t_end by user_dest 
 | eval change_flag= if(t_end <= start_time,"over",null()) 
 | streamstats count(eval(change_flag="over")) as session by user_dest 
 | stats min(start_time) as start_time max(end_time) as end_time by user_dest session 
 | eval duration = end_time - start_time
 | eval date=if(strftime(start_time,"%d")!=strftime(end_time,"%d"),mvappend(strftime(start_time,"%d"),strftime(end_time,"%d")),strftime(start_time,"%d")) 
 | mvexpand date
 | eval time_d =round(relative_time(start_time,"+1d@d")) 
 | eval duration=case(strftime(start_time,"%d")=strftime(end_time,"%d"), duration, date=strftime(start_time,"%d"), time_d - start_time, true(), end_time - time_d) 
 | stats sum(duration) as duration by date user_dest 
 | table user_dest duration date 
 | eval duration=replace(tostring(duration,"duration"),"(\d+)\:(\d+)\:(\d+)","\1h \2min \3s") 
 | foreach *_* 
     [ eval <<MATCHSEG1>>=mvindex(split(user_dest,":"),0) 
     | eval <<MATCHSEG2>>=mvindex(split(user_dest,":"),1)] 
 | table user dest duration date
 | sort user dest date

#Explanation

  1. The timestamps are minute-granular, and some rows even have identical start and end times, so a flat 60 seconds is added to end_time.
  2. Juggling several fields is a pain, so _user_ and _dest_ip_ are combined into a single _user_dest_ field.
  3. A _start_time_ that is later than every earlier _end_time_ means the traffic paused for a moment; streamstats is used to turn everything up to such a break into one session (a minimal sketch of this trick follows the list).
  4. The minimum _start_time_ and maximum _end_time_ are then aggregated per user/destination and session.
    For epoch times, min/max feels more trustworthy than earliest/latest.
  5. Compute the elapsed time (duration) for each session.
  6. Check whether the date differs between the start and end time; if it does, duplicate the row so there is one copy per date.
  7. For the sessions that cross a date boundary, recompute the duration on each side of midnight (single-day sessions keep the value from step 5).
  8. Aggregate per user, destination and date.
  9. Convert the duration and the combined field back into a readable form.
  10. And that's it.
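
A minimal, self-contained sketch of the sessionization trick from steps 3 and 4, on four hand-made events (the epoch values are arbitrary and only for the demo; field names mirror the search above):

 | makeresults count=4 
 | streamstats count as n 
 | eval start_time=case(n=1,100, n=2,160, n=3,500, n=4,560) 
 | eval end_time=start_time+120 
 | streamstats current=f max(end_time) as t_end 
 | eval change_flag=if(t_end <= start_time,"over",null()) 
 | streamstats count(eval(change_flag="over")) as session 
 | stats min(start_time) as start_time max(end_time) as end_time by session

The first two events overlap, so they collapse into session 0 (100 to 280); the third event starts after every earlier end_time, so change_flag fires and events three and four become session 1 (500 to 680).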

#First version

いろいろやってみた.spl
....
| foreach *_time 
    [ eval <<FIELD>> = round(strptime('<<FIELD>>',"%d/%m/%Y %H:%M")) 
    | eval b_<<MATCHSTR>> = strftime('<<FIELD>>',"%d/%m %H:%M")] 
| table user dest_ip bytes b_start start_time b_end end_time 
| streamstats min(start_time) as start_p max(end_time) as end_p by user 
| streamstats min(eval(min(end_p,start_time))) as start_p max(eval(max(end_p,end_time))) as end_p by user 
| table user dest_ip bytes b_start start_time start_p b_end end_time end_p 
| eval start_check =nullif(start_time >= start_p AND end_time >= end_p,null()) 
| fields - dest_ip bytes 
| streamstats dc(eval(mvzip(start_p,end_p))) as flag 
| dedup flag 
| eval date=strftime(start_time,"%d") 
| streamstats min(start_time) as start_p max(end_time) as end_p current=f by user date 
| reverse 
| dedup start_time user 
| reverse 
| eval duration=end_time - start_time + 60 
| stats sum(duration) as duration by user

This was the first version, which only aggregated per user.
It played around with streamstats and the fact that dedup keeps only the topmost (first) row per value (a tiny illustration follows below).
The nullif line is where it looks for the smallest _start_time_ and the largest _end_time_.
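
A tiny illustration of the dedup behaviour this version leans on: dedup keeps only the first (topmost) event per value, so after streamstats has accumulated running values, only the top row of each group survives. The field names here are made up for the demo:

 | makeresults count=5 
 | streamstats count as n 
 | eval grp=if(n<=2,"a","b") 
 | streamstats max(n) as running_max by grp 
 | dedup grp

Only n=1 and n=3 remain, each carrying whatever streamstats had computed by that row.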

#Summary
Counting the state that the logs represent, rather than simply counting log entries, raises the difficulty considerably.
Basically, if you can pin down the switch-over points and sessionize with streamstats, you can usually make it work.
Here I created a _change_flag_ at the points where the traffic breaks off and split the sessions there.
If you are starting from the raw logs, I think | transaction user_dest session could be used, so it might be even simpler (a rough sketch follows).
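
For reference, a rough, untested sketch of that transaction idea, assuming the raw events each carry their own _time plus user and dest_ip fields; maxpause=1m is my own guess at a break condition, not something from the searches above, and would need tuning:

 | eval user_dest=user.":".dest_ip 
 | transaction user_dest maxpause=1m 
 | stats sum(duration) as duration by user_dest 
 | eval duration=tostring(duration,"duration")

transaction fills in duration (last _time minus first _time) for each group on its own, which replaces the streamstats/change_flag bookkeeping, though it is heavier and you would still need the per-date split for daily totals.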
