はじめに
アプリケーションを長く運用していると、ユーザーの操作履歴など、データベースに保存するデータ以外に、操作ログなどを取らなければいけないケースが出てきます。
今更ではありますが、fluentdを使ってmonologから出力されたログを集約してみます。
その1 Fluent-Logger-PHPで直接fluentdにログを送る
新たにログを取る場合は、いったんテキストファイルに出力したものを集計するのではなく、直接、fluentdにログを送りつけてもいいかもしれません。
構成
- アプリ側は、monolog-fluent-handlerを使ってログを出力
- アプリから出力されたログを、in_forwardで受信
- record_transformerフィルタで、レコードを整形する
- sqlite3にフィルタしたデータをINSERTする
monolog-fluent-handlerを使ってログを出力
monolog
とmonolog-fluent-handler
をrequireします。
$ composer require "monolog/monolog:1.*"
$ composer require "dakatsuka/monolog-fluent-handler"
{
"require": {
"monolog/monolog": "1.*",
"dakatsuka/monolog-fluent-handler": "^1.2"
}
}
<?php
require_once __DIR__.'/../vendor/autoload.php';
use Dakatsuka\MonologFluentHandler\FluentHandler;
use Monolog\Logger;
// Global settings
setlocale(LC_ALL, 'ja_JP.UTF-8');
date_default_timezone_set('Asia/Tokyo');
$logger = new Logger('example');
$logger->pushHandler(new FluentHandler());
$logger->debug('fluent', ['message' => 'foo bar']);
ポイント
-
FluentHandlerの引数を省略すると、FluentLogger(fluent-logger-php)が使用される
-
Loggerのチャンネル名と、ログメッセージが組み合わされて、Tag名になる
public function write(array $record) { $tag = $record['channel'] . '.' . $record['message']; $data = $record['context']; $data['level'] = Logger::getLevelName($record['level']); $this->logger->post($tag, $data); }
参考URL
- Monologのログ出力先をFluentdに変更してみた | dakatsuka's blog
- fluent/fluent-logger-php: A structured logger for Fluentd (PHP)
アプリから出力されたログを、in_forwardで受信
<source>
@type forward
</source>
参考URL
record_transformerフィルタで、レコードを整形する
<filter example.fluent>
@type record_transformer
renew_record true
enable_ruby true
<record>
level ${level}
message ${message}
handler fluent
created_at ${time.to_datetime.strftime('%Y-%m-%d %H:%M:%S')}
</record>
</filter>
ポイント
- monologで言う$contextがfluent-logger-phpでは、メインのデータとして扱われる
-
renew_record true
とすると、<record></record>
で定義されたアイテムのみとなる、falseまたは未定義だと、<record></record>
で定義されたアイテムは、すでにあるデータに追加される -
enable_ruby true
とすると、rubyの組み込み関数を使うことができる
参考URL
sqlite3にフィルタしたデータをINSERTする
<match example.fluent>
@type sqlite3
path /var/db/td-agent/example.sqlite3
table log
columns level,message,handler,created_at
</match>
fluent-plugin-sqlite3のインストール
集めたログをSQLiteに保存します。
$ yum install -y sqlite-devel
$ td-agent-gem install fluent-plugin-sqlite3
事前にデータベースを作成
$ sqlite3 /path/to/database.sqlite3 < /path/to/create_tables.sql
create table if not exists log
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
level TEXT,
message TEXT,
handler TEXT,
created_at TIMESTAMP DEFAULT (datetime('now','localtime'))
);
ポイント
- columnsに入力必要な列のデータを列挙する、データベースの列名と合わせる必要がある
参考URL
その2 StreamHandlerで出力したテキストログを集約する
すでにテキストとして出力されているログを整形して、集約してみます。
構成
- アプリ側は、StreamHandlerを使ってログをテキストファイルに出力
- アプリから出力されたログを、in_tailで収集
正規表現でざっくり要素分解 - カスタムパーサー(parser_monolog.rb)を追加、収集したログをパース
parserプラグインで、json部分をパース- record_transformerフィルタで、レコードを整形する
- sqlite3にフィルタしたデータをINSERTする
StreamHandlerを使ってログをテキストファイルに出力
<?php
require_once __DIR__.'/../vendor/autoload.php';
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
// Global settings
setlocale(LC_ALL, 'ja_JP.UTF-8');
date_default_timezone_set('Asia/Tokyo');
$logger = new Logger('example');
$logger->pushHandler(new StreamHandler(__DIR__.'/../var/log/example_stream.log', Logger::DEBUG, true, 0777));
$logger->debug('stream', ['message' => 'foo bar']);
出力されたログファイルは下記のようになります。
[2016-07-06 19:02:28] example.DEBUG: stream {"message":"foo bar"} []
アプリから出力されたログを、in_tailで収集 正規表現でざっくり要素分解
<source>
@type tail
path /data/var/log/example_stream.log
#format /^\[(?<timestamp>[\d\-]+ [\d\:]+)\] (?<log_name>.+)\.(?<log_level>(DEBUG|INFO|NOTICE|WARNING|ERROR|CRITICAL|ALERT|EMERGENCY))\: (?<message>.*) (?<context>\{.+\}) \[(?<extra>.*)\]$/
format monolog # parser_monolog.rb
pos_file /var/log/td-agent/example_stream.log.pos
tag example.stream
</source>
ポイント
contextはjsonとして出力されているが、結果、jsonとしてparseしてあげないとjsonとして扱えない- formatを正規表現から、monolog(カスタムパーサー)に変更
参考URL
カスタムパーサー(parser_monolog.rb)を追加、収集したログをパース
すべてテキストであれば、正規表現でパースできれば事足りたのだが、context部分はJSON文字列になるため、fluent内でJSONとして扱いたい。
そのために、fluent-plugin-parserを使っていたが、毎回monolog用の正規表現をコピーするのは非効率なので、カスタムパーサーを作ることにしました。
require 'fluent/parser'
require 'json'
module Fluent
class TextParser
class MonologParser < Parser
Plugin.register_parser('monolog', self)
REGEXP = /^\[(?<time>[\d\-]+ [\d\:]+)\] (?<channel>.+)\.(?<level>(DEBUG|INFO|NOTICE|WARNING|ERROR|CRITICAL|ALERT|EMERGENCY))\: (?<message>.*) (?<context>\{.+\}) \[(?<extra>.*)\]$/
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def initialize
super
@time_parser = TimeParser.new(TIME_FORMAT)
@mutex = Mutex.new
end
def patterns
{'format' => REGEXP, 'time_format' => TIME_FORMAT}
end
def parse(text)
m = REGEXP.match(text)
unless m
yield nil, nil
return
end
time = m['time']
time = @mutex.synchronize { @time_parser.parse(time) }
channel = m['channel']
level = m['level']
message = m['message']
context = JSON.parse(m['context'])
extra = m['extra']
record = {
"channel" => channel,
"level" => level,
"message" => message,
"context" => context,
"extra" => extra
}
record["time"] = m['time'] if @keep_time_key
yield time, record
end
end
end
end
ポイント
- lib/fluent/plugin/parser_apache2.rbのほぼパクリ
- contextはJSON.parseしてrecordにセット
-
/etc/td-agent/plugin/
に配置して、td-agentを起動するだけ
参考URL
parserプラグインで、json部分をパース
record_transformerフィルタで、レコードを整形する
<filter example.stream>
@type record_transformer
renew_record true
enable_ruby true
<record>
level ${level}
message ${context["message"]}
handler stream
created_at ${time.strftime('%Y-%m-%d %H:%M:%S')}
</record>
</filter>
ポイント
- jsonにパースしているので、
${context["message"]}
のように、jsonの各フィールドを参照することができる
sqlite3にフィルタしたデータをINSERTする
前述した方法と同じなので、説明を割愛します。
結果
FluentHandlerとStreamHandlerそれぞれから出力したログが、ともにsqlite3に保存されています。
$ sqlite3 /var/db/td-agent/example.sqlite3 "select * from log;"
1|DEBUG|foo bar|fluent|2016-07-06 18:59:09
2|DEBUG|foo bar|stream|2016-07-06 19:02:28
imunew/monolog-fluentd-exampleで動作確認できます
imunew/monolog-fluentd-exampleをcloneし、READMEを見ながらセットアップすれば、sandbox的に動作確認することができます。
おわりに
- fluentd自体は、大まかには、input、output、filterで構成されていて、色々な組み合わせが可能になっている
- シンプルかつ疎結合なエコシステムなので、自分の用途にあったpluginを作成・公開してもいいかもしれない
- monologはsymfony以外でも使われていると思うので、parser_monologは需要ありそう
- fluent-plugin-monologをrubygems.orgに公開しました - Qiita (2016-07-10追記)