0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SQL Databaseの時系列データをElasticStackに格納しよう。Mssqlbeatを使ってみた。

Last updated at Posted at 2025-01-05

SQL Databaseから時系列データを取得するMssqlbeatを使ってみた(というか作った)ので簡単ですが使い方を紹介します。(メモ的に紹介します)

準備

はじめにMssqlbeatを使うための環境を作成します。環境が整っている方は読み飛ばしてください。

Mssqlbeatをビルドする

まずはビルド環境を整えます。

sudo apt install golang-go make

以下のgitリポジトリからソースコードを取得します。

export GOPATH=~/go
export GOBIN=$GOPATH/bin
export GOMODCACHE=$GOPATH/pkg/mod
export PATH=$GOBIN:$PATH
mkdir $GOPATH/src/github.com/KentaroAOKI
cd $GOPATH/src/github.com/KentaroAOKI/
git clone git@github.com:KentaroAOKI/mssqlbeat.git

取得したらビルドしていきます。あと、make時に出てきたメッセージを参考に go get してください。

cd mssqlbeat
make

実行ファイルを確認します。

file mssqlbeat

データ取得先の SQL Server を用意する

今回は Microsoft Azure のSQL Serverを使用します。また、SQL Darabase を作成する際は、SQL authentication を選択します。

image.png

ネットワークはPublic accessを許可してFirewall ruleにビルドしたホストのIPv4アドレスを設定しています。

image.png

SQL Database にテーブルを作成する

Mssqlbeat がデータを取得する先のテーブルを作成します。クエリは以下の通り。

CREATE TABLE TimeCount (
    ID INT PRIMARY KEY IDENTITY(1,1),
    CurrentTime DATETIME,
    CountValue INT
);

SQL Database の Query editor (preview) などを使ってテーブルを作成できます。

image.png

作成したテーブルにデータを入れる

データを1秒間隔で入れるプログラムを作成します。server_name などのSQL Databaseに接続する設定は、ご自身の環境に合わせてください。

cd $GOPATH/src/github.com/KentaroAOKI/mssqlbeat/generate_data
vi main.go
main.go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/microsoft/go-mssqldb"
)

var db *sql.DB
var server = "<server_name>.database.windows.net"
var port = 1433
var user = "<user_id>"
var password = "<password>"
var database = "<database>"

func main() {
	// Build connection string
	connString := fmt.Sprintf("server=%s;user id=%s;password=%s;port=%d;database=%s;",
		server, user, password, port, database)
	var err error
	// Create connection pool
	db, err = sql.Open("sqlserver", connString)
	if err != nil {
		log.Fatal("Error creating connection pool: ", err.Error())
	}
	ctx := context.Background()

	// Insert data every second
	count := 0
	for {
		count++
		err = insertTimeCount(ctx, count)
		if err != nil {
			log.Fatal("Error inserting data: ", err.Error())
		}
		time.Sleep(1 * time.Second)
	}
}

func insertTimeCount(ctx context.Context, count int) error {
	tsql := `INSERT INTO TimeCount (CurrentTime, CountValue) VALUES (@CurrentTime, @CountValue);`
	_, err := db.ExecContext(
		ctx,
		tsql,
		sql.Named("CurrentTime", time.Now()),
		sql.Named("CountValue", count))
	return err
}

データを入れるプログラムをビルドします。

go mod init generate_data
go get github.com/microsoft/go-mssqldb
go build

実行します。1秒間隔でデータを作成します。
Ctl-Cで停止することができます。

./generate_data

SQL Database の Query editor (preview) などでデータが入っていることを確認します。

image.png

データ格納先の Elasticsearch を用意する

Elasticsearch は Azure の Elastic Cloud で作成することができます。プロダクション環境を検討の場合はこちらをご利用ください。

今回は前回の記事と同じDocker 環境を使用して、Elasticsearch を構築します。今回は1台ですが今後のためにもネットワークを作っておきます。

sudo docker network create elastic
sudo docker run --name es01 --net elastic -p 9200:9200 -it -m 1GB docker.elastic.co/elasticsearch/elasticsearch:8.17.0

作成した Elasticsearchコンテナ のユーザー「elastic」のパスワードを初期化します。

sudo docker exec -it es01 /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic

表示されたパスワードを環境変数に設定します。

export ELASTIC_PASSWORD="your_password"

SSL certificate を Elasticsearch のコンテナから home ディレクトリにコピーします。

cd ~/
sudo docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .

Mssqlbeat を使ってみる

SQL Database と Elasticsearch を用意できたら、いよいよMssqlbeat を使ってみます。

ymlファイルの作成

Mssqlbeatのymlを作成します。output や processors などは他のbeatと設定方法は同じになります。Mssqlbeat設定のわかりにくい箇所の説明は以下の通りです。

  • 10秒間隔でSQLサーバーからデータを取得します。(period 設定)
  • データの取得は2のクエリで行う。増やすことも減らすことも可能です。(inputs 設定)
  • データの取得は最大2つ並列で実行します。(threads 設定)
  • クエリの取得データの条件をプレースホルダで指定しています。LastTimeは取得したデータの最新の時間が入る。(sql_query 設定)
  • SQL Databaseのテーブルの時間カラム名を CurrentTime に設定します。「SQL Database にテーブルを作成する」参照。(sql_time_column 設定)
  • LastTime は再起動時にも動作するようにファイルに保存し、動作中に読み込みますが、ファイルが存在しない場合は現在の時間を LastTime にします。(sql_time_initialize_with_current_time 設定)
  • SQL Database から取得したデータのカラム名を、ElasticStack に送る際に "SQL1_" Prefix を付けたフィールド名として送信する。(field_prefix 設定)
mssqlbeat.yml
mssqlbeat:
  period: 10s
  threads: 2
  inputs:
    - field: "sql1"
      enabled: true
      mssqlserver_host: "<server_name>.database.windows.net"
      mssqlserver_port: "1433"
      mssqlserver_userid: "<user_id>"
      mssqlserver_password: "<password>"
      mssqlserver_database: "<database_name>"
      mssqlserver_tlsmin: "1.2"
      sql_query: "SELECT * FROM TimeCount WHERE CurrentTime > @LastTime"
      sql_time_column: "CurrentTime"
      sql_time_initialize_with_current_time: true 
      field_prefix: "SQL1_"
    - field: "sql2"
      enabled: true
      mssqlserver_host: "<server_name>.database.windows.net"
      mssqlserver_port: "1433"
      mssqlserver_userid: "<user_id>"
      mssqlserver_password: "<password>"
      mssqlserver_database: "<database_name>"
      mssqlserver_tlsmin: "1.2"
      sql_query: "SELECT * FROM TimeCount WHERE CurrentTime > @LastTime"
      sql_time_column: "CurrentTime"
      sql_time_initialize_with_current_time: true 
      field_prefix: "SQL2_"

output.elasticsearch:
  hosts: ["localhost:9200"]
  protocol: "https"
  username: "elastic"
  password: "<ELASTIC_PASSWORD>"
  ssl.certificate_authorities: ["~/http_ca.crt"]

動かしてみる

作成したymlを指定して実行します。デバックモードを有効にして、ログを標準出力に出力します。

cd $GOPATH/src/github.com/KentaroAOKI/mssqlbeat
./mssqlbeat -c mssqlbeat.yml -e -d "*"

確認してみる

インデックスの確認

インデックスが作成されていることを確認する。

cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/_cat/indices?v"

インデックスのドキュメント数をカウント

インデックスに格納されているドキュメント数をカウントして確認する。

cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/mssqlbeat-*/_count"

field が sql1 のドキュメント数をカウント

field が sql1 のドキュメント数をカウントして確認する。

cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/mssqlbeat-*/_count" -H 'Content-Type: application/json' -d'
{
  "query": {
    "term": {
      "field": "sql1"
    }
  }
}
'

全てのドキュメントを表示

全てのドキュメントを表示して確認する。

cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X GET "https://localhost:9200/mssqlbeat-*/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
' | jq .

以下のような出力になります。(記事の関係で、hitsは1つだけ残して、他は削除しています)

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "gte"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "mssqlbeat-7.17.26-2025.01.05-000001",
        "_id": "nahENJQBeYR8G29E8Hlo",
        "_score": 1.0,
        "_source": {
          "@timestamp": "2025-01-05T02:21:49.633Z",
          "host": {
            "name": "ubuntu24"
          },
          "agent": {
            "id": "67ccfd5d-f4cc-4e8a-9f10-0bbfeb2d686a",
            "name": "ubuntu24",
            "type": "mssqlbeat",
            "version": "7.17.26",
            "hostname": "ubuntu24",
            "ephemeral_id": "e3226428-877a-404e-81b9-25fa9eb520e8"
          },
          "SQL2_CountValue": 40665,
          "type": "ubuntu24",
          "field": "sql2",
          "thread": 1,
          "SQL2_ID": 40665,
          "SQL2_CurrentTime": "2025-01-05T02:21:49.633Z",
          "ecs": {
            "version": "1.12.0"
          }
        }
      }
    }
  }
}

インデックスを削除

動作確認を繰り返す時など、インデックスを削除したほうが良い場合もありますので、削除する方法も書いておきます。ワイルドカードが使えない設定の場合は、インデックス名を指定してください。

cd ~/
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD -X DELETE "https://localhost:9200/mssqlbeat-*"

processorsを使ってみる

mssqlbeat.yml にprocessorsの設定を追加して動作を確認します。

  • field が sql1 のデータをドロップ
  • その他のデータに"index_sql2"のtagを追加
mssqlbeat.yml
processors:
  - if:
      equals:
        field: "sql1"
    then:
      - drop_event: {}
    else:
      - add_tags:
          tags: ["index_sql2"]

以下のようにtagが追加された出力になります。(記事の関係で、hitsは1つだけ残して、他は削除しています)

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "gte"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "mssqlbeat-7.17.26-2025.01.05-000001",
        "_id": "nahENJQBeYR8G29E8Hlo",
        "_score": 1.0,
        "_source": {
          "@timestamp": "2025-01-05T02:21:49.633Z",
          "host": {
            "name": "ubuntu24"
          },
          "agent": {
            "id": "67ccfd5d-f4cc-4e8a-9f10-0bbfeb2d686a",
            "name": "ubuntu24",
            "type": "mssqlbeat",
            "version": "7.17.26",
            "hostname": "ubuntu24",
            "ephemeral_id": "e3226428-877a-404e-81b9-25fa9eb520e8"
          },
          "tags": [
            "index_sql2"
          ],
          "SQL2_CountValue": 40665,
          "type": "ubuntu24",
          "field": "sql2",
          "thread": 1,
          "SQL2_ID": 40665,
          "SQL2_CurrentTime": "2025-01-05T02:21:49.633Z",
          "ecs": {
            "version": "1.12.0"
          }
        }
      }
    }
  }
}

さいごに

ざざっと作成しましたが、時間があるときに丁寧に修正していきたいです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?