Edited at

Go製のData Shipper Platform「Beats」が良さそうなので紹介してみる

More than 3 years have passed since last update.


Beatsとは?

beats-petals.png

公式:https://www.elastic.co/products/beats

Logstash, Elasticsearch, Kibanaで知られるelastic社の新プロダクトです。

一言で言えば、さまざまなデータを収集・加工して転送するツールのプラットフォームです。

「fluentd」や「Logstash」のようなものといえばイメージが湧くでしょうか。

2015年11月24日にv1.0.0がリリースされ、当記事執筆時点でv1.1.2となっています。


特徴


  • バックエンドとしてElasticsearch, Logstashを想定

  • 機能が非常にシンプルで軽い

  • Go言語実装


    • マルチプラットフォーム

    • 別途ランタイム(JVMとか)のインストール不要



  • 設定ファイルはYAML形式


公開されているBeat

elastic社から公開されているBeatは次の4つとなります。

名称
概要

Packetbeat
パケット情報を収集するBeat

Topbeat
CPUやメモリなどのリソース情報を収集するBeat

Filebeat
ログファイルを収集するBeat

Winlogbeat
Windowsのイベントログを収集するBeat

OSSコミュニティから提供されているBeatも数多くあります。

https://www.elastic.co/guide/en/beats/libbeat/current/community-beats.html


構成

beats-platform.png

Beatsは「libbeat」という共通機能(データ送信、設定管理、ロギング etc...)を提供するコンポーネントと

データの種類に応じて収集・加工を担う個々のコンポーネント(上図でいうPacketbeatやTopbeat)から構成されます。

個々のBeatは1Beatにつき1実行ファイルの構成となります。

ユースケースに応じてBeatを自作することもできます。(後述)


BeatsとLogstashの棲み分け

elastic社のデータ収集ツールとしてはLogstashが存在しますが、Beatsとの棲み分けはどうなるのでしょうか?

上図のとおり、「データの収集」はBeatsが担当し、「データのフィルタリングや加工処理」はLogstashが担当するという棲み分けになりそうです。

またBeatsはElasticsearch, Logstash, File, Console, Redis(非推奨)に対してしかデータ連携ができないため、その他の媒体にデータを連携したい場合は、一旦Logstashを経由する形になるかと思います。

今までelastic社のデータ可視化プラットフォームとして各ツールの頭文字をとって「ELK」(Elasticsearch + Logstash + Kibana)と言われていましたが、今後は「BELK」(Beats + Elasticsearch + Logstash + Kibana)ないし「BLK」(Beats + Logstash + Kibana)になっていくという感じでしょうか。


使ってみた

今回は公式から提供されているBeatsの中から「Topbeat」を試してみます。


環境情報


  • OS:CentOS6.5 on VirtualBox + Vagrant

  • Beats:1.1.2

  • Elasticsearch:2.1.0

  • Kibana:4.3.0

※ ElasticsearchとKibanaはインストールされている前提とします。


Topbeatインストール

https://www.elastic.co/guide/en/beats/topbeat/current/topbeat-installation.html

aptとyumで利用可能なリポジトリがあります。今回はyumでインストールします。

1 . 公開鍵インポート

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2 . リポジトリ追加


/etc/yum.repos.d/

[beats]

name=Elastic Beats Repository
baseurl=https://packages.elastic.co/beats/yum/el/$basearch
enabled=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
gpgcheck=1

3 . インストール

yum install topbeat

※deb, rpm, tgz, zip形式でも公開されています。Windowsの場合はZIPを落としましょう。


Topbeat設定

最低限の設定としてデータ転送先(今回はElasticsearch)のアドレスを記述します。


/etc/topbeat/topbeat.yml

output:

### Elasticsearch as output
elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (http and 9200)
# In case you specify and additional path, the scheme is required: http://localhost:9200/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:9200
hosts: ["localhost:9200"]



Elasticsearchインデックス定義

topbeat.template.json をElasticsearchに食わせてインデックスを定義しておきます。

curl -XPUT 'http://localhost:9200/_template/topbeat' -d@/etc/topbeat/topbeat.template.json


Topbeat起動

service topbeat start


可視化

Kibanaからデータが連携されることを確認します。

デフォルトのインデックス名は topbeat-* となっています。

KibanaDiscover.png

なんとTopbeat用ダッシュボードのサンプルまでもgithubで公開されています。

https://github.com/elastic/beats-dashboards

直下のシェル叩いてDashboard定義をElasticsearchに食わせればOKです。

./load.sh -url "http://localhost:9200"

Topbeat用のダッシュボードを開くとマシン毎/プロセス毎のCPU使用率やらメモリ使用率やらが

可視化されています。

KibanaDashboard.png


ここまで

わずか10分程度で作業できました。物凄い手軽ですね。


作ってみた

Developer Guideが提供されてますので、オリジナルなBeatを作ることもできます。

https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html


今回作るBeat

今回はWindowsの起動しているアプリケーションタスク一覧(プロセスではない)を収集するBeat

通称「Winappbeat」を作成してみます。

task.png

動機はこれを後輩のPCに仕込んで、業務中何やってるのか監視したいなーという悪巧みです。

まあ、ネタは何でもいいんです。(;^_^A


構成

beat_overview.png

先にも説明しましたが、Beatは主に「データ収集をに担うコンポーネント」と「データを送信するコンポーネント(Publisher)」の2つのコンポーネントから構成されます。

後者の「Publisher」を含む「設定管理」「ロギング」「デーモン/サービス」といった基本機能は「libbeat」という共通機能として提供されていますので、開発する場合は「データ収集をに担うコンポーネント(Beat custom logic)」部分だけを実装すれば問題ありません。


環境情報


  • OS:Windows7

  • Go:1.6

※Go言語で開発するための基本的な設定は完了している前提とします。


libbeat取得

公式で提供されているlibbeatのソースコードを go get します。

go get github.com/elastic/beats


Beatテンプレート生成

0から作ってもいいのですが、

beat-generatorというテンプレートジェネレータが公開されているので利用します。

https://github.com/elastic/beat-generator

※Python、Cookiecutterが事前にインストールされている必要があります。

コマンドを実行すると、

cookiecutter https://github.com/elastic/beat-generator.git

Beatのテンプレートが生成されます。(projectnameを尋ねられるのでWinappbeatを指定)

winappbeat

│ .gitattributes
│ .gitignore
│ .travis.yml
│ CONTRIBUTING.md
│ glide.yaml
│ LICENSE
│ main.go
│ main_test.go
│ Makefile
│ README.md
│ winappbeat.yml ※

├─beater
│ winappbeat.go ※

├─config
│ config.go ※

├─docs
│ index.asciidoc

├─etc
│ beat.yml
│ fields.yml
│ winappbeat.template.json

└─tests
└─system
│ requirements.txt
│ test_base.py
│ winappbeat.py

└─config
winappbeat.yml.j2

最低限実装すべきファイルは「※」を付与したファイルとなります。

順を追って見ていきましょう。


設定ファイル作成

今回はデータを取得する間隔(ミリ秒)を設定できるようにします。

またアプリケーション一覧はWindowsのEnumWindows関数を利用して取得しますが、

「ウィンドウ名が空のウィンドウを取得するかどうか」と「不可視ウィンドウを取得するかどうか」は選択できるようにしておきます。

今回はウィンドウ名が空ではない可視ウィンドウを取得します。


winappbeat.yml

input:

# Defines how often an event is sent to the output
period: 10s
# Whether to get only window that has name
onlyNotEmpty: true
# Whether to get only visible window
onlyVisible: true


設定ファイルに対応する構造体を実装

設定ファイルに対応する構造体を実装します。


config/config.go

package config

type Config struct {
Winappbeat WinappbeatConfig
}

type WinappbeatConfig struct {
Period string `yaml:"period"`
OnlyNotEmpty string `yaml:"onlyNotEmpty"`
OnlyVisible string `yaml:"onlyVisible"`
}



データ取得・転送ロジックを実装

いわゆるメインとなる処理を実装します。

開発者はlibbeatが提供するBeaterインターフェースで定義されている以下のメソッドを実装する必要があります。

type Beater interface {

Config(*Beat) error
Setup(*Beat) error
Run(*Beat) error
Cleanup(*Beat) error
Stop()
}


  • Config … 設定ファイルを扱うメソッド

  • Setup … 初期処理を行うメソッド

  • Run … データ収集のメイン処理を行うメソッド

  • Cleanup … 終了処理を行うメソッド

  • Stop … 停止要求があった場合の処理を行うメソッド


Config

設定ファイルの読み込みを行う処理を実装します。


beater/winappbeat.go

func (bt *Winappbeat) Config(b *beat.Beat) error {

// Load beater configuration
err := cfgfile.Read(&bt.Configuration, "")
if err != nil {
return fmt.Errorf("Error reading config file: %v", err)
}
return nil
}


Setup

初期処理を実装します。

ここでは設定ファイルの値がセットされていない場合のデフォルト値をセットします。


beater/winappbeat.go

func (bt *Winappbeat) Setup(b *beat.Beat) error {

// Setting default period if not set
if bt.Configuration.Winappbeat.Period == "" {
bt.Configuration.Winappbeat.Period = "30s"
}
var err error
bt.period, err = time.ParseDuration(bt.Configuration.Winappbeat.Period)
if err != nil {
return err
}
// Setting default onlyNotEmpty if not set
if bt.Configuration.Winappbeat.OnlyNotEmpty == "" {
bt.Configuration.Winappbeat.OnlyNotEmpty = "true"
}
bt.onlyNotEmpty, err = strconv.ParseBool(bt.Configuration.Winappbeat.OnlyNotEmpty)
if err != nil {
return err
}
// Setting default onlyVisible if not set
if bt.Configuration.Winappbeat.OnlyVisible == "" {
bt.Configuration.Winappbeat.OnlyVisible = "true"
}
bt.onlyVisible, err = strconv.ParseBool(bt.Configuration.Winappbeat.OnlyVisible)
if err != nil {
return err
}
return nil
}


Run

メインロジックを実装します。

ここでは、設定ファイルで設定した間隔毎に、Windowsの起動中アプリケーションを取得して、

Elasticsearchに連携する処理を実装します。


beater/winappbeat.go

func (bt *Winappbeat) Run(b *beat.Beat) error {

ticker := time.NewTicker(bt.period)
counter := 1
for {
select {
case <-bt.done:
return nil
case <-ticker.C:
}

list := listWindows(win.HWND(0))
for _, w := range list {
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": b.Name,
"counter": counter,
"windowname": w.name,
"windowcategory": w.nameCategory,
"windowclass": w.class,
"left": w.r.Left,
"top": w.r.Top,
"width": w.r.Right - w.r.Left,
"height": w.r.Bottom - w.r.Top,
}
b.Events.PublishEvent(event)
}
counter++
}
}

func listWindows(hwnd win.HWND) []window {
var d cbData
d.list = make([]window, 0)
d.pid = make(map[uint32]string)
win.EnumChildWindows(hwnd, syscall.NewCallback(perWindow), uintptr(unsafe.Pointer(&d)))
return d.list
}

func perWindow(hwnd win.HWND, param uintptr) uintptr {
d := (*cbData)(unsafe.Pointer(param))
w := window{hwnd: hwnd}
w.visible = win.IsWindowVisible(hwnd)
win.GetWindowRect(hwnd, &w.r)
w.name = getWindowName(hwnd, getWindowText)
slice := strings.Split(w.name, " - ")
w.nameCategory = slice[len(slice)-1]
w.hasChild = win.GetWindow(hwnd, win.GW_CHILD) != 0
if w.name != "" {
if w.visible {
d.list = append(d.list, w)
}
}
return 1
}

func getWindowName(hwnd win.HWND, get *syscall.LazyProc) string {

const bufSiz = 512

var buf [bufSiz]uint16
siz, _, _ := get.Call(uintptr(hwnd), uintptr(unsafe.Pointer(&buf[0])), uintptr(len(buf)))
if siz == 0 {
return ""
}
name := syscall.UTF16ToString(buf[:siz])
if siz == bufSiz-1 {
name = name + "\u22EF"
}
return name
}


少し長くなっってますが、肝はeventオブジェクトにデータを設定してPublishEventで送信する部分です。


beater/winappbeat.go

            event := common.MapStr{

"@timestamp": common.Time(time.Now()),
"type": b.Name,
"counter": counter,
"windowname": w.name,
"windowcategory": w.nameCategory,
"windowclass": w.class,
"left": w.r.Left,
"top": w.r.Top,
"width": w.r.Right - w.r.Left,
"height": w.r.Bottom - w.r.Top,
}
b.Events.PublishEvent(event)


Cleanup

終了処理を実装します。今回は特に何もしません。

func (bt *Winappbeat) Cleanup(b *beat.Beat) error {

return nil
}


Setup

停止要求があった場合の処理を実装します。

チャネルをクローズしてRunメソッドの処理を終了します。

func (bt *Winappbeat) Stop() {

close(bt.done)
}


コンパイル

go build して実行ファイルを生成します。

go build -ldflags -H=windowsgui main.go


実行

生成されたwinappbeat.exeを起動します。


連携確認

Kibanaからデータが連携されていることを確認します。

eventオブジェクトに設定したデータが送られてきてますね。

visualize.PNG


ダッシュボード作成

赤裸々な情報が取得できてしまったので割愛させてください。

ネタバラシをしたときに後輩からは一線を引かれた気がしますが、しっかりと働いてました。


まとめ

いかがでしたしょうか。簡単にですがBeatの使い方と作り方でした。

個人的に思うところとしては、こんな感じです。


  • Go言語実装がGood

    クロスコンパイルできるので展開しやすい、実行ランタイムを別途インストールする必要がないなど、

    Beatのようなエージェント型のツールとの相性いいなと感じました。

     


  • 個々のBeatsが「小さく」「疎」であることがGood

    1つのドでかいバイナリではなく、必要なものを必要なだけ組み合わせて利用できます。

     


  • データ分析基盤、可視化基盤(Elasticsearch + Kibana)との連携が容易なところがGood

    今回のようにKibanaのダッシュボードもセットで配布すれば、瞬殺でモニタリングまでできます。

     


  • Elasticsearch, Logstash以外にも設定ファイルレベルで連携先が選べると嬉しい。



参考