どうも、ishida(@kojiisd)です。
AWS IoTアドベントカレンダーを書くためにQiitaアカウントを取り、それ以来全く使っていなかったのでそろそろ使おうかと改心。
最近割とガチでSiddhiを使い始めたのですが、情報がなさ過ぎて思うように進まない!
そもそも技術ではよくある、発表されたら誰かが翻訳して、、、という流れもなければ外人が「使ってみた」を公開しているわけでもなく、かなり嵌ったわけです。
というわけで、そんな経験をメモ。あくまで自分の理解です。
概要
CEPでリアルタイムストリームデータ処理をしている中で、スライドウィンドウと閾値を設けて判定を加えたりできる技術。
Esperも技術解としては存在するが、パフォーマンス的にSiddhiの方が優秀(らしい)
基礎
とある処理から流れてきているデータをマージしたり計算したりして、結果新しい処理の流れを生み出す、というコンセプト。
よくあるサンプル
from StockExchangeStream[volume <10]
insert into StockQuoteStream symbol, volume
上記はStockExchangeStreamというストリーム処理の流れを受けて、volumeパラメータが10より小さいものを抽出し、StockQuoteStreamという名前付けされた新しいStreamに対して、symbolとvolumeパラメータを追加する、という意味。
→結果、新しいStockQuoteStreamという名前の流れが生成される。
→別のSiddhiでこの名前を流入元として指定することが可能。
実際には、上記のQueryを実行する前に、StrockExchnageStreamの定義が必要になるので、
以下のようなdefineをする必要がある。
QueryEditorを利用すると、この辺を自動的に実施してくれるっぽい。
define stream StockExchangeStream (symbol string, price float, volume int );
注意点
- 使える予約語に注意。SQLと似ているが、少しずつ違う。WHERE句が使えないなど、Conditionの用い方は気を付けること。
- 「;」のつける位置に注意。defineのブロックの最後は改行、insertのブロックの最後も改行。それ以外の文末は「;」にしないとSyntax Errorが発生する。
計算結果のストリームを受け取り、その中で100より大きい計算結果が10分間の間に5回より多く発生した場合にresultStreamというストリームに結果を書き込むQueryは以下。
define stream inStream(id string, calcResult float,timestamp long);
define stream inDisStream(id string, calcResult float,timestamp long)
@info(name='streamQuery')from inStream[calcResult>100] select id, calcResult, timestamp
insert into inDisStream;
from inDisStream[count(id)>5]#window.time(10 min) select avg(calcResult) as avgResult
insert into resultStream
感想
いやぁ、わかりづらい。Java使ってる身としては処理を書く順番など感覚が所々違っているので、理解するのに時間がかかりました。