Beatsとは?
公式:https://www.elastic.co/products/beats
Logstash, Elasticsearch, Kibanaで知られるelastic社の新プロダクトです。
一言で言えば、さまざまなデータを収集・加工して転送するツールのプラットフォームです。
「fluentd」や「Logstash」のようなものといえばイメージが湧くでしょうか。
2015年11月24日にv1.0.0がリリースされ、当記事執筆時点でv1.1.2となっています。
特徴
- バックエンドとしてElasticsearch, Logstashを想定
- 機能が非常にシンプルで軽い
- Go言語実装
- マルチプラットフォーム
- 別途ランタイム(JVMとか)のインストール不要
- 設定ファイルはYAML形式
公開されているBeat
elastic社から公開されているBeatは次の4つとなります。
名称 | 概要 |
---|---|
Packetbeat | パケット情報を収集するBeat |
Topbeat | CPUやメモリなどのリソース情報を収集するBeat |
Filebeat | ログファイルを収集するBeat |
Winlogbeat | Windowsのイベントログを収集するBeat |
OSSコミュニティから提供されているBeatも数多くあります。
https://www.elastic.co/guide/en/beats/libbeat/current/community-beats.html
構成
Beatsは「libbeat」という共通機能(データ送信、設定管理、ロギング etc...)を提供するコンポーネントと
データの種類に応じて収集・加工を担う個々のコンポーネント(上図でいうPacketbeatやTopbeat)から構成されます。
個々のBeatは1Beatにつき1実行ファイルの構成となります。
ユースケースに応じてBeatを自作することもできます。(後述)
BeatsとLogstashの棲み分け
elastic社のデータ収集ツールとしてはLogstashが存在しますが、Beatsとの棲み分けはどうなるのでしょうか?
上図のとおり、「データの収集」はBeatsが担当し、「データのフィルタリングや加工処理」はLogstashが担当するという棲み分けになりそうです。
またBeatsはElasticsearch, Logstash, File, Console, Redis(非推奨)に対してしかデータ連携ができないため、その他の媒体にデータを連携したい場合は、一旦Logstashを経由する形になるかと思います。
今までelastic社のデータ可視化プラットフォームとして各ツールの頭文字をとって「ELK」(Elasticsearch + Logstash + Kibana)と言われていましたが、今後は「BELK」(Beats + Elasticsearch + Logstash + Kibana)ないし「BLK」(Beats + Logstash + Kibana)になっていくという感じでしょうか。
使ってみた
今回は公式から提供されているBeatsの中から「Topbeat」を試してみます。
環境情報
- OS:CentOS6.5 on VirtualBox + Vagrant
- Beats:1.1.2
- Elasticsearch:2.1.0
- Kibana:4.3.0
※ ElasticsearchとKibanaはインストールされている前提とします。
Topbeatインストール
https://www.elastic.co/guide/en/beats/topbeat/current/topbeat-installation.html
aptとyumで利用可能なリポジトリがあります。今回はyumでインストールします。
1 . 公開鍵インポート
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
2 . リポジトリ追加
[beats]
name=Elastic Beats Repository
baseurl=https://packages.elastic.co/beats/yum/el/$basearch
enabled=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
gpgcheck=1
3 . インストール
yum install topbeat
※deb, rpm, tgz, zip形式でも公開されています。Windowsの場合はZIPを落としましょう。
Topbeat設定
最低限の設定としてデータ転送先(今回はElasticsearch)のアドレスを記述します。
output:
### Elasticsearch as output
elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (http and 9200)
# In case you specify and additional path, the scheme is required: http://localhost:9200/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:9200
hosts: ["localhost:9200"]
Elasticsearchインデックス定義
topbeat.template.json
をElasticsearchに食わせてインデックスを定義しておきます。
curl -XPUT 'http://localhost:9200/_template/topbeat' -d@/etc/topbeat/topbeat.template.json
Topbeat起動
service topbeat start
可視化
Kibanaからデータが連携されることを確認します。
デフォルトのインデックス名は topbeat-*
となっています。
なんとTopbeat用ダッシュボードのサンプルまでもgithubで公開されています。
https://github.com/elastic/beats-dashboards
直下のシェル叩いてDashboard定義をElasticsearchに食わせればOKです。
./load.sh -url "http://localhost:9200"
Topbeat用のダッシュボードを開くとマシン毎/プロセス毎のCPU使用率やらメモリ使用率やらが
可視化されています。
ここまで
わずか10分程度で作業できました。物凄い手軽ですね。
作ってみた
Developer Guideが提供されてますので、オリジナルなBeatを作ることもできます。
https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html
今回作るBeat
今回はWindowsの起動しているアプリケーションタスク一覧(プロセスではない)を収集するBeat
通称「Winappbeat」を作成してみます。
動機はこれを後輩のPCに仕込んで、業務中何やってるのか監視したいなーという悪巧みです。
まあ、ネタは何でもいいんです。(;^_^A
構成
先にも説明しましたが、Beatは主に「データ収集をに担うコンポーネント」と「データを送信するコンポーネント(Publisher)」の2つのコンポーネントから構成されます。
後者の「Publisher」を含む「設定管理」「ロギング」「デーモン/サービス」といった基本機能は「libbeat」という共通機能として提供されていますので、開発する場合は「データ収集をに担うコンポーネント(Beat custom logic)」部分だけを実装すれば問題ありません。
環境情報
- OS:Windows7
- Go:1.6
※Go言語で開発するための基本的な設定は完了している前提とします。
libbeat取得
公式で提供されているlibbeatのソースコードを go get
します。
go get github.com/elastic/beats
Beatテンプレート生成
0から作ってもいいのですが、
beat-generatorというテンプレートジェネレータが公開されているので利用します。
https://github.com/elastic/beat-generator
※Python、Cookiecutterが事前にインストールされている必要があります。
コマンドを実行すると、
cookiecutter https://github.com/elastic/beat-generator.git
Beatのテンプレートが生成されます。(projectnameを尋ねられるのでWinappbeatを指定)
winappbeat
│ .gitattributes
│ .gitignore
│ .travis.yml
│ CONTRIBUTING.md
│ glide.yaml
│ LICENSE
│ main.go
│ main_test.go
│ Makefile
│ README.md
│ winappbeat.yml ※
│
├─beater
│ winappbeat.go ※
│
├─config
│ config.go ※
│
├─docs
│ index.asciidoc
│
├─etc
│ beat.yml
│ fields.yml
│ winappbeat.template.json
│
└─tests
└─system
│ requirements.txt
│ test_base.py
│ winappbeat.py
│
└─config
winappbeat.yml.j2
最低限実装すべきファイルは「※」を付与したファイルとなります。
順を追って見ていきましょう。
設定ファイル作成
今回はデータを取得する間隔(ミリ秒)を設定できるようにします。
またアプリケーション一覧はWindowsのEnumWindows
関数を利用して取得しますが、
「ウィンドウ名が空のウィンドウを取得するかどうか」と「不可視ウィンドウを取得するかどうか」は選択できるようにしておきます。
今回はウィンドウ名が空ではない可視ウィンドウを取得します。
input:
# Defines how often an event is sent to the output
period: 10s
# Whether to get only window that has name
onlyNotEmpty: true
# Whether to get only visible window
onlyVisible: true
設定ファイルに対応する構造体を実装
設定ファイルに対応する構造体を実装します。
package config
type Config struct {
Winappbeat WinappbeatConfig
}
type WinappbeatConfig struct {
Period string `yaml:"period"`
OnlyNotEmpty string `yaml:"onlyNotEmpty"`
OnlyVisible string `yaml:"onlyVisible"`
}
データ取得・転送ロジックを実装
いわゆるメインとなる処理を実装します。
開発者はlibbeatが提供するBeater
インターフェースで定義されている以下のメソッドを実装する必要があります。
type Beater interface {
Config(*Beat) error
Setup(*Beat) error
Run(*Beat) error
Cleanup(*Beat) error
Stop()
}
- Config … 設定ファイルを扱うメソッド
- Setup … 初期処理を行うメソッド
- Run … データ収集のメイン処理を行うメソッド
- Cleanup … 終了処理を行うメソッド
- Stop … 停止要求があった場合の処理を行うメソッド
Config
設定ファイルの読み込みを行う処理を実装します。
func (bt *Winappbeat) Config(b *beat.Beat) error {
// Load beater configuration
err := cfgfile.Read(&bt.Configuration, "")
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
}
return nil
}
Setup
初期処理を実装します。
ここでは設定ファイルの値がセットされていない場合のデフォルト値をセットします。
func (bt *Winappbeat) Setup(b *beat.Beat) error {
// Setting default period if not set
if bt.Configuration.Winappbeat.Period == "" {
bt.Configuration.Winappbeat.Period = "30s"
}
var err error
bt.period, err = time.ParseDuration(bt.Configuration.Winappbeat.Period)
if err != nil {
return err
}
// Setting default onlyNotEmpty if not set
if bt.Configuration.Winappbeat.OnlyNotEmpty == "" {
bt.Configuration.Winappbeat.OnlyNotEmpty = "true"
}
bt.onlyNotEmpty, err = strconv.ParseBool(bt.Configuration.Winappbeat.OnlyNotEmpty)
if err != nil {
return err
}
// Setting default onlyVisible if not set
if bt.Configuration.Winappbeat.OnlyVisible == "" {
bt.Configuration.Winappbeat.OnlyVisible = "true"
}
bt.onlyVisible, err = strconv.ParseBool(bt.Configuration.Winappbeat.OnlyVisible)
if err != nil {
return err
}
return nil
}
Run
メインロジックを実装します。
ここでは、設定ファイルで設定した間隔毎に、Windowsの起動中アプリケーションを取得して、
Elasticsearchに連携する処理を実装します。
func (bt *Winappbeat) Run(b *beat.Beat) error {
ticker := time.NewTicker(bt.period)
counter := 1
for {
select {
case <-bt.done:
return nil
case <-ticker.C:
}
list := listWindows(win.HWND(0))
for _, w := range list {
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": b.Name,
"counter": counter,
"windowname": w.name,
"windowcategory": w.nameCategory,
"windowclass": w.class,
"left": w.r.Left,
"top": w.r.Top,
"width": w.r.Right - w.r.Left,
"height": w.r.Bottom - w.r.Top,
}
b.Events.PublishEvent(event)
}
counter++
}
}
func listWindows(hwnd win.HWND) []window {
var d cbData
d.list = make([]window, 0)
d.pid = make(map[uint32]string)
win.EnumChildWindows(hwnd, syscall.NewCallback(perWindow), uintptr(unsafe.Pointer(&d)))
return d.list
}
func perWindow(hwnd win.HWND, param uintptr) uintptr {
d := (*cbData)(unsafe.Pointer(param))
w := window{hwnd: hwnd}
w.visible = win.IsWindowVisible(hwnd)
win.GetWindowRect(hwnd, &w.r)
w.name = getWindowName(hwnd, getWindowText)
slice := strings.Split(w.name, " - ")
w.nameCategory = slice[len(slice)-1]
w.hasChild = win.GetWindow(hwnd, win.GW_CHILD) != 0
if w.name != "" {
if w.visible {
d.list = append(d.list, w)
}
}
return 1
}
func getWindowName(hwnd win.HWND, get *syscall.LazyProc) string {
const bufSiz = 512
var buf [bufSiz]uint16
siz, _, _ := get.Call(uintptr(hwnd), uintptr(unsafe.Pointer(&buf[0])), uintptr(len(buf)))
if siz == 0 {
return ""
}
name := syscall.UTF16ToString(buf[:siz])
if siz == bufSiz-1 {
name = name + "\u22EF"
}
return name
}
少し長くなっってますが、肝はevent
オブジェクトにデータを設定してPublishEvent
で送信する部分です。
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": b.Name,
"counter": counter,
"windowname": w.name,
"windowcategory": w.nameCategory,
"windowclass": w.class,
"left": w.r.Left,
"top": w.r.Top,
"width": w.r.Right - w.r.Left,
"height": w.r.Bottom - w.r.Top,
}
b.Events.PublishEvent(event)
Cleanup
終了処理を実装します。今回は特に何もしません。
func (bt *Winappbeat) Cleanup(b *beat.Beat) error {
return nil
}
Setup
停止要求があった場合の処理を実装します。
チャネルをクローズしてRun
メソッドの処理を終了します。
func (bt *Winappbeat) Stop() {
close(bt.done)
}
コンパイル
go build
して実行ファイルを生成します。
go build -ldflags -H=windowsgui main.go
実行
生成されたwinappbeat.exeを起動します。
連携確認
Kibanaからデータが連携されていることを確認します。
event
オブジェクトに設定したデータが送られてきてますね。
ダッシュボード作成
赤裸々な情報が取得できてしまったので割愛させてください。
ネタバラシをしたときに後輩からは一線を引かれた気がしますが、しっかりと働いてました。
まとめ
いかがでしたしょうか。簡単にですがBeatの使い方と作り方でした。
個人的に思うところとしては、こんな感じです。
-
Go言語実装がGood
クロスコンパイルできるので展開しやすい、実行ランタイムを別途インストールする必要がないなど、
Beatのようなエージェント型のツールとの相性いいなと感じました。
-
個々のBeatsが「小さく」「疎」であることがGood
1つのドでかいバイナリではなく、必要なものを必要なだけ組み合わせて利用できます。
-
データ分析基盤、可視化基盤(Elasticsearch + Kibana)との連携が容易なところがGood
今回のようにKibanaのダッシュボードもセットで配布すれば、瞬殺でモニタリングまでできます。
-
Elasticsearch, Logstash以外にも設定ファイルレベルで連携先が選べると嬉しい。