Amazon RDSではデフォルトの設定で、slow query logはmysql.slow_logという
テーブルに保存されています。Embulkを使ってmysql.slow_logに保存された
slow query logをElasticsearchに投入してみます。
必要なプラグインのインストール
embulk mkbundleを利用して、必要なプラグインをGemfileで管理
できるようにします。
$ embulk mkbundle embulk-aurora-move-slowlog
$ cd embulk-move-slowlog
必要なプラグインをGemfileに記載します。今回はRDSのDBにあるslowlogを
Elasticsearchに投入するので、Gemfileに以下のような記述をします。
source 'https://rubygems.org/'
gem 'embulk', '~> 0.8.0'
gem 'embulk-input-mysql', '~> 0.8.2'
gem 'embulk-output-elasticsearch_ruby', '~> 0.1.4'
gem 'embulk-filter-typecast', '~> 0.1.5'
gem 'embulk-filter-column', '~> 0.6.0'
Gemfileを終えたら以下のコマンドでプラグインのインストールを行います。
$ embulk bundle
Embulkの設定ファイルの作成
以下のような設定ファイルを作成します。
in:
type: mysql
user: xxxxxx
password: yyyyy
database: mysql
table: slow_log
host: zzzzzz.ap-northeast-1.rds.amazonaws.com
select: "*"
column_options:
start_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S+09:00"}
incremental: true
incremental_columns: [start_time]
options:
useLegacyDatetimeCode: false
serverTimezone: UTC
filters:
- type: typecast
columns:
- {name: query_time, type: long}
- {name: lock_time, type: long}
out:
type: elasticsearch_ruby
mode: normal
nodes:
- {host: "elasticsearch.example.com", port: 9200 }
index: slow_log
index_type: log
request_timeout: 60
query_timeとlock_timeについてはスキーマを確認したところ、time型で
格納されていました。このままですとElasticsearchに投入した際に正確な
値が入らないので、embulk-filter-typecastを使用してlong型に変更
しました。
またRDSのタイムゾーンがJSTになっていて、embulkがUTCになっている場合に、
検索がうまくいかなかったり、Resumeの機能を利用した際に作成されるファイルの
時刻が巻き戻ってしまったりしたので、UTCとして、取り扱った上で、column_optionsでタイムゾーン+09:00を追加したstringに変換して、Elasticsearch側のmappingで時刻を合わせるように対応しました。
Elasticsearch側のmappingの設定
前述の理由により、start_timeのformatをtimestamp_formatで出力したformatに
合わせるように指定します。
{
"template": "slow_log-*",
"mappings": {
"log": {
"properties": {
"start_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ssZZ"
},
"user_host": {
"type": "string",
"index": "not_analyzed"
},
"query_time": {
"type": "long"
},
"lock_time": {
"type": "long"
},
"row_sent": {
"type": "long"
},
"rows_examined": {
"type": "long"
},
"db": {
"type": "string",
"index": "not_analyzed"
},
"last_insert_id": {
"type": "long"
},
"insert_id": {
"type": "long"
},
"server_id": {
"type": "long"
},
"sql_text": {
"type": "string",
"index": "not_analyzed"
},
"thread_id": {
"type": "long"
}
}
}
}
}
上記のslow_log.jsonをtemplateとしてElasticsearchに登録します。
$ curl -XPUT elasticsearch.example.com:9200/_template/slow_log -d "$(cat slow_log.json)"
typeは集計に使いそうなquery_timeとlock_timeをlong型にした以外は、
出来るだけmysqlのスキーマに合わせて設定しました。
以下がslow_logのスキーマになりますが、thread_idやserver_idあたりは、
活用方法がなさそうなので、string型でもいいのかもしれません。
mysql> desc mysql.slow_log;
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| start_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| user_host | mediumtext | NO | | NULL | |
| query_time | time | NO | | NULL | |
| lock_time | time | NO | | NULL | |
| rows_sent | int(11) | NO | | NULL | |
| rows_examined | int(11) | NO | | NULL | |
| db | varchar(512) | NO | | NULL | |
| last_insert_id | int(11) | NO | | NULL | |
| insert_id | int(11) | NO | | NULL | |
| server_id | int(10) unsigned | NO | | NULL | |
| sql_text | mediumtext | NO | | NULL | |
| thread_id | bigint(21) unsigned | NO | | NULL | |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)
Embulkの実行
以下のコマンドで実行します。
$ embulk run config.yml -b ./
またembulk-input-mysqlではResumeをサポートしているため、以下のコマンドを
実行することで前回の実行時のレコード以降を取り込むことができます。
$ embulk run config.yml -c diff.yml -b ./
config.ymlにあるincremental: trueとincremental_columns: [start_time]の設定により、前回取り込まれたstart_time移行のデータが2回目以降で取り込まれるようになります。
diff.ymlには以下の内容が出力され、embulkが実行される度にlast_recordの
部分が更新されていきます。
in:
last_record: ['2017-03-29T21:00:02.000000Z']
out: {}
日時でElasticsearchを管理するための工夫
embulk-output-elasticsearch_rubyではindex名に%Y%m%dを加えることで
Elasticsearchのindex名を日毎に変更することが出来ます。
in:
type: mysql
...
中略
...
out:
type: elasticsearch_ruby
mode: normal
nodes:
- {host: "elasticsearch.example.com", port: 9200 }
index: slow_log-%Y%m%d
index_type: log
request_timeout: 60
上記のような記述にすることにより、slow_log-20170301のようなindex名で
データが投入されるので、mapping templateで設定したindex名に沿った状態で、
日毎にindexを分けることが出来ます。このようにすることで1週間前に投入
されたindexを削除することが容易になると思います。
参考記事
以下の記事を参考にさせて頂きました。ありがとうございます。
1.ELBのログをEmbulkでElasticsearchに入れたメモ
2.embulk mkbundle の使い方
3.Fluentdのバッチ版Embulk(エンバルク)のまとめ