SQL Databaseから時系列データを取得するMssqlbeatを使ってみた(というか作った)ので簡単ですが使い方を紹介します。(メモ的に紹介します)
準備
はじめにMssqlbeatを使うための環境を作成します。環境が整っている方は読み飛ばしてください。
Mssqlbeatをビルドする
まずはビルド環境を整えます。
sudo apt install golang-go make
以下のgitリポジトリからソースコードを取得します。
export GOPATH=~/go
export GOBIN=$GOPATH/bin
export GOMODCACHE=$GOPATH/pkg/mod
export PATH=$GOBIN:$PATH
mkdir $GOPATH/src/github.com/KentaroAOKI
cd $GOPATH/src/github.com/KentaroAOKI/
git clone git@github.com:KentaroAOKI/mssqlbeat.git
取得したらビルドしていきます。あと、make時に出てきたメッセージを参考に go get してください。
cd mssqlbeat
make
実行ファイルを確認します。
file mssqlbeat
データ取得先の SQL Server を用意する
今回は Microsoft Azure のSQL Serverを使用します。また、SQL Darabase を作成する際は、SQL authentication を選択します。
ネットワークはPublic accessを許可してFirewall ruleにビルドしたホストのIPv4アドレスを設定しています。
SQL Database にテーブルを作成する
Mssqlbeat がデータを取得する先のテーブルを作成します。クエリは以下の通り。
CREATE TABLE TimeCount (
ID INT PRIMARY KEY IDENTITY(1,1),
CurrentTime DATETIME,
CountValue INT
);
SQL Database の Query editor (preview) などを使ってテーブルを作成できます。
作成したテーブルにデータを入れる
データを1秒間隔で入れるプログラムを作成します。server_name などのSQL Databaseに接続する設定は、ご自身の環境に合わせてください。
cd $GOPATH/src/github.com/KentaroAOKI/mssqlbeat/generate_data
vi main.go
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/microsoft/go-mssqldb"
)
var db *sql.DB
var server = "<server_name>.database.windows.net"
var port = 1433
var user = "<user_id>"
var password = "<password>"
var database = "<database>"
func main() {
// Build connection string
connString := fmt.Sprintf("server=%s;user id=%s;password=%s;port=%d;database=%s;",
server, user, password, port, database)
var err error
// Create connection pool
db, err = sql.Open("sqlserver", connString)
if err != nil {
log.Fatal("Error creating connection pool: ", err.Error())
}
ctx := context.Background()
// Insert data every second
count := 0
for {
count++
err = insertTimeCount(ctx, count)
if err != nil {
log.Fatal("Error inserting data: ", err.Error())
}
time.Sleep(1 * time.Second)
}
}
func insertTimeCount(ctx context.Context, count int) error {
tsql := `INSERT INTO TimeCount (CurrentTime, CountValue) VALUES (@CurrentTime, @CountValue);`
_, err := db.ExecContext(
ctx,
tsql,
sql.Named("CurrentTime", time.Now()),
sql.Named("CountValue", count))
return err
}
データを入れるプログラムをビルドします。
go mod init generate_data
go get github.com/microsoft/go-mssqldb
go build
実行します。1秒間隔でデータを作成します。
Ctl-Cで停止することができます。
./generate_data
SQL Database の Query editor (preview) などでデータが入っていることを確認します。
データ格納先の Elasticsearch を用意する
Elasticsearch は Azure の Elastic Cloud で作成することができます。プロダクション環境を検討の場合はこちらをご利用ください。
今回は前回の記事と同じDocker 環境を使用して、Elasticsearch を構築します。今回は1台ですが今後のためにもネットワークを作っておきます。
sudo docker network create elastic
sudo docker run --name es01 --net elastic -p 9200:9200 -it -m 1GB docker.elastic.co/elasticsearch/elasticsearch:8.17.0
作成した Elasticsearchコンテナ のユーザー「elastic」のパスワードを初期化します。
sudo docker exec -it es01 /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic
表示されたパスワードを環境変数に設定します。
export ELASTIC_PASSWORD="your_password"
SSL certificate を Elasticsearch のコンテナから home ディレクトリにコピーします。
cd ~/
sudo docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .
Mssqlbeat を使ってみる
SQL Database と Elasticsearch を用意できたら、いよいよMssqlbeat を使ってみます。
ymlファイルの作成
Mssqlbeatのymlを作成します。output や processors などは他のbeatと設定方法は同じになります。Mssqlbeat設定のわかりにくい箇所の説明は以下の通りです。
- 10秒間隔でSQLサーバーからデータを取得します。(period 設定)
- データの取得は2のクエリで行う。増やすことも減らすことも可能です。(inputs 設定)
- データの取得は最大2つ並列で実行します。(threads 設定)
- クエリの取得データの条件をプレースホルダで指定しています。LastTimeは取得したデータの最新の時間が入る。(sql_query 設定)
- SQL Databaseのテーブルの時間カラム名を CurrentTime に設定します。「SQL Database にテーブルを作成する」参照。(sql_time_column 設定)
- LastTime は再起動時にも動作するようにファイルに保存し、動作中に読み込みますが、ファイルが存在しない場合は現在の時間を LastTime にします。(sql_time_initialize_with_current_time 設定)
- SQL Database から取得したデータのカラム名を、ElasticStack に送る際に "SQL1_" Prefix を付けたフィールド名として送信する。(field_prefix 設定)
mssqlbeat:
period: 10s
threads: 2
inputs:
- field: "sql1"
enabled: true
mssqlserver_host: "<server_name>.database.windows.net"
mssqlserver_port: "1433"
mssqlserver_userid: "<user_id>"
mssqlserver_password: "<password>"
mssqlserver_database: "<database_name>"
mssqlserver_tlsmin: "1.2"
sql_query: "SELECT * FROM TimeCount WHERE CurrentTime > @LastTime"
sql_time_column: "CurrentTime"
sql_time_initialize_with_current_time: true
field_prefix: "SQL1_"
- field: "sql2"
enabled: true
mssqlserver_host: "<server_name>.database.windows.net"
mssqlserver_port: "1433"
mssqlserver_userid: "<user_id>"
mssqlserver_password: "<password>"
mssqlserver_database: "<database_name>"
mssqlserver_tlsmin: "1.2"
sql_query: "SELECT * FROM TimeCount WHERE CurrentTime > @LastTime"
sql_time_column: "CurrentTime"
sql_time_initialize_with_current_time: true
field_prefix: "SQL2_"
output.elasticsearch:
hosts: ["localhost:9200"]
protocol: "https"
username: "elastic"
password: "<ELASTIC_PASSWORD>"
ssl.certificate_authorities: ["~/http_ca.crt"]
動かしてみる
作成したymlを指定して実行します。デバックモードを有効にして、ログを標準出力に出力します。
cd $GOPATH/src/github.com/KentaroAOKI/mssqlbeat
./mssqlbeat -c mssqlbeat.yml -e -d "*"
確認してみる
インデックスの確認
インデックスが作成されていることを確認する。
cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/_cat/indices?v"
インデックスのドキュメント数をカウント
インデックスに格納されているドキュメント数をカウントして確認する。
cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/mssqlbeat-*/_count"
field が sql1 のドキュメント数をカウント
field が sql1 のドキュメント数をカウントして確認する。
cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/mssqlbeat-*/_count" -H 'Content-Type: application/json' -d'
{
"query": {
"term": {
"field": "sql1"
}
}
}
'
全てのドキュメントを表示
全てのドキュメントを表示して確認する。
cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/mssqlbeat-*/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"match_all": {}
}
}
' | jq .
以下のような出力になります。(記事の関係で、hitsは1つだけ残して、他は削除しています)
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 10000,
"relation": "gte"
},
"max_score": 1.0,
"hits": [
{
"_index": "mssqlbeat-7.17.26-2025.01.05-000001",
"_id": "nahENJQBeYR8G29E8Hlo",
"_score": 1.0,
"_source": {
"@timestamp": "2025-01-05T02:21:49.633Z",
"host": {
"name": "ubuntu24"
},
"agent": {
"id": "67ccfd5d-f4cc-4e8a-9f10-0bbfeb2d686a",
"name": "ubuntu24",
"type": "mssqlbeat",
"version": "7.17.26",
"hostname": "ubuntu24",
"ephemeral_id": "e3226428-877a-404e-81b9-25fa9eb520e8"
},
"SQL2_CountValue": 40665,
"type": "ubuntu24",
"field": "sql2",
"thread": 1,
"SQL2_ID": 40665,
"SQL2_CurrentTime": "2025-01-05T02:21:49.633Z",
"ecs": {
"version": "1.12.0"
}
}
}
}
}
}
インデックスを削除
動作確認を繰り返す時など、インデックスを削除したほうが良い場合もありますので、削除する方法も書いておきます。ワイルドカードが使えない設定の場合は、インデックス名を指定してください。
cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X DELETE "https://localhost:9200/mssqlbeat-*"
processorsを使ってみる
mssqlbeat.yml にprocessorsの設定を追加して動作を確認します。
- field が sql1 のデータをドロップ
- その他のデータに"index_sql2"のtagを追加
processors:
- if:
equals:
field: "sql1"
then:
- drop_event: {}
else:
- add_tags:
tags: ["index_sql2"]
以下のようにtagが追加された出力になります。(記事の関係で、hitsは1つだけ残して、他は削除しています)
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 10000,
"relation": "gte"
},
"max_score": 1.0,
"hits": [
{
"_index": "mssqlbeat-7.17.26-2025.01.05-000001",
"_id": "nahENJQBeYR8G29E8Hlo",
"_score": 1.0,
"_source": {
"@timestamp": "2025-01-05T02:21:49.633Z",
"host": {
"name": "ubuntu24"
},
"agent": {
"id": "67ccfd5d-f4cc-4e8a-9f10-0bbfeb2d686a",
"name": "ubuntu24",
"type": "mssqlbeat",
"version": "7.17.26",
"hostname": "ubuntu24",
"ephemeral_id": "e3226428-877a-404e-81b9-25fa9eb520e8"
},
"tags": [
"index_sql2"
],
"SQL2_CountValue": 40665,
"type": "ubuntu24",
"field": "sql2",
"thread": 1,
"SQL2_ID": 40665,
"SQL2_CurrentTime": "2025-01-05T02:21:49.633Z",
"ecs": {
"version": "1.12.0"
}
}
}
}
}
}
さいごに
ざざっと作成しましたが、時間があるときに丁寧に修正していきたいです。