目的
MySQLのデータをFluentdを使ってElasticsearchに流し込むとき、logstash形式にして@timestampにMySQL内の日付データを使う。
(DBに格納しているログイン履歴を、そのままKibanaでも見えるようにしたいなーというのが要件)
使うもの
- https://github.com/uken/fluent-plugin-elasticsearch
- https://github.com/y-ken/fluent-plugin-mysql-replicator
- インストールでつまづいたが、gem install mysql を単体で先に実行することで回避。
- MySQLテーブルへの更新/削除イベントを逐次取得するFluentdプラグイン「fluent-plugin-mysql-replicator」をリリースしました が役に立つ。
状況
- MySQL
- Host: localhost
- User: dbuser
- Password: dbpass
- Database: testdb
- Table: history
- Schema: この中の loginTime を @timestamp として使いたい
Field | Type |
---|---|
login | varchar(255) |
userId | bigint(20) |
loginTime | datetime |
sessionId | varchar(255) |
- Elasticsearch
- Host: localhost
- Port: 9200
設定ファイル
<source>
type mysql_replicator
host localhost
username dbuser
password dbpass
database testdb
query SELECT login,userId,loginTime,sessionId,DATE_FORMAT(loginTime, '%Y-%m-%dT%k:%i:%s+09:00') as "@timestamp" FROM history WHERE DATE_ADD(loginTime, INTERVAL 1 MINUTE) > NOW();
primary_key sessionId
interval 1m
enable_delete no
tag replicator.testdb.history.${event}
</source>
-
README には、
"@timestamp":"2014-04-07T000:00:00-00:00"
や"@timstamp": "2014-12-19T08:01:03Z"
などの表記があるが、DATE_FORMAT(loginTime, '%Y-%m-%dT%k:%i:%s+09:00')
で動いてくれた。('%Y-%m-%dT%k:%i:%sZ'
は動かず。)
<match replicator.**>
type copy
<store>
type_name dbsync
type elasticsearch
include_tag_key true
tag_key @log_name
host localhost
port 9200
logstash_format true
</store>
<store>
type file
path /tmp/login_history
</store>
</match>
動作確認
- td-agent を再起動する
- Kibana でDBのデータが見える
{
"_index": "logstash-2015.02.16",
"_type": "dbsync",
"_id": "iXONkf2-SWWbYXN7ZngtdQ",
"_score": null,
"_source": {
"login": "TEST_LOGIN",
"userId": 123,
"loginTime": "2015-02-16 16:02:01 +0900",
"sessionId": "ab533450625c7e849272fea658426545",
"@timestamp": "2015-02-16T16:02:01+09:00",
"@log_name": "replicator.testdb.history.insert"
},
"sort": [
1424070121000,
1424070121000
]
}