LoginSignup
1
2

More than 5 years have passed since last update.

SensorBeeに触れてみた

Last updated at Posted at 2017-08-28

CentOSの環境を用意してGo言語もインストールしたので、SensorBeeというものを試してみます。

WMwareでCentOS7環境を作る
CentOSにGo言語のインストール

SensorBeeとは何か

SensorBee is an open source, lightweight, stateful streaming data processing engine for the Internet of Things (IoT).

ひとことで言うと、「IoT向け軽量ストリーミングエンジン」ということらしいです。

SensorBeeをインストールしてみる

goがインストールされていて、環境変数GOPATHが設定されている必要があります。

インストールは"go get"で行います。

$ go get gopkg.in/sensorbee/sensorbee.v0/...

tutorialを動かしてみる

まずはtutorialを取得してきます。

$ go get github.com/sensorbee/tutorial/wordcount

設定ファイル類をワークディレクトリにコピーします。

$ mkdir -p work/sensorbee_tl
$ cp go/src/github.com/sensorbee/tutorial/wordcount/config/* work/sensorbee_tl/

コピーしたファイルは3つです。

$ cd work/sensorbee_tl/
$ ls
build.yaml  sensorbee.yaml  wordcount.bql

ファイルの内容を眺めてみます。

$ cat build.yaml
plugins:
  - github.com/sensorbee/tutorial/wordcount/plugin
$ cat sensorbee.yaml
topologies:
  wordcount:
    bql_file: wordcount.bql

network:
  listen_on: ":15601"
$ cat wordcount.bql
CREATE SOURCE sentences TYPE wc_sentences;

CREATE STREAM words AS
    SELECT RSTREAM name, text AS word
        FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];

CREATE STREAM word_counts AS
    SELECT ISTREAM word, count(*)
        FROM words [RANGE 60 SECONDS]
        GROUP BY word;

なんとなく雰囲気を感じますかね・・・?

続いて、build.yamlがあるディレクトリでbuild_sensorbeeを実行します。
すると「sensorbee」という実行ファイル、「sensorbee_main.go」というGoプログラムが作成されます

$ build_sensorbee
sensorbee_main.go
$ ls
build.yaml  sensorbee  sensorbee.yaml  sensorbee_main.go  wordcount.bql

「sensorbee_main.go」の中身は以下の通りとなります。

$ cat sensorbee_main.go
package main

import (
 _ "github.com/sensorbee/tutorial/wordcount/plugin"
 _ "gopkg.in/sensorbee/sensorbee.v0/bql/udf/builtin"
 "gopkg.in/sensorbee/sensorbee.v0/cmd/lib/run"
 "gopkg.in/sensorbee/sensorbee.v0/cmd/lib/runfile"
 "gopkg.in/sensorbee/sensorbee.v0/cmd/lib/shell"
 "gopkg.in/sensorbee/sensorbee.v0/cmd/lib/topology"
 "gopkg.in/sensorbee/sensorbee.v0/version"
 "gopkg.in/urfave/cli.v1"
 "os"
 "time"
)

func init() {
 // TODO
 time.Local = time.UTC
}

func main() {
 app := cli.NewApp()
 app.Name = "sensorbee"
 app.Usage = "SensorBee built with build_sensorbee 0.7.1"
 app.Version = version.Version
 app.Commands = []cli.Command{
  run.SetUp(),
  runfile.SetUp(),
  shell.SetUp(),
  topology.SetUp(),
 }
 if err := app.Run(os.Args); err != nil {
  os.Exit(1)
 }
}

実行ファイル"sensorbee"を使用して、サーバーとして起動します。

$ ./sensorbee run
time="2017-08-26T08:58:26Z" level=info msg="Setting up the server context" config="{\"logging\":{\"log_destinationless_tuples\":false,\"log_dropped_tuples\":false,\"min_log_level\":\"info\",\"summarize_dropped_tuples\":false,\"target\":\"stderr\"},\"network\":{\"listen_on\":\":15601\"},\"storage\":{\"uds\":{\"params\":{},\"type\":\"in_memory\"}},\"topologies\":{}}"
time="2017-08-26T08:58:26Z" level=info msg="Starting the server on :15601"

続けて別のターミナルから操作します。

$ ss -nat | grep 15601
LISTEN     0      128         :::15601                   :::*
$ curl http://localhost:15601/api/v1/runtime_status
{"gomaxprocs":2,"goroot":"/usr/local/go","goversion":"go1.9","hostname":"monaca","num_cgo_call":1,"num_cpu":2,"num_goroutine":4,"pid":26461,"user":"centos","working_directory":"/home/centos/work/sensorbee_tl"}

Topologyというものを作ります。

$ ./sensorbee topology create wordcount
$ echo $?
0

作成したTopologyを指定してシェルを起動します。

$ ./sensorbee shell -t wordcount
wordcount> EVAL "Hello" || ", world!";
"Hello, world!"

"Topology"というものは後で調べた方がよさそうなキーワードですね。

ここで実行できるのがSQLライクな操作言語のBQLです。
BQLでSOURCEを作成します。

CREATE SOURCE sentences TYPE wc_sentences;

Tutorial通り、作成したSOURCEをFROM句に指定してSELECTします。

SELECT RSTREAM * FROM sentences [RANGE 1 TUPLES];
{"name":"jacob","text":"excepteur esse excepteur minim aliquip consectetur incididunt ipsum"}
{"name":"jacob","text":"do ut quis anim laboris officia elit mollit ipsum"}
{"name":"jacob","text":"ut nostrud sunt voluptate ad aliquip deserunt pariatur anim"}

FROM句にユーザ関数を適用してテキストを分割するサンプルです。

SELECT RSTREAM * FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
{"name":"isabella","text":"ut"}
{"name":"isabella","text":"do"}
{"name":"isabella","text":"quis"}
{"name":"isabella","text":"enim"}
{"name":"isabella","text":"enim"}
{"name":"isabella","text":"pariatur"}
{"name":"isabella","text":"reprehenderit"}
{"name":"isabella","text":"enim"}
{"name":"isabella","text":"mollit"}

SQLのVIEWのようにSELECT結果を新たにSTREAMとすることもできます。

CREATE STREAM words AS 
  SELECT RSTREAM name, text AS word
    FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];

これでwordcountできるようになります。

SELECT ISTREAM word, count(*) FROM words [RANGE 60 SECONDS] GROUP BY word;

シェルの終了はexitです。

wordcount> exit
$

exitしてもサーバを停止しない限り、CREATEしたSTREAMなどは有効ですが、サーバを再起動すると、Topologyから失われます。
また、サーバが停止されていても、"./sensorbee shell -t wordcount"で対話シェルに入れます(なにかしようとすると当然エラーとなりますが)。

サーバを停止するごとにTopologyを作成しなおすのは大変なので、サーバ起動時に設定ファイルを指定することができます。

$ cat sensorbee.yaml
topologies:
  wordcount:
    bql_file: wordcount.bql

network:
  listen_on: ":15601"
$ ./sensorbee run -c sensorbee.yaml

wordcountのTutorialを一通り動かしてみたので、次はwordcountの内容を探ってみたいと思います。

終わりです。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2