5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulkを使ってRDSのslow query logをElasticsearchに投入する。

Last updated at Posted at 2017-03-30

Amazon RDSではデフォルトの設定で、slow query logはmysql.slow_logという
テーブルに保存されています。Embulkを使ってmysql.slow_logに保存された
slow query logをElasticsearchに投入してみます。

必要なプラグインのインストール

embulk mkbundleを利用して、必要なプラグインをGemfileで管理
できるようにします。

$ embulk mkbundle embulk-aurora-move-slowlog
$ cd embulk-move-slowlog

必要なプラグインをGemfileに記載します。今回はRDSのDBにあるslowlogを
Elasticsearchに投入するので、Gemfileに以下のような記述をします。

source 'https://rubygems.org/'
gem 'embulk', '~> 0.8.0'
gem 'embulk-input-mysql', '~> 0.8.2'
gem 'embulk-output-elasticsearch_ruby', '~> 0.1.4'
gem 'embulk-filter-typecast', '~> 0.1.5'
gem 'embulk-filter-column', '~> 0.6.0'

Gemfileを終えたら以下のコマンドでプラグインのインストールを行います。

$ embulk bundle

Embulkの設定ファイルの作成

以下のような設定ファイルを作成します。

config.yml
in:
  type: mysql
  user: xxxxxx
  password: yyyyy
  database: mysql
  table: slow_log
  host: zzzzzz.ap-northeast-1.rds.amazonaws.com
  select: "*"
  column_options:
    start_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S+09:00"}
  incremental: true
  incremental_columns: [start_time]
  options:
    useLegacyDatetimeCode: false
    serverTimezone: UTC
filters:
  - type: typecast
    columns:
      - {name: query_time, type: long}
      - {name: lock_time, type: long}
out:
  type: elasticsearch_ruby
  mode: normal
  nodes:
    - {host: "elasticsearch.example.com", port: 9200 }
  index: slow_log
  index_type: log
  request_timeout: 60

query_timeとlock_timeについてはスキーマを確認したところ、time型で
格納されていました。このままですとElasticsearchに投入した際に正確な
値が入らないので、embulk-filter-typecastを使用してlong型に変更
しました。
またRDSのタイムゾーンがJSTになっていて、embulkがUTCになっている場合に、
検索がうまくいかなかったり、Resumeの機能を利用した際に作成されるファイルの
時刻が巻き戻ってしまったりしたので、UTCとして、取り扱った上で、column_optionsでタイムゾーン+09:00を追加したstringに変換して、Elasticsearch側のmappingで時刻を合わせるように対応しました。

Elasticsearch側のmappingの設定

前述の理由により、start_timeのformatをtimestamp_formatで出力したformatに
合わせるように指定します。

slow_log.json
{
  "template": "slow_log-*",
  "mappings": {
    "log": {
      "properties": {
        "start_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ssZZ"
        },
        "user_host": {
          "type": "string",
          "index": "not_analyzed"
        },
        "query_time": {
          "type": "long"
        },
        "lock_time": {
          "type": "long"
        },
        "row_sent": {
          "type": "long"
        },
        "rows_examined": {
          "type": "long"
        },
        "db": {
          "type": "string",
          "index": "not_analyzed"
        },
        "last_insert_id": {
          "type": "long"
        },
        "insert_id": {
          "type": "long"
        },
        "server_id": {
          "type": "long"
        },
        "sql_text": {
          "type": "string",
          "index": "not_analyzed"
        },
        "thread_id": {
          "type": "long"
        }
      }
    }
  }
}

上記のslow_log.jsonをtemplateとしてElasticsearchに登録します。

$ curl -XPUT elasticsearch.example.com:9200/_template/slow_log -d "$(cat slow_log.json)"

typeは集計に使いそうなquery_timeとlock_timeをlong型にした以外は、
出来るだけmysqlのスキーマに合わせて設定しました。
以下がslow_logのスキーマになりますが、thread_idやserver_idあたりは、
活用方法がなさそうなので、string型でもいいのかもしれません。

mysql> desc mysql.slow_log;
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| Field          | Type                | Null | Key | Default           | Extra                       |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
| start_time     | timestamp           | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| user_host      | mediumtext          | NO   |     | NULL              |                             |
| query_time     | time                | NO   |     | NULL              |                             |
| lock_time      | time                | NO   |     | NULL              |                             |
| rows_sent      | int(11)             | NO   |     | NULL              |                             |
| rows_examined  | int(11)             | NO   |     | NULL              |                             |
| db             | varchar(512)        | NO   |     | NULL              |                             |
| last_insert_id | int(11)             | NO   |     | NULL              |                             |
| insert_id      | int(11)             | NO   |     | NULL              |                             |
| server_id      | int(10) unsigned    | NO   |     | NULL              |                             |
| sql_text       | mediumtext          | NO   |     | NULL              |                             |
| thread_id      | bigint(21) unsigned | NO   |     | NULL              |                             |
+----------------+---------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)

Embulkの実行

以下のコマンドで実行します。

$ embulk run config.yml -b ./

またembulk-input-mysqlではResumeをサポートしているため、以下のコマンドを
実行することで前回の実行時のレコード以降を取り込むことができます。

$ embulk run config.yml -c diff.yml -b ./

config.ymlにあるincremental: trueとincremental_columns: [start_time]の設定により、前回取り込まれたstart_time移行のデータが2回目以降で取り込まれるようになります。
diff.ymlには以下の内容が出力され、embulkが実行される度にlast_recordの
部分が更新されていきます。

diff.yml
in:
  last_record: ['2017-03-29T21:00:02.000000Z']
out: {}

日時でElasticsearchを管理するための工夫

embulk-output-elasticsearch_rubyではindex名に%Y%m%dを加えることで
Elasticsearchのindex名を日毎に変更することが出来ます。

config.yml
in:
 type: mysql
...
中略
...
out:
 type: elasticsearch_ruby
 mode: normal
 nodes:
 - {host: "elasticsearch.example.com", port: 9200 }
 index: slow_log-%Y%m%d
 index_type: log
 request_timeout: 60

上記のような記述にすることにより、slow_log-20170301のようなindex名で
データが投入されるので、mapping templateで設定したindex名に沿った状態で、
日毎にindexを分けることが出来ます。このようにすることで1週間前に投入
されたindexを削除することが容易になると思います。

参考記事

以下の記事を参考にさせて頂きました。ありがとうございます。
1.ELBのログをEmbulkでElasticsearchに入れたメモ
2.embulk mkbundle の使い方
3.Fluentdのバッチ版Embulk(エンバルク)のまとめ

5
4
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?