LoginSignup
11
1

More than 5 years have passed since last update.

embulkでLTSV形式のログをElasticsearchに投げる

Last updated at Posted at 2017-12-20

この記事は OPENLOGI Advent Calendar 2017- Qiita の21日目です。
明日公開日じゃんどうしようって状況で書き始めてるんで丁度仕事で今触ってる事を記事にしてみようと思います。というわけで今回はEmbulkについてです。
https://github.com/embulk/embulk

はじめに

みなさんはログの扱いをどうしてますかね。
弊社ではログの種類によって扱いは違ったりしてるんですが一部はリレーショナルDB(以下RDB)に直接保存しているものがあります。
でもRDBは結構辛いんです。行数増えても検索辛いしインデックス貼ってない列を検索しても辛いしでとにかく検索辛いんです。
世間的な流れとしてはFluentdで収集してElasticsearchに放り込むようなソリューションですかね。
というわけでそれに習いたいんですが、Fluentdのデーモン死活監視とかをやりたくないのでもうちょいライトなものは無いかということで目を付けたのがEmbulkでした。

Embulk(エンバルクと読むらしい)はFluendのバッチ処理版です。Fluentdがtail -fとかでファイル監視するのに対しEmblukはcronなどで単コマンドで動かす類のものです。というわけで評価がてら動かしてみます。

動作環境はOSはubuntu16.04です。
入力はLTSVで書かれたテキストファイルとします。

インストール

Embulkはjrubyで書かれてるようでJAVAが要るのでまずそれを入れておきます。

$ sudo apt-get update
$ sudo apt-get install default-jre

Embulk自体は公式ページの説明のとおりでインストールします。

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

インストール確認

$ embulk --version
embulk 0.8.39

入出力をプラグインで様々な環境やフォーマットに対応させるのはFluentdと同じですね。
今回は入力はLTSV、出力はElasticSearchを使うのでそれに対応するものを入れます。

embulk gem install embulk-parser-ltsv
embulk gem install embulk-output-elasticsearch

普通のgemと同じくbundleの仕掛けもあるので入れるモジュールは定義しておいた方がデプロイ工程とか考えるといいですね。

設定

インストールができたら次は設定します。
デフォルトは設定ファイル名は./config.ymlですがファイルパスは実行時にこんな感じで指定できます。

$ embulk run myconf.yml

入力

最初は頑張って設定ファイルをしこしこ書いてたんですが、ドキュメントをよく見ると入力ファイルから推測して設定ファイルを生成する機能もあるようです。

とりあえず最低限の雛形のseed.ymlを作ります。

in:
  type: file
  path_prefix: input
  parser:
    type: ltsv
exec: {}
out: {type: stdout}

input.logの中身はこんな感じとします。

time:2017-12-19 19:32:25    level:INFO  message:API Access  user:TestUser method:POST host:https://dummyhost/ path:/products/191304794148  headers:{"Access-Token":"1869a61d0aad6aed2008f0b31652c004"}   query:NULL  body:[] status_code:200 response:{"id":"191304794148","name":"item01","quantity":1}

入力ファイルから設定ファイルを推測でconfig.ymlを生成させます。

$ embulk guess -g ltsv seed.yml -o config.yml
2017-12-20 09:02:27.829 +0000: Embulk v0.8.39

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

2017-12-20 09:02:31.665 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'input'
2017-12-20 09:02:31.666 +0000 [INFO] (0001:guess): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-12-20 09:02:31.670 +0000 [INFO] (0001:guess): Loading files [input.log]
2017-12-20 09:02:31.682 +0000 [INFO] (0001:guess): Try to read 32,768 bytes from input source
2017-12-20 09:02:31.839 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2017-12-20 09:02:31.856 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2017-12-20 09:02:31.881 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2017-12-20 09:02:31.892 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
2017-12-20 09:02:31.975 +0000 [INFO] (0001:guess): Loaded plugin embulk-parser-ltsv (0.1.1)
in:
  type: file
  path_prefix: input
  parser:
    type: ltsv
    charset: UTF-8
    newline: LF
    schema:
    - {name: time, type: timestamp}
    - {name: level, type: string}
    - {name: message, type: string}
    - {name: user, type: string}
    - {name: method, type: string}
    - {name: host, type: string}
    - {name: path, type: string}
    - {name: headers, type: string}
    - {name: query, type: string}
    - {name: body, type: string}
    - {name: status_code, type: long}
    - {name: response, type: string}
exec: {}
out: {type: stdout}

Created 'config.yml' file.

大体完璧なconfigが出来上がってる。凄い。
出力はとりあえずstdoutにしてるので試しに一度実行してみましょう。

$ embulk run config.yml 
2017-12-20 09:03:20.740 +0000: Embulk v0.8.39

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

2017-12-20 09:03:28.646 +0000 [INFO] (0001:transaction): Loaded plugin embulk-parser-ltsv (0.1.1)
2017-12-20 09:03:28.670 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'input'
2017-12-20 09:03:28.671 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-12-20 09:03:28.680 +0000 [INFO] (0001:transaction): Loading files [input.log]
2017-12-20 09:03:28.791 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2017-12-20 09:03:28.812 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2017-12-19 19:32:25.000000 +0000,INFO,API Access,TestUser,POST,https://dummyhost/,/products/191304794148,{"Access-Token":"1869a61d0aad6aed2008f0b31652c004"},NULL,[],200,{"id":"191304794148","name":"item01","quantity":1}
2017-12-20 09:03:29.265 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2017-12-20 09:03:29.271 +0000 [INFO] (main): Committed.
2017-12-20 09:03:29.272 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"input.log"},"out":{}}

t2.microのEC2上でのテストでは起動が結構遅い。timeコマンドで計測してみる。

real    0m8.875s
user    0m8.636s
sys 0m0.208s

うーん・・・ちょっと心配。

最後の行に2017-12-20 09:03:29.272 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"input.log"},"out":{}} と前回処理したファイル名が返ってきていますね。
これは実行時にオプションを付けると読み込み時の設定ファイルに追記してくれるようです。

$ embulk run config.yml -o config.yml
$ cat config.yml
in:
  type: file
  path_prefix: input
  parser:
    type: ltsv
    charset: UTF-8
    newline: LF
    schema:
    - {name: time, type: timestamp}
    - {name: level, type: string}
    - {name: message, type: string}
    - {name: user, type: string}
    - {name: method, type: string}
    - {name: host, type: string}
    - {name: path, type: string}
    - {name: headers, type: string}
    - {name: query, type: string}
    - {name: body, type: string}
    - {name: status_code, type: long}
    - {name: response, type: string}
  last_path: input.log
exec: {}
out: {type: stdout}

loglotateさせる場合はこの手のオプションが必要そうですね。

※ 注) -oオプションは現在、非推奨で-cが推奨だそうです。詳しくはこちらを参照ください。
https://qiita.com/hiroysato/items/3552366ddf7d29bf7829

出力

出力はElasticsearchを使う予定なので、とりあえずlocalに構築しておきます。
私はこの辺を参考にインストールしました。AWS EC2のt2.microで動かしたらメモリ不足で起動してくれず泣く泣くt2.mediumまで上げてやりました。
http://mamori017.hatenablog.com/entry/2017/04/19/103530

出力設定を追加してやります。mappingとかはひとまず捨て置きます。
まだloglotateとかしてないのでlast_pathを書いてると入力の対象ファイルが無いことになってしまうのでlast_pathは除外しときます。

in:
  type: file
  path_prefix: input
  parser:
    type: ltsv
    charset: UTF-8
    newline: LF
    schema:
    - {name: time, type: timestamp}
    - {name: level, type: string}
    - {name: message, type: string}
    - {name: user, type: string}
    - {name: method, type: string}
    - {name: host, type: string}
    - {name: path, type: string}
    - {name: headers, type: string}
    - {name: query, type: string}
    - {name: body, type: string}
    - {name: status_code, type: long}
    - {name: response, type: string}
exec: {}
out:
  type: elasticsearch
  nodes:
    - {host: localhost, port: 9200}
  index: access_log
  index_type: main

試しに実行してElasticsearchに突っ込んでやりましょう。

$ embulk run config.yml 
2017-12-20 10:28:45.728 +0000: Embulk v0.8.39

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

2017-12-20 10:28:49.796 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-elasticsearch (0.4.5)
2017-12-20 10:28:49.822 +0000 [INFO] (0001:transaction): Loaded plugin embulk-parser-ltsv (0.1.1)
2017-12-20 10:28:49.830 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'input'
2017-12-20 10:28:49.831 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-12-20 10:28:49.835 +0000 [INFO] (0001:transaction): Loading files [input.log]
2017-12-20 10:28:49.871 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2017-12-20 10:28:49.888 +0000 [INFO] (0001:transaction): Logging initialized @4287ms
2017-12-20 10:28:50.109 +0000 [INFO] (0001:transaction): Connecting to Elasticsearch version:5.3.0
2017-12-20 10:28:50.109 +0000 [INFO] (0001:transaction): Executing plugin with 'insert' mode.
2017-12-20 10:28:50.109 +0000 [INFO] (0001:transaction): Inserting data into index[access_log]
2017-12-20 10:28:50.116 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2017-12-20 10:28:50.440 +0000 [INFO] (0023:task-0000): Inserted 1 records
2017-12-20 10:28:50.441 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2017-12-20 10:28:50.442 +0000 [INFO] (0001:transaction): Insert completed. 1 records
2017-12-20 10:28:50.445 +0000 [INFO] (main): Committed.
2017-12-20 10:28:50.445 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"input.log"},"out":{}}

1 record突っ込んでくれたらしいです。

結果を確認します。

$ curl "http://localhost:9200/access_log/_search?pretty"
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "access_log",
        "_type" : "site_main",
        "_id" : "AWBzfXSGnjxb2Z4XCpf1",
        "_score" : 1.0,
        "_source" : {
          "time" : "2017-12-19T19:32:25.000+0000",
          "level" : "INFO",
          "message" : "API Access",
          "user" : "TestUser",
          "method" : "POST",
          "host" : "https://dummyhost/",
          "path" : "/products/191304794148",
          "headers" : "{\"Access-Token\":\"1869a61d0aad6aed2008f0b31652c004\"}",
          "query" : "NULL",
          "body" : "[]",
          "status_code" : 200,
          "response" : "{\"id\":\"191304794148\",\"name\":\"item01\",\"quantity\":1}"
        }
      }
    ]
  }
}

なんかできてます。

終わりに

単に動かしてみた系じゃなくてもう少し突っ込んだ内容にしたかったのですがEmbulkが想像以上にシンプルであっさりした記事になってしまいました。Fluentdとか触ったのは結構昔ですが妙なWEB UIからの設定が上手く行かずにハマった記憶があるのですが大分お手軽でいいですね。

しかしながらログパース処理がもしかしたらサーバーの本業の処理を圧迫するやも、とか考えると一旦S3などのストレージに上げて別のEC2やLambdaが処理をするような疎結合な設計をすべきかなと思ったりもしました。

11
1
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
1