はじめに
fluentd(td-agent 2.3.5)でnginxアクセスログ(ltsv形式)をAWS Elasticserviceに投げつつ
forestプラグインを使い、同時にS3にも保存しています。
このS3に保存したログを別のESドメインに投入する必要があって、ちょっとつまづいたので
その方法をメモっておきます。
以降、便宜上 運用中のESを現ES
、S3上のログを投入する先を新ES
と書きます。
前提
td-agentのs3に保存している部分の設定抜粋
〜省略〜
<match *.access.**>
@type forest
subtype copy
<template>
<store>
@type "aws-elasticsearch-service"
〜省略〜
</store>
# Amazon S3
<store>
@type s3
s3_region ap-northeast-1
s3_bucket hogehoge-logs
path web/${tag}/
<instance_profile_credentials>
retries 5
</instance_profile_credentials>
format ltsv
include_time_key true
buffer_type file
buffer_path /var/log/td-agent/buffer/s3-${tag}
time_slice_format %Y/%m/%d/${tag}.%Y%m%d%H
time_slice_wait 10m
buffer_chunk_limit 1g
buffer_queue_limit 1024
retry_limit 16
retry_wait 1s
</store>
</template>
</match>
〜省略〜
なのでオブジェクトキーは、web/nginx.access/2018/04/18/nginx.access.2018040402_1.gz
のようなパスで、1時間ごとに新しいファイル名で保存されています。
以下、参考までにログを1行のみ抜粋
host:126.113.254.90 vhost:~~~~ method:GET uri:/ query:~~~~ referer:https://~~~~/?~~~~ ua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36 reqsize:1835 apptime:0.186 reqtime:0.186 status:200 size:9282 hostname:~~~~ time:2018-04-04T02:51:13Z
やりたいこと
- Embulkで直接s3上からgzipされたltsvログ取得して新ESに取り込みたい
- 日時は、ltsvではtimeフィールドだが現ES上は @timestamp というフィールド名になっている
- 新ESに取り込む際も @timestamp というフィールド名で取り込みたい
- 現ESでは、 nginx_access-YYYY.mm.ddという日付ごとにインデックスが別れている
- 新ESに取り込む際も、日付ごとにインデックスを分けたい
- ちなみにembulk-output-elasticsearch_rubyを使えば nginx_access-%Y.%m.%d
という日付をインデックス名につけてくれるが、実行した日付で作られるだけ
なのでやりたいこととは違う
手順
環境用意
実行環境は以下の通り
- CentOS7.4
- Elasticsearch6.2
- AWS Elasticsearc Serviceを利用
- ドメイン名:es_host
- エンドポイントを内部DNSにCNAME登録
- ポートは9200ではなく80になります
-
~/.aws/credentials
に予めS3/ESアクセス権限を持ったアクセスキーを登録
Embulk導入
$ sudo yum -y install java-1.8.0-openjdk
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ export PATH="$HOME/.embulk/bin:$PATH" # .bashrcに書いてもOK
$ embulk --version
embulk 0.9.7
$ embulk selfupdate # 念のため最新化
2018-04-17 06:14:42.036 +0000: Embulk v0.9.7
Checking the latest version...
Already up-to-date. 0.9.7 is the latest version.
# 必要なプラグインを導入
$ embulk gem install embulk-input-s3
$ embulk gem install embulk-parser-ltsv
$ embulk gem install embulk-output-elasticsearch_ruby
$ embulk gem install embulk-filter-column
日付文字列をdate型にするマッピング定義を設定
Embulkで取り込む際にtimeフィールドをtimestamp形式で取り込むことができなかったので
一旦string型で新ESに投入し、下記マッピングによりdate型に変換する
$ cat << _EOS_ > template.json
{
"index_patterns": ["nginx_access-*"],
"mappings": {
"tmpl1": {
"properties": {
"@timestamp": {
"type": "date"
}
}
}
}
}
_EOS_
$ curl -XPUT -H "Content-Type: application/json" "es_host:80/_template/tmpl1?pretty" -d "$(cat template.json)"
{"acknowledged":true}
embulk設定ファイル作成
後述するスクリプトで指定する環境変数を使えるようにliquid形式で作成する
日別インデックス名、日別のS3ログを環境変数で指定することにした
$ cat << _EOS_ > nginx_access.yml.liquid
in:
type: s3
bucket: hogehoge-logs
path_prefix: web/{{ env.TAG }}/{{ env.YEAR }}/{{ env.MONTH }}/{{ env.DAY }}/
endpoint: s3-ap-northeast-1.amazonaws.com
auth_method: profile
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: LF
type: ltsv
null_value_pattern: ^(-|)$
delimiter: "\t"
quote: '"'
escape: '\'
default_timezone: Asia/Tokyo
schema:
- {name: host, type: string}
- {name: vhost, type: string}
- {name: method, type: string}
- {name: uri, type: string}
- {name: query, type: string}
- {name: referer, type: string}
- {name: ua, type: string}
- {name: reqsize, type: long}
- {name: apptime, type: double}
- {name: reqtime, type: double}
- {name: status, type: long}
- {name: size, type: long}
- {name: hostname, type: string}
- {name: time, type: string}
exec: {}
filters:
# ここでフィールド名をtimeから@timestampにリネーム
- type: rename
columns:
time: '@timestamp'
- type: column
add_columns:
- {name: '@tag', type: string, default: {{ env.TAG }}}
out:
type: elasticsearch_ruby
nodes:
- {host: {{ env.ES_HOST }}, port: {{ env.ES_PORT }}}
# ここで日別インデックス名を指定
index: {{ env.INDEX }}-{{ env.YEAR }}.{{ env.MONTH }}.{{ env.DAY }}
index_type: {{ env.INDEX }}
_EOS_
日付ごとにインデックスを作成しながらログ投入するスクリプト
$ cat << _EOS_ > run_embulk_with_dailyindex.sh
#!/bin/bash
export ES_HOST=es_host
export ES_PORT=80
export TAG=nginx.access
export INDEX=\${TAG//\./_}
# 何日前のログから現在までを取得するかを指定
FROM_DAYNUM=60
for daynum in \$(seq \${FROM_DAYNUM} -1 1)
do
export YEAR=\$(date -d "00:00 \${daynum}days ago" '+%Y')
export MONTH=\$(date -d "00:00 \${daynum}days ago" '+%m')
export DAY=\$(date -d "00:00 \${daynum}days ago" '+%d')
embulk run \${INDEX}.yml.liquid
done
_EOS_
注)このやり方だと、インデックス名の日時、td-agent処理日時、ログファイル名の日時、
などのタイムゾーンやタイミングの違いにより、インデックスの日付とログレコードの時間が
一致しない場合があるかも
いくつかのログ見た感じだと、大丈夫そう。
例えば nginx.access.2018041723_0.gzのログには4/17 23:00〜23:59のログが
記録されている。
あと、ESのインデックス名[nginx_access-2018.04.17]などの日付はUTCを元に作られ
ているので(表示上はJSTなので)ズレているように見えるけど、こういうもの。
まあ、いずれにしても今回はそこまで厳密な要件ではないので、気にしない。
ログ投入
$ bash ./run_embulk_with_dailyindex.sh
# 後始末:マッピング定義削除
$ curl -XDELETE -H "Content-Type: application/json" "es_host:80/_template/tmpl1
{"acknowledged":true}
まとめ
Embulk、便利で使い方も簡単でした。プラグインも豊富で、liquid形式で動的に指定したり、
と柔軟性もありますね。
ただ、軽く調べましたが、今回やりたいことに対応したプラグインが探せなかったので
スクリプト作って対応することにしました。誰かの参考になれば幸いです。