CentOSの環境を用意してGo言語もインストールしたので、SensorBeeというものを試してみます。
WMwareでCentOS7環境を作る
CentOSにGo言語のインストール
SensorBeeとは何か
SensorBee is an open source, lightweight, stateful streaming data processing engine for the Internet of Things (IoT).
ひとことで言うと、「IoT向け軽量ストリーミングエンジン」ということらしいです。
SensorBeeをインストールしてみる
goがインストールされていて、環境変数GOPATHが設定されている必要があります。
インストールは"go get"で行います。
$ go get gopkg.in/sensorbee/sensorbee.v0/...
tutorialを動かしてみる
まずはtutorialを取得してきます。
$ go get github.com/sensorbee/tutorial/wordcount
設定ファイル類をワークディレクトリにコピーします。
$ mkdir -p work/sensorbee_tl
$ cp go/src/github.com/sensorbee/tutorial/wordcount/config/* work/sensorbee_tl/
コピーしたファイルは3つです。
$ cd work/sensorbee_tl/
$ ls
build.yaml sensorbee.yaml wordcount.bql
ファイルの内容を眺めてみます。
$ cat build.yaml
plugins:
- github.com/sensorbee/tutorial/wordcount/plugin
$ cat sensorbee.yaml
topologies:
wordcount:
bql_file: wordcount.bql
network:
listen_on: ":15601"
$ cat wordcount.bql
CREATE SOURCE sentences TYPE wc_sentences;
CREATE STREAM words AS
SELECT RSTREAM name, text AS word
FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
CREATE STREAM word_counts AS
SELECT ISTREAM word, count(*)
FROM words [RANGE 60 SECONDS]
GROUP BY word;
なんとなく雰囲気を感じますかね・・・?
続いて、build.yamlがあるディレクトリでbuild_sensorbeeを実行します。
すると「sensorbee」という実行ファイル、「sensorbee_main.go」というGoプログラムが作成されます
$ build_sensorbee
sensorbee_main.go
$ ls
build.yaml sensorbee sensorbee.yaml sensorbee_main.go wordcount.bql
「sensorbee_main.go」の中身は以下の通りとなります。
$ cat sensorbee_main.go
package main
import (
_ "github.com/sensorbee/tutorial/wordcount/plugin"
_ "gopkg.in/sensorbee/sensorbee.v0/bql/udf/builtin"
"gopkg.in/sensorbee/sensorbee.v0/cmd/lib/run"
"gopkg.in/sensorbee/sensorbee.v0/cmd/lib/runfile"
"gopkg.in/sensorbee/sensorbee.v0/cmd/lib/shell"
"gopkg.in/sensorbee/sensorbee.v0/cmd/lib/topology"
"gopkg.in/sensorbee/sensorbee.v0/version"
"gopkg.in/urfave/cli.v1"
"os"
"time"
)
func init() {
// TODO
time.Local = time.UTC
}
func main() {
app := cli.NewApp()
app.Name = "sensorbee"
app.Usage = "SensorBee built with build_sensorbee 0.7.1"
app.Version = version.Version
app.Commands = []cli.Command{
run.SetUp(),
runfile.SetUp(),
shell.SetUp(),
topology.SetUp(),
}
if err := app.Run(os.Args); err != nil {
os.Exit(1)
}
}
実行ファイル"sensorbee"を使用して、サーバーとして起動します。
$ ./sensorbee run
time="2017-08-26T08:58:26Z" level=info msg="Setting up the server context" config="{\"logging\":{\"log_destinationless_tuples\":false,\"log_dropped_tuples\":false,\"min_log_level\":\"info\",\"summarize_dropped_tuples\":false,\"target\":\"stderr\"},\"network\":{\"listen_on\":\":15601\"},\"storage\":{\"uds\":{\"params\":{},\"type\":\"in_memory\"}},\"topologies\":{}}"
time="2017-08-26T08:58:26Z" level=info msg="Starting the server on :15601"
続けて別のターミナルから操作します。
$ ss -nat | grep 15601
LISTEN 0 128 :::15601 :::*
$ curl http://localhost:15601/api/v1/runtime_status
{"gomaxprocs":2,"goroot":"/usr/local/go","goversion":"go1.9","hostname":"monaca","num_cgo_call":1,"num_cpu":2,"num_goroutine":4,"pid":26461,"user":"centos","working_directory":"/home/centos/work/sensorbee_tl"}
Topologyというものを作ります。
$ ./sensorbee topology create wordcount
$ echo $?
0
作成したTopologyを指定してシェルを起動します。
$ ./sensorbee shell -t wordcount
wordcount> EVAL "Hello" || ", world!";
"Hello, world!"
"Topology"というものは後で調べた方がよさそうなキーワードですね。
ここで実行できるのがSQLライクな操作言語のBQLです。
BQLでSOURCEを作成します。
CREATE SOURCE sentences TYPE wc_sentences;
Tutorial通り、作成したSOURCEをFROM句に指定してSELECTします。
SELECT RSTREAM * FROM sentences [RANGE 1 TUPLES];
{"name":"jacob","text":"excepteur esse excepteur minim aliquip consectetur incididunt ipsum"}
{"name":"jacob","text":"do ut quis anim laboris officia elit mollit ipsum"}
{"name":"jacob","text":"ut nostrud sunt voluptate ad aliquip deserunt pariatur anim"}
FROM句にユーザ関数を適用してテキストを分割するサンプルです。
SELECT RSTREAM * FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
{"name":"isabella","text":"ut"}
{"name":"isabella","text":"do"}
{"name":"isabella","text":"quis"}
{"name":"isabella","text":"enim"}
{"name":"isabella","text":"enim"}
{"name":"isabella","text":"pariatur"}
{"name":"isabella","text":"reprehenderit"}
{"name":"isabella","text":"enim"}
{"name":"isabella","text":"mollit"}
SQLのVIEWのようにSELECT結果を新たにSTREAMとすることもできます。
CREATE STREAM words AS
SELECT RSTREAM name, text AS word
FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
これでwordcountできるようになります。
SELECT ISTREAM word, count(*) FROM words [RANGE 60 SECONDS] GROUP BY word;
シェルの終了はexitです。
wordcount> exit
$
exitしてもサーバを停止しない限り、CREATEしたSTREAMなどは有効ですが、サーバを再起動すると、Topologyから失われます。
また、サーバが停止されていても、"./sensorbee shell -t wordcount"で対話シェルに入れます(なにかしようとすると当然エラーとなりますが)。
サーバを停止するごとにTopologyを作成しなおすのは大変なので、サーバ起動時に設定ファイルを指定することができます。
$ cat sensorbee.yaml
topologies:
wordcount:
bql_file: wordcount.bql
network:
listen_on: ":15601"
$ ./sensorbee run -c sensorbee.yaml
wordcountのTutorialを一通り動かしてみたので、次はwordcountの内容を探ってみたいと思います。
終わりです。