1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Splunkで時系列データを集計する その2

Last updated at Posted at 2020-03-10

その1でもいろいろやったが、それよりも面倒なやつをやったので忘備録としてまとめます。
#課題

[02/18/2020 09:45:15.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: STARTING JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:47:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: STARTING JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:47:35.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: RUNNING JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:48:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: RUNNING JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 18:25:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: SUCCESS JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 19:25:15.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: SUCCESS JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX

なログがあり、_STARTING_から_SUCCESS_までの時間の各5分ごとにJOB(例:CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M)数を数え、時系列データで表示したい。

#問題
1.各時間のログがない。
みてのとおり、STARING RUNNING _SUCCESS_の三つの状態のログしかなく、その間の経過時間は集計できる。
しかし、その間の各5分間のログはもちろんない。
2.状態が三つ
二つだと楽。しかし三つ、しかも5分間隔だと同時に動くJOBがある。
3.最後はグラフ表示
qiita3.png
こんな形をご所望でした。
#作りました。

timeline2.spl
| makeresults
| eval _raw="[02/18/2020 09:45:15.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: STARTING JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:47:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: STARTING JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:47:35.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: RUNNING JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:48:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: RUNNING JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 18:25:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: SUCCESS JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 19:25:15.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: SUCCESS JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX"
| makemv delim="
" _raw
| stats count by _raw
| rex "\[(?<timestamp>.*)\] (?<session>\S+) .*STATUS: (?<status>\S+) JOB: (?<job>\S+) MACHINE: (?<machine>\S+)"
| table timestamp session status job machine
| eval _time=strptime(timestamp,"%m/%d/%Y %T.%3Q")
| eval status=if(status="STARTING" OR status="RUNNING","RUNNING",status)
| eval job_status=job.":".status
| timechart cont=f span=5m count by job_status
| makecontinuous span=5m _time
| foreach *:RUNNING [ eval <<FIELD>> = nullif('<<FIELD>>','<<MATCHSTR>>:SUCCESS')]
| foreach *:SUCCESS [ eval <<FIELD>> = if(isnull('<<MATCHSTR>>:RUNNING') AND '<<FIELD>>'=0 ,NULL, '<<FIELD>>')]
| reverse
| filldown *:SUCCESS
| reverse
| foreach *:RUNNING [ eval <<FIELD>> = if(isnull('<<FIELD>>') AND '<<MATCHSTR>>:SUCCESS' > 0 , 1, '<<FIELD>>' )]
| filldown *:RUNNING
| foreach *:* [ eval <<MATCHSEG1>> = max('<<MATCHSEG1>>:RUNNING','<<MATCHSEG1>>:SUCCESS')]
| fields - *:*

#解説

  • | eval status|eval job_statusのところは2つのフィールドを同時に扱うのはしんどかったのでまとめました。
  • | timechart cont=f と makecontinuousで ログの時間で各5分間ごとの時間を作成
  • reverse前のforeachは時間によって別なJOBが両方0な場合があるのでNULLにしている。

    これはfilldownを使用する前の準備

    あえてnullifを使ってNULLを作ってみました。
  • filldown*(アスタリスク)が使用できるので、こんなことができます。
  • reverseを2回やっているのはひっくり返して最後(SUCCESS)から1を埋めていっている。そうすると2番目(RUNNING)の0のところで止まってくれる。
    それで元に戻している。
  • 次のforeachは最初から2番目までを1で埋める前の下準備でJOBが2の時刻以外を10に戻している。
  • 最後のforeachで2つの状態の値があるので、最終的にJOBの名前に統一してあげている。
    evalmaxが使えるので(2,1,0)の値の一番大きい値としている。
  • お疲れ様でした。

#苦労したところ

  1. timechart cont=fからmakecontinuousはやってみたらすんなり出来た
  2. streamstatsで頑張れるかなとも思ったけど、JOB数が増えることに対応出来なさそうだったから早急に諦めました。stats count(any*) as *は可能だけどevalを使うとなると*が使えないので無理ですね。
  3. filldownを使うのは早々と決めたがJOBが2のところで大苦戦。こいつを次の時間は1にしなきゃいけない。
    | foreach *:RUNNING [ eval <<FIELD>> = if(isnull('<<FIELD>>') AND '<<MATCHSTR>>:SUCCESS' > 0 , 1, '<<FIELD>>' )]のところの前後で一回値を確かめてみてください。:cry:
  4. timechartで時間を作っているのでどうしてもJOBの片方は0 0の時がある。evalでいれてもよかったかもしれなかったけど、あえてforeachでJOBが増えても大丈夫にしている。
    SPLは値を保持出来ないのでこんな感じでやってます。もっとスッキリとやりたかった:sweat:

#やりなおしてみた

timechart2.spl
| makeresults
| eval _raw="[02/18/2020 09:45:15.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: STARTING JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:47:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: STARTING JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:47:35.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: RUNNING JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 10:48:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: RUNNING JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 18:25:15.1318] CAUAJM_I_40245 EVENT: CHANGE_STATUS STATUS: SUCCESS JOB: CFDW_ADHOC_C_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX
[02/18/2020 19:25:15.1318] CAUAJM_I_40244 EVENT: CHANGE_STATUS STATUS: SUCCESS JOB: CFDW_ADHOC_A_AIMSAS_D_INV_LNITEM_BILLING_CHGS_M MACHINE: XXXX"
| makemv delim="
" _raw
| stats count by _raw
| rex "\[(?<timestamp>.*)\] (?<session>\S+) .*STATUS: (?<status>\S+) JOB: (?<job>\S+) MACHINE: (?<machine>\S+)"
| table timestamp session status job machine
| eval _time=strptime(timestamp,"%m/%d/%Y %T.%3Q")
| bin span=5min _time
| reverse
| streamstats values(_time) as range  by session
| reverse
| eval start=mvindex(range,0), end=mvindex(range,1)
| streamstats current=f values(start) as start_p1 by job
| eval end=if(start=start_p1,null(),end)
| eval timerange=if(isnull(end),start,mvrange(start,end,300))
| fields _time status job timerange
| mvexpand timerange
| rename timerange as _time
| table _time job status
| timechart span=5m count(status) as count by job

すっきり、画面は一緒になったので省略
#解説その2
よくよく考えてみると、その1でやったmvexpandが使えんじゃね。ということでやってみた。

binで時間を整えてreversestreamstatsで経過時間を抽出。

やはりjob数が2になるところで苦戦:cry: 重複してログが出来てしまう。
結局直前の時間と比較| eval end=if(start=start_p1,null(),end)して| eval timerange=if(isnull(end),start,mvrange(start,end,300))で時間を作ってあげることで解決した。
こうすることで、重複している時間とstatus="SUCCESS"のところはログが一つしか作らなくなる。

あとはmvexpandしてあげれば、素直にログができるので、timechartで数えておしまい。
:sweat:なんだろう、この素直さは
#まとめ
ログがあれば、数えて終わりだと思います。

無いログを作れるのがSplunkのいいところ。
似たような案件を抱えている方がいましたら、japaneseのタグをつけて日本語でSplunk>Answersに質問してみてください。
サンプルログ必須ですけど、クエリーは作成します。

ダッシュボードは心の師匠に聞いてください


#####更新履歴
R2.3.11 やりなおしてみた を追加

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?