この記事は OPENLOGI Advent Calendar 2017- Qiita の21日目です。
明日公開日じゃんどうしようって状況で書き始めてるんで丁度仕事で今触ってる事を記事にしてみようと思います。というわけで今回はEmbulkについてです。
https://github.com/embulk/embulk
はじめに
みなさんはログの扱いをどうしてますかね。
弊社ではログの種類によって扱いは違ったりしてるんですが一部はリレーショナルDB(以下RDB)に直接保存しているものがあります。
でもRDBは結構辛いんです。行数増えても検索辛いしインデックス貼ってない列を検索しても辛いしでとにかく検索辛いんです。
世間的な流れとしてはFluentdで収集してElasticsearchに放り込むようなソリューションですかね。
というわけでそれに習いたいんですが、Fluentdのデーモン死活監視とかをやりたくないのでもうちょいライトなものは無いかということで目を付けたのがEmbulkでした。
Embulk(エンバルクと読むらしい)はFluendのバッチ処理版です。Fluentdがtail -f
とかでファイル監視するのに対しEmblukはcronなどで単コマンドで動かす類のものです。というわけで評価がてら動かしてみます。
動作環境はOSはubuntu16.04です。
入力はLTSVで書かれたテキストファイルとします。
インストール
Embulkはjrubyで書かれてるようでJAVAが要るのでまずそれを入れておきます。
$ sudo apt-get update
$ sudo apt-get install default-jre
Embulk自体は公式ページの説明のとおりでインストールします。
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
インストール確認
$ embulk --version
embulk 0.8.39
入出力をプラグインで様々な環境やフォーマットに対応させるのはFluentdと同じですね。
今回は入力はLTSV、出力はElasticSearchを使うのでそれに対応するものを入れます。
embulk gem install embulk-parser-ltsv
embulk gem install embulk-output-elasticsearch
普通のgemと同じくbundleの仕掛けもあるので入れるモジュールは定義しておいた方がデプロイ工程とか考えるといいですね。
設定
インストールができたら次は設定します。
デフォルトは設定ファイル名は./config.yml
ですがファイルパスは実行時にこんな感じで指定できます。
$ embulk run myconf.yml
入力
最初は頑張って設定ファイルをしこしこ書いてたんですが、ドキュメントをよく見ると入力ファイルから推測して設定ファイルを生成する機能もあるようです。
とりあえず最低限の雛形のseed.ymlを作ります。
in:
type: file
path_prefix: input
parser:
type: ltsv
exec: {}
out: {type: stdout}
input.logの中身はこんな感じとします。
time:2017-12-19 19:32:25 level:INFO message:API Access user:TestUser method:POST host:https://dummyhost/ path:/products/191304794148 headers:{"Access-Token":"1869a61d0aad6aed2008f0b31652c004"} query:NULL body:[] status_code:200 response:{"id":"191304794148","name":"item01","quantity":1}
入力ファイルから設定ファイルを推測でconfig.ymlを生成させます。
$ embulk guess -g ltsv seed.yml -o config.yml
2017-12-20 09:02:27.829 +0000: Embulk v0.8.39
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
2017-12-20 09:02:31.665 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'input'
2017-12-20 09:02:31.666 +0000 [INFO] (0001:guess): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-12-20 09:02:31.670 +0000 [INFO] (0001:guess): Loading files [input.log]
2017-12-20 09:02:31.682 +0000 [INFO] (0001:guess): Try to read 32,768 bytes from input source
2017-12-20 09:02:31.839 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2017-12-20 09:02:31.856 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2017-12-20 09:02:31.881 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2017-12-20 09:02:31.892 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
2017-12-20 09:02:31.975 +0000 [INFO] (0001:guess): Loaded plugin embulk-parser-ltsv (0.1.1)
in:
type: file
path_prefix: input
parser:
type: ltsv
charset: UTF-8
newline: LF
schema:
- {name: time, type: timestamp}
- {name: level, type: string}
- {name: message, type: string}
- {name: user, type: string}
- {name: method, type: string}
- {name: host, type: string}
- {name: path, type: string}
- {name: headers, type: string}
- {name: query, type: string}
- {name: body, type: string}
- {name: status_code, type: long}
- {name: response, type: string}
exec: {}
out: {type: stdout}
Created 'config.yml' file.
大体完璧なconfigが出来上がってる。凄い。
出力はとりあえずstdoutにしてるので試しに一度実行してみましょう。
$ embulk run config.yml
2017-12-20 09:03:20.740 +0000: Embulk v0.8.39
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
2017-12-20 09:03:28.646 +0000 [INFO] (0001:transaction): Loaded plugin embulk-parser-ltsv (0.1.1)
2017-12-20 09:03:28.670 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'input'
2017-12-20 09:03:28.671 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-12-20 09:03:28.680 +0000 [INFO] (0001:transaction): Loading files [input.log]
2017-12-20 09:03:28.791 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2017-12-20 09:03:28.812 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2017-12-19 19:32:25.000000 +0000,INFO,API Access,TestUser,POST,https://dummyhost/,/products/191304794148,{"Access-Token":"1869a61d0aad6aed2008f0b31652c004"},NULL,[],200,{"id":"191304794148","name":"item01","quantity":1}
2017-12-20 09:03:29.265 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2017-12-20 09:03:29.271 +0000 [INFO] (main): Committed.
2017-12-20 09:03:29.272 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"input.log"},"out":{}}
t2.microのEC2上でのテストでは起動が結構遅い。timeコマンドで計測してみる。
real 0m8.875s
user 0m8.636s
sys 0m0.208s
うーん・・・ちょっと心配。
最後の行に2017-12-20 09:03:29.272 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"input.log"},"out":{}}
と前回処理したファイル名が返ってきていますね。
これは実行時にオプションを付けると読み込み時の設定ファイルに追記してくれるようです。
$ embulk run config.yml -o config.yml
$ cat config.yml
in:
type: file
path_prefix: input
parser:
type: ltsv
charset: UTF-8
newline: LF
schema:
- {name: time, type: timestamp}
- {name: level, type: string}
- {name: message, type: string}
- {name: user, type: string}
- {name: method, type: string}
- {name: host, type: string}
- {name: path, type: string}
- {name: headers, type: string}
- {name: query, type: string}
- {name: body, type: string}
- {name: status_code, type: long}
- {name: response, type: string}
last_path: input.log
exec: {}
out: {type: stdout}
loglotateさせる場合はこの手のオプションが必要そうですね。
※ 注) -o
オプションは現在、非推奨で-c
が推奨だそうです。詳しくはこちらを参照ください。
https://qiita.com/hiroysato/items/3552366ddf7d29bf7829
出力
出力はElasticsearchを使う予定なので、とりあえずlocalに構築しておきます。
私はこの辺を参考にインストールしました。AWS EC2のt2.microで動かしたらメモリ不足で起動してくれず泣く泣くt2.mediumまで上げてやりました。
http://mamori017.hatenablog.com/entry/2017/04/19/103530
出力設定を追加してやります。mappingとかはひとまず捨て置きます。
まだloglotateとかしてないのでlast_pathを書いてると入力の対象ファイルが無いことになってしまうのでlast_pathは除外しときます。
in:
type: file
path_prefix: input
parser:
type: ltsv
charset: UTF-8
newline: LF
schema:
- {name: time, type: timestamp}
- {name: level, type: string}
- {name: message, type: string}
- {name: user, type: string}
- {name: method, type: string}
- {name: host, type: string}
- {name: path, type: string}
- {name: headers, type: string}
- {name: query, type: string}
- {name: body, type: string}
- {name: status_code, type: long}
- {name: response, type: string}
exec: {}
out:
type: elasticsearch
nodes:
- {host: localhost, port: 9200}
index: access_log
index_type: main
試しに実行してElasticsearchに突っ込んでやりましょう。
$ embulk run config.yml
2017-12-20 10:28:45.728 +0000: Embulk v0.8.39
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
2017-12-20 10:28:49.796 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-elasticsearch (0.4.5)
2017-12-20 10:28:49.822 +0000 [INFO] (0001:transaction): Loaded plugin embulk-parser-ltsv (0.1.1)
2017-12-20 10:28:49.830 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'input'
2017-12-20 10:28:49.831 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-12-20 10:28:49.835 +0000 [INFO] (0001:transaction): Loading files [input.log]
2017-12-20 10:28:49.871 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2017-12-20 10:28:49.888 +0000 [INFO] (0001:transaction): Logging initialized @4287ms
2017-12-20 10:28:50.109 +0000 [INFO] (0001:transaction): Connecting to Elasticsearch version:5.3.0
2017-12-20 10:28:50.109 +0000 [INFO] (0001:transaction): Executing plugin with 'insert' mode.
2017-12-20 10:28:50.109 +0000 [INFO] (0001:transaction): Inserting data into index[access_log]
2017-12-20 10:28:50.116 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2017-12-20 10:28:50.440 +0000 [INFO] (0023:task-0000): Inserted 1 records
2017-12-20 10:28:50.441 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2017-12-20 10:28:50.442 +0000 [INFO] (0001:transaction): Insert completed. 1 records
2017-12-20 10:28:50.445 +0000 [INFO] (main): Committed.
2017-12-20 10:28:50.445 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"input.log"},"out":{}}
1 record突っ込んでくれたらしいです。
結果を確認します。
$ curl "http://localhost:9200/access_log/_search?pretty"
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 1.0,
"hits" : [
{
"_index" : "access_log",
"_type" : "site_main",
"_id" : "AWBzfXSGnjxb2Z4XCpf1",
"_score" : 1.0,
"_source" : {
"time" : "2017-12-19T19:32:25.000+0000",
"level" : "INFO",
"message" : "API Access",
"user" : "TestUser",
"method" : "POST",
"host" : "https://dummyhost/",
"path" : "/products/191304794148",
"headers" : "{\"Access-Token\":\"1869a61d0aad6aed2008f0b31652c004\"}",
"query" : "NULL",
"body" : "[]",
"status_code" : 200,
"response" : "{\"id\":\"191304794148\",\"name\":\"item01\",\"quantity\":1}"
}
}
]
}
}
なんかできてます。
終わりに
単に動かしてみた系じゃなくてもう少し突っ込んだ内容にしたかったのですがEmbulkが想像以上にシンプルであっさりした記事になってしまいました。Fluentdとか触ったのは結構昔ですが妙なWEB UIからの設定が上手く行かずにハマった記憶があるのですが大分お手軽でいいですね。
しかしながらログパース処理がもしかしたらサーバーの本業の処理を圧迫するやも、とか考えると一旦S3などのストレージに上げて別のEC2やLambdaが処理をするような疎結合な設計をすべきかなと思ったりもしました。