Edited at

Embulkを使ってRDSのslow query logをElasticsearchに投入する。

More than 1 year has passed since last update.

Amazon RDSではデフォルトの設定で、slow query logはmysql.slow_logという

テーブルに保存されています。Embulkを使ってmysql.slow_logに保存された

slow query logをElasticsearchに投入してみます。


必要なプラグインのインストール

embulk mkbundleを利用して、必要なプラグインをGemfileで管理

できるようにします。

$ embulk mkbundle embulk-aurora-move-slowlog

$ cd embulk-move-slowlog

必要なプラグインをGemfileに記載します。今回はRDSのDBにあるslowlogを

Elasticsearchに投入するので、Gemfileに以下のような記述をします。

source 'https://rubygems.org/'

gem 'embulk', '~> 0.8.0'
gem 'embulk-input-mysql', '~> 0.8.2'
gem 'embulk-output-elasticsearch_ruby', '~> 0.1.4'
gem 'embulk-filter-typecast', '~> 0.1.5'
gem 'embulk-filter-column', '~> 0.6.0'

Gemfileを終えたら以下のコマンドでプラグインのインストールを行います。

$ embulk bundle


Embulkの設定ファイルの作成

以下のような設定ファイルを作成します。


config.yml

in:

type: mysql
user: xxxxxx
password: yyyyy
database: mysql
table: slow_log
host: zzzzzz.ap-northeast-1.rds.amazonaws.com
select: "*"
column_options:
start_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S+09:00"}
incremental: true
incremental_columns: [start_time]
options:
useLegacyDatetimeCode: false
serverTimezone: UTC
filters:
- type: typecast
columns:
- {name: query_time, type: long}
- {name: lock_time, type: long}
out:
type: elasticsearch_ruby
mode: normal
nodes:
- {host: "elasticsearch.example.com", port: 9200 }
index: slow_log
index_type: log
request_timeout: 60

query_timeとlock_timeについてはスキーマを確認したところ、time型で

格納されていました。このままですとElasticsearchに投入した際に正確な

値が入らないので、embulk-filter-typecastを使用してlong型に変更

しました。

またRDSのタイムゾーンがJSTになっていて、embulkがUTCになっている場合に、

検索がうまくいかなかったり、Resumeの機能を利用した際に作成されるファイルの

時刻が巻き戻ってしまったりしたので、UTCとして、取り扱った上で、column_optionsでタイムゾーン+09:00を追加したstringに変換して、Elasticsearch側のmappingで時刻を合わせるように対応しました。


Elasticsearch側のmappingの設定

前述の理由により、start_timeのformatをtimestamp_formatで出力したformatに

合わせるように指定します。


slow_log.json

{

"template": "slow_log-*",
"mappings": {
"log": {
"properties": {
"start_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ssZZ"
},
"user_host": {
"type": "string",
"index": "not_analyzed"
},
"query_time": {
"type": "long"
},
"lock_time": {
"type": "long"
},
"row_sent": {
"type": "long"
},
"rows_examined": {
"type": "long"
},
"db": {
"type": "string",
"index": "not_analyzed"
},
"last_insert_id": {
"type": "long"
},
"insert_id": {
"type": "long"
},
"server_id": {
"type": "long"
},
"sql_text": {
"type": "string",
"index": "not_analyzed"
},
"thread_id": {
"type": "long"
}
}
}
}
}

上記のslow_log.jsonをtemplateとしてElasticsearchに登録します。

$ curl -XPUT elasticsearch.example.com:9200/_template/slow_log -d "$(cat slow_log.json)"

typeは集計に使いそうなquery_timeとlock_timeをlong型にした以外は、

出来るだけmysqlのスキーマに合わせて設定しました。

以下がslow_logのスキーマになりますが、thread_idやserver_idあたりは、

活用方法がなさそうなので、string型でもいいのかもしれません。

mysql> desc mysql.slow_log;

+----------------+---------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| start_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| user_host | mediumtext | NO | | NULL | |
| query_time | time | NO | | NULL | |
| lock_time | time | NO | | NULL | |
| rows_sent | int(11) | NO | | NULL | |
| rows_examined | int(11) | NO | | NULL | |
| db | varchar(512) | NO | | NULL | |
| last_insert_id | int(11) | NO | | NULL | |
| insert_id | int(11) | NO | | NULL | |
| server_id | int(10) unsigned | NO | | NULL | |
| sql_text | mediumtext | NO | | NULL | |
| thread_id | bigint(21) unsigned | NO | | NULL | |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)


Embulkの実行

以下のコマンドで実行します。



$ embulk run config.yml -b ./



またembulk-input-mysqlではResumeをサポートしているため、以下のコマンドを

実行することで前回の実行時のレコード以降を取り込むことができます。

$ embulk run config.yml -c diff.yml -b ./

config.ymlにあるincremental: trueとincremental_columns: [start_time]の設定により、前回取り込まれたstart_time移行のデータが2回目以降で取り込まれるようになります。

diff.ymlには以下の内容が出力され、embulkが実行される度にlast_recordの

部分が更新されていきます。


diff.yml

in:

last_record: ['2017-03-29T21:00:02.000000Z']
out: {}


日時でElasticsearchを管理するための工夫

embulk-output-elasticsearch_rubyではindex名に%Y%m%dを加えることで

Elasticsearchのindex名を日毎に変更することが出来ます。


config.yml

in:

type: mysql
...
中略
...
out:
type: elasticsearch_ruby
mode: normal
nodes:
- {host: "elasticsearch.example.com", port: 9200 }
index: slow_log-%Y%m%d
index_type: log
request_timeout: 60

上記のような記述にすることにより、slow_log-20170301のようなindex名で

データが投入されるので、mapping templateで設定したindex名に沿った状態で、

日毎にindexを分けることが出来ます。このようにすることで1週間前に投入

されたindexを削除することが容易になると思います。


参考記事

以下の記事を参考にさせて頂きました。ありがとうございます。

1.ELBのログをEmbulkでElasticsearchに入れたメモ

2.embulk mkbundle の使い方

3.Fluentdのバッチ版Embulk(エンバルク)のまとめ