LoginSignup
2

More than 1 year has passed since last update.

Splunkで再帰的に検索する

Last updated at Posted at 2020-12-14

Recursively join events on child to parent fields to build chains
子フィールドから親フィールドへのイベントを再帰的に結合し、チェーンを構築します。

Splunk Answersで出会ったすごい回答の紹介。

課題

child_id parent_id
null A1
null B1
A1 A2
B1 B2
A2 C1
B2 C1
C1 C2
C2 D1
C2 E1

child_id chain
A1 A1
A2 A2 -> A1
B1 B1
B2 B2 -> B1
C1 C1 -> A2 -> A1
C1 C1 -> B2 -> B1
C2 C2 -> C1 -> A2 -> A1
C2 C2 -> C1 -> B2 -> B1
D1 D1 -> C2 -> C1 -> A2 -> A1
D1 D1 -> C2 -> C1 -> B2 -> B1
E1 E1 -> C2 -> C1 -> A2 -> A1
E1 E1 -> C2 -> C1 -> B2 -> B1

にしたい。

問題点

https://community.splunk.com/t5/Splunk-Search/Recursively-join-events-on-child-to-parent-fields-to-build/m-p/481327/highlight/true#M134885
で自分が回答したとおり

limited_recursive.spl
| makeresults 
| eval _raw="child_id parent_id
  null     A1
  null       B1
  A1         A2
  B1         B2
  A2         C1
  B2         C1
  C1         C2
  C2         D1
  C2         E1" 
| multikv forceheader=1 
| table parent_id child_id
| eval AP1=parent_id,C1=child_id
| eval data=AP1.",".C1
| eventstats values(data) as data
| streamstats count as session
| mvexpand data
| eval C2=if(mvindex(split(data,","),0)=C1,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| eval C3=if(mvindex(split(data,","),0)=C2,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| eval C4=if(mvindex(split(data,","),0)=C3,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| eval C5=if(mvindex(split(data,","),0)=C4,mvindex(split(data,","),1),NULL)
| stats values(*) as * by session
| mvexpand data
| stats values(*) as * by session
| fields - data
| table session parent_id C*

深さが事前にわかっていれば、こんな感じで無理やり検索ができる。
でも事前に深さなんてわからない・・・

SPLは逐次検索していくので、forみたいにカウンターを増やしながらループするのはとても難しい。

再帰関数を学ぶと、どんな世界が広がるか」の「1-3-動的計画法-メモ化再帰-へ」のように、計算量の問題もあり、一時記憶しておく場所が必要。

回答例

recursive_search.spl
|makeresults |fields - _time |eval child_id="null", parent_id="A1"
|append [|makeresults |fields - _time |eval child_id="null", parent_id="B1"]
|append [|makeresults |fields - _time |eval child_id="A1", parent_id="A2"]
|append [|makeresults |fields - _time |eval child_id="B1", parent_id="B2"] 
|append [|makeresults |fields - _time |eval child_id="A2", parent_id="C1"] 
|append [|makeresults |fields - _time |eval child_id="B2", parent_id="C1"] 
|append [|makeresults |fields - _time |eval child_id="C1", parent_id="C2"] 
|append [|makeresults |fields - _time |eval child_id="C2", parent_id="D1"]
|append [|makeresults |fields - _time |eval child_id="C2", parent_id="E1"]
|rename child_id as parent parent_id as child
|eval line=child."<-".parent
|eventstats values(parent) as parents by child
|eval depth=1
|outputlookup tree.csv
|map maxsearches=100 search="|inputlookup tree.csv
|eval con=mvindex(split(line, \"<-\"), -1)
|join type=left con [|inputlookup tree.csv |rename child as con parents as parents_2 |fields con parents_2]
|fillnull parents_2 value=\"null\"
|makemv parents_2
|mvexpand parents_2
|eval line=line.\"<-\".parents_2
|eval depth=depth+1
|outputlookup tree.csv"
|eventstats max(depth) as max_depth
|where depth==max_depth
|eval line=rtrim(line, "<-null")."<-null"
|stats values(line) as lines by child

※一番最初に動して lookupが定義されていないよ とでたら、ルックアップでlookup定義 -> tree.csvとしてください。

結果

child lines
A1 A1<-null
A2 A2<-A1<-null
B1 B1<-null
B2 B2<-B1<-null
C1 C1<-A2<-A1<-null
C1<-B2<-B1<-null
C2 C2<-C1<-A2<-A1<-null
C2<-C1<-B2<-B1<-null
D1 D1<-C2<-C1<-A2<-A1<-null
D1<-C2<-C1<-B2<-B1<-null
E1 E1<-C2<-C1<-A2<-A1<-null
E1<-C2<-C1<-B2<-B1<-null

解説

  1. 子供に対し、親がいくつあるか確認して、「子供 <- 親」をline、深さdepthも含めてtree.csvに出力
  2. mapを利用してループ開始。maxsearches=100は適当らしい。
  3. inputlookupを利用してtree.csvを読み込んだら、mvindexで一番端(その時の親)を読み出す
  4. それを、今度はtree.csvの子供と比較してjoinしてあげている。
  5. その際、親が二人いたらmakemvmvexpandで2行にしている。
  6. 再度tree.csvに出力
  7. whereで最終出力版のみ選択して、整えて出力している。

凄すぎ。

まとめ

Splunkで自作命令CPUエミュを作る」をCSVを使わなくてもやってやるぜといろいろ試していたときに、問題となったのが一時的に値を記録しておく方法。

stats list()くらいしか方法がないので、結局CSVに出力して、読み込むのがいいとなっって途中まで作って放置していました:sweat:

この質問も、いったん力尽くでやってみて、ループできね〜:cry: 記録しておけね〜:cry:と苦しんだ時に神のような回答がでてきてすごく感動したのを思い出します。

Splunk Answersがリニューアルした時に見失って結構探していました。
今回ようやく見つかったので紹介しておきます。

$\tiny{Splunk Answersが更新されてから、この回答してくれた人もみなくなってしまって悲しい。}$

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
2