Recursively join events on child to parent fields to build chains
子フィールドから親フィールドへのイベントを再帰的に結合し、チェーンを構築します。
Splunk Answersで出会ったすごい回答の紹介。
#課題
child_id | parent_id |
---|---|
null | A1 |
null | B1 |
A1 | A2 |
B1 | B2 |
A2 | C1 |
B2 | C1 |
C1 | C2 |
C2 | D1 |
C2 | E1 |
を
child_id | chain |
---|---|
A1 | A1 |
A2 | A2 -> A1 |
B1 | B1 |
B2 | B2 -> B1 |
C1 | C1 -> A2 -> A1 |
C1 | C1 -> B2 -> B1 |
C2 | C2 -> C1 -> A2 -> A1 |
C2 | C2 -> C1 -> B2 -> B1 |
D1 | D1 -> C2 -> C1 -> A2 -> A1 |
D1 | D1 -> C2 -> C1 -> B2 -> B1 |
E1 | E1 -> C2 -> C1 -> A2 -> A1 |
E1 | E1 -> C2 -> C1 -> B2 -> B1 |
にしたい。
#問題点
| makeresults
| eval _raw="child_id parent_id
null A1
null B1
A1 A2
B1 B2
A2 C1
B2 C1
C1 C2
C2 D1
C2 E1"
| multikv forceheader=1
| table parent_id child_id
| eval AP1=parent_id,C1=child_id
| eval data=AP1.",".C1
| eventstats values(data) as data
| streamstats count as session
| mvexpand data
| eval C2=if(mvindex(split(data,","),0)=C1,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| eval C3=if(mvindex(split(data,","),0)=C2,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| eval C4=if(mvindex(split(data,","),0)=C3,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| eval C5=if(mvindex(split(data,","),0)=C4,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| stats values(*) as * by session
| fields - data
| table session parent_id C*
深さが事前にわかっていれば、こんな感じで無理やり検索ができる。
でも事前に深さなんてわからない・・・
SPLは逐次検索していくので、for
みたいにカウンターを増やしながらループするのはとても難しい。
「再帰関数を学ぶと、どんな世界が広がるか」の「1-3-動的計画法-メモ化再帰-へ」のように、計算量の問題もあり、一時記憶しておく場所が必要。
#回答例
|makeresults |fields - _time |eval child_id="null", parent_id="A1"
|append [|makeresults |fields - _time |eval child_id="null", parent_id="B1"]
|append [|makeresults |fields - _time |eval child_id="A1", parent_id="A2"]
|append [|makeresults |fields - _time |eval child_id="B1", parent_id="B2"]
|append [|makeresults |fields - _time |eval child_id="A2", parent_id="C1"]
|append [|makeresults |fields - _time |eval child_id="B2", parent_id="C1"]
|append [|makeresults |fields - _time |eval child_id="C1", parent_id="C2"]
|append [|makeresults |fields - _time |eval child_id="C2", parent_id="D1"]
|append [|makeresults |fields - _time |eval child_id="C2", parent_id="E1"]
|rename child_id as parent parent_id as child
|eval line=child."<-".parent
|eventstats values(parent) as parents by child
|eval depth=1
|outputlookup tree.csv
|map maxsearches=100 search="|inputlookup tree.csv
|eval con=mvindex(split(line, \"<-\"), -1)
|join type=left con [|inputlookup tree.csv |rename child as con parents as parents_2 |fields con parents_2]
|fillnull parents_2 value=\"null\"
|makemv parents_2
|mvexpand parents_2
|eval line=line.\"<-\".parents_2
|eval depth=depth+1
|outputlookup tree.csv"
|eventstats max(depth) as max_depth
|where depth==max_depth
|eval line=rtrim(line, "<-null")."<-null"
|stats values(line) as lines by child
※一番最初に動して lookupが定義されていないよ とでたら、ルックアップでlookup定義 -> tree.csv
としてください。
##結果
child | lines |
---|---|
A1 | A1<-null |
A2 | A2<-A1<-null |
B1 | B1<-null |
B2 | B2<-B1<-null |
C1 | C1<-A2<-A1<-null C1<-B2<-B1<-null |
C2 | C2<-C1<-A2<-A1<-null C2<-C1<-B2<-B1<-null |
D1 | D1<-C2<-C1<-A2<-A1<-null D1<-C2<-C1<-B2<-B1<-null |
E1 | E1<-C2<-C1<-A2<-A1<-null E1<-C2<-C1<-B2<-B1<-null |
##解説
- 子供に対し、親がいくつあるか確認して、「子供 <- 親」を
line
、深さdepth
も含めてtree.csv
に出力 -
map
を利用してループ開始。maxsearches=100
は適当らしい。 -
inputlookup
を利用してtree.csv
を読み込んだら、mvindex
で一番端(その時の親)を読み出す - それを、今度は
tree.csv
の子供と比較してjoin
してあげている。 - その際、親が二人いたら
makemv
とmvexpand
で2行にしている。 - 再度
tree.csv
に出力 -
where
で最終出力版のみ選択して、整えて出力している。
凄すぎ。
#まとめ
「Splunkで自作命令CPUエミュを作る」をCSVを使わなくてもやってやるぜといろいろ試していたときに、問題となったのが一時的に値を記録しておく方法。
stats list()
くらいしか方法がないので、結局CSVに出力して、読み込むのがいいとなっって途中まで作って放置していました
この質問も、いったん力尽くでやってみて、**ループできね〜 記録しておけね〜**と苦しんだ時に神のような回答がでてきてすごく感動したのを思い出します。
Splunk Answersがリニューアルした時に見失って結構探していました。
今回ようやく見つかったので紹介しておきます。
$\tiny{Splunk Answersが更新されてから、この回答してくれた人もみなくなってしまって悲しい。}$