LoginSignup
3
1

More than 5 years have passed since last update.

S3上のltsv形式ログをEmbulkでElasticsearchに取り込む

Last updated at Posted at 2018-04-18

はじめに

fluentd(td-agent 2.3.5)でnginxアクセスログ(ltsv形式)をAWS Elasticserviceに投げつつ
forestプラグインを使い、同時にS3にも保存しています。
このS3に保存したログを別のESドメインに投入する必要があって、ちょっとつまづいたので
その方法をメモっておきます。
以降、便宜上 運用中のESを現ES、S3上のログを投入する先を新ESと書きます。

前提

td-agentのs3に保存している部分の設定抜粋

td-agent.conf
〜省略〜
<match *.access.**>
  @type forest
  subtype copy
  <template>
    <store>
      @type "aws-elasticsearch-service"
            〜省略〜
    </store>
    # Amazon S3
    <store>
      @type s3
      s3_region ap-northeast-1
      s3_bucket hogehoge-logs
      path web/${tag}/
      <instance_profile_credentials>
        retries 5
      </instance_profile_credentials>
      format ltsv
      include_time_key true

      buffer_type file
      buffer_path /var/log/td-agent/buffer/s3-${tag}
      time_slice_format %Y/%m/%d/${tag}.%Y%m%d%H
      time_slice_wait 10m
      buffer_chunk_limit 1g
      buffer_queue_limit 1024
      retry_limit 16
      retry_wait 1s
    </store>
  </template>
</match>
〜省略〜

なのでオブジェクトキーは、web/nginx.access/2018/04/18/nginx.access.2018040402_1.gz
のようなパスで、1時間ごとに新しいファイル名で保存されています。
以下、参考までにログを1行のみ抜粋

nginx.access.2018040402_1.gz
host:126.113.254.90 vhost:~~~~  method:GET  uri:/   query:~~~~  referer:https://~~~~/?~~~~  ua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36    reqsize:1835    apptime:0.186   reqtime:0.186   status:200  size:9282   hostname:~~~~   time:2018-04-04T02:51:13Z

やりたいこと

  • Embulkで直接s3上からgzipされたltsvログ取得して新ESに取り込みたい
  • 日時は、ltsvではtimeフィールドだが現ES上は @timestamp というフィールド名になっている
    • 新ESに取り込む際も @timestamp というフィールド名で取り込みたい
  • 現ESでは、 nginx_access-YYYY.mm.ddという日付ごとにインデックスが別れている
    • 新ESに取り込む際も、日付ごとにインデックスを分けたい
    • ちなみにembulk-output-elasticsearch_rubyを使えば nginx_access-%Y.%m.%d
      という日付をインデックス名につけてくれるが、実行した日付で作られるだけ
      なのでやりたいこととは違う

手順

環境用意

実行環境は以下の通り

  • CentOS7.4
  • Elasticsearch6.2
    • AWS Elasticsearc Serviceを利用
    • ドメイン名:es_host
      • エンドポイントを内部DNSにCNAME登録
      • ポートは9200ではなく80になります
  • ~/.aws/credentialsに予めS3/ESアクセス権限を持ったアクセスキーを登録

Embulk導入

$ sudo yum -y install java-1.8.0-openjdk
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" 
$ chmod +x ~/.embulk/bin/embulk
$ export PATH="$HOME/.embulk/bin:$PATH"  # .bashrcに書いてもOK
$ embulk --version
embulk 0.9.7
$ embulk selfupdate  # 念のため最新化
2018-04-17 06:14:42.036 +0000: Embulk v0.9.7
Checking the latest version...
Already up-to-date. 0.9.7 is the latest version.
# 必要なプラグインを導入
$ embulk gem install embulk-input-s3
$ embulk gem install embulk-parser-ltsv
$ embulk gem install embulk-output-elasticsearch_ruby
$ embulk gem install embulk-filter-column

日付文字列をdate型にするマッピング定義を設定

Embulkで取り込む際にtimeフィールドをtimestamp形式で取り込むことができなかったので
一旦string型で新ESに投入し、下記マッピングによりdate型に変換する

$ cat << _EOS_ > template.json
{
  "index_patterns": ["nginx_access-*"],
  "mappings": {
    "tmpl1": {
      "properties": {
        "@timestamp": {
          "type": "date" 
        }
      }
    }
  }
}
_EOS_
$ curl -XPUT -H "Content-Type: application/json" "es_host:80/_template/tmpl1?pretty" -d "$(cat template.json)" 
{"acknowledged":true}

embulk設定ファイル作成

後述するスクリプトで指定する環境変数を使えるようにliquid形式で作成する
日別インデックス名、日別のS3ログを環境変数で指定することにした

$ cat << _EOS_ > nginx_access.yml.liquid
in:
  type: s3
  bucket: hogehoge-logs
  path_prefix: web/{{ env.TAG }}/{{ env.YEAR }}/{{ env.MONTH }}/{{ env.DAY }}/
  endpoint: s3-ap-northeast-1.amazonaws.com
  auth_method: profile
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: ltsv
    null_value_pattern: ^(-|)$
    delimiter: "\t" 
    quote: '"'
    escape: '\'
    default_timezone: Asia/Tokyo
    schema:
    - {name: host, type: string}
    - {name: vhost, type: string}
    - {name: method, type: string}
    - {name: uri, type: string}
    - {name: query, type: string}
    - {name: referer, type: string}
    - {name: ua, type: string}
    - {name: reqsize, type: long}
    - {name: apptime, type: double}
    - {name: reqtime, type: double}
    - {name: status, type: long}
    - {name: size, type: long}
    - {name: hostname, type: string}
    - {name: time, type: string}
exec: {}
filters:
  # ここでフィールド名をtimeから@timestampにリネーム
  - type: rename
    columns:
      time: '@timestamp'
  - type: column
    add_columns:
      - {name: '@tag', type: string, default: {{ env.TAG }}}
out:
  type: elasticsearch_ruby
  nodes:
    - {host: {{ env.ES_HOST }}, port: {{ env.ES_PORT }}}
  # ここで日別インデックス名を指定
  index: {{ env.INDEX }}-{{ env.YEAR }}.{{ env.MONTH }}.{{ env.DAY }}
  index_type: {{ env.INDEX }}
_EOS_

日付ごとにインデックスを作成しながらログ投入するスクリプト

$ cat << _EOS_ > run_embulk_with_dailyindex.sh
#!/bin/bash

export ES_HOST=es_host
export ES_PORT=80
export TAG=nginx.access
export INDEX=\${TAG//\./_}

# 何日前のログから現在までを取得するかを指定
FROM_DAYNUM=60

for daynum in \$(seq \${FROM_DAYNUM} -1 1)
do
  export YEAR=\$(date -d "00:00 \${daynum}days ago" '+%Y')
  export MONTH=\$(date -d "00:00 \${daynum}days ago" '+%m')
  export DAY=\$(date -d "00:00 \${daynum}days ago" '+%d')
  embulk run \${INDEX}.yml.liquid
done
_EOS_

注)このやり方だと、インデックス名の日時、td-agent処理日時、ログファイル名の日時、
などのタイムゾーンやタイミングの違いにより、インデックスの日付とログレコードの時間が
一致しない場合があるかも

いくつかのログ見た感じだと、大丈夫そう。
例えば nginx.access.2018041723_0.gzのログには4/17 23:00〜23:59のログが
記録されている。
あと、ESのインデックス名[nginx_access-2018.04.17]などの日付はUTCを元に作られ
ているので(表示上はJSTなので)ズレているように見えるけど、こういうもの。
まあ、いずれにしても今回はそこまで厳密な要件ではないので、気にしない。

ログ投入

$ bash ./run_embulk_with_dailyindex.sh

# 後始末:マッピング定義削除
$ curl -XDELETE -H "Content-Type: application/json" "es_host:80/_template/tmpl1
{"acknowledged":true}

まとめ

Embulk、便利で使い方も簡単でした。プラグインも豊富で、liquid形式で動的に指定したり、
と柔軟性もありますね。
ただ、軽く調べましたが、今回やりたいことに対応したプラグインが探せなかったので
スクリプト作って対応することにしました。誰かの参考になれば幸いです。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1