LoginSignup
6
6

More than 5 years have passed since last update.

monologから出力したログをfluentdで集約する

Last updated at Posted at 2016-07-06

はじめに

アプリケーションを長く運用していると、ユーザーの操作履歴など、データベースに保存するデータ以外に、操作ログなどを取らなければいけないケースが出てきます。
今更ではありますが、fluentdを使ってmonologから出力されたログを集約してみます。

その1 Fluent-Logger-PHPで直接fluentdにログを送る

新たにログを取る場合は、いったんテキストファイルに出力したものを集計するのではなく、直接、fluentdにログを送りつけてもいいかもしれません。

構成

  • アプリ側は、monolog-fluent-handlerを使ってログを出力
  • アプリから出力されたログを、in_forwardで受信
  • record_transformerフィルタで、レコードを整形する
  • sqlite3にフィルタしたデータをINSERTする

monolog-fluent-handlerを使ってログを出力

monologmonolog-fluent-handlerをrequireします。

$ composer require "monolog/monolog:1.*"
$ composer require "dakatsuka/monolog-fluent-handler"
composer.json
{
    "require": {
        "monolog/monolog": "1.*",
        "dakatsuka/monolog-fluent-handler": "^1.2"
    }
}
example/fluent_handler.php
<?php
require_once __DIR__.'/../vendor/autoload.php';

use Dakatsuka\MonologFluentHandler\FluentHandler;
use Monolog\Logger;

// Global settings
setlocale(LC_ALL, 'ja_JP.UTF-8');
date_default_timezone_set('Asia/Tokyo');

$logger = new Logger('example');
$logger->pushHandler(new FluentHandler());

$logger->debug('fluent', ['message' => 'foo bar']);

ポイント

  • FluentHandlerの引数を省略すると、FluentLogger(fluent-logger-php)が使用される
  • Loggerのチャンネル名と、ログメッセージが組み合わされて、Tag名になる
    public function write(array $record)
    {
        $tag  = $record['channel'] . '.' . $record['message'];
        $data = $record['context'];
        $data['level'] = Logger::getLevelName($record['level']);

        $this->logger->post($tag, $data);
    }

参考URL

アプリから出力されたログを、in_forwardで受信

/etc/td-agent/td-agent.conf
<source>
  @type forward
</source>

参考URL

record_transformerフィルタで、レコードを整形する

/etc/td-agent/td-agent.conf
<filter example.fluent>
  @type record_transformer
  renew_record true
  enable_ruby true
  <record>
    level ${level}
    message ${message}
    handler fluent
    created_at ${time.to_datetime.strftime('%Y-%m-%d %H:%M:%S')}
  </record>
</filter>

ポイント

  • monologで言う$contextがfluent-logger-phpでは、メインのデータとして扱われる
  • renew_record trueとすると、<record></record>で定義されたアイテムのみとなる、falseまたは未定義だと、<record></record>で定義されたアイテムは、すでにあるデータに追加される
  • enable_ruby trueとすると、rubyの組み込み関数を使うことができる

参考URL

sqlite3にフィルタしたデータをINSERTする

/etc/td-agent/td-agent.conf
<match example.fluent>
  @type sqlite3
  path /var/db/td-agent/example.sqlite3
  table log
  columns level,message,handler,created_at
</match>

fluent-plugin-sqlite3のインストール

集めたログをSQLiteに保存します。

$ yum install -y sqlite-devel
$ td-agent-gem install fluent-plugin-sqlite3

事前にデータベースを作成

$ sqlite3 /path/to/database.sqlite3 < /path/to/create_tables.sql
create_tables.sql
create table if not exists log
(
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  level TEXT,
  message TEXT,
  handler TEXT,
  created_at TIMESTAMP DEFAULT (datetime('now','localtime'))
);

ポイント

  • columnsに入力必要な列のデータを列挙する、データベースの列名と合わせる必要がある

参考URL

その2 StreamHandlerで出力したテキストログを集約する

すでにテキストとして出力されているログを整形して、集約してみます。

構成

  • アプリ側は、StreamHandlerを使ってログをテキストファイルに出力
  • アプリから出力されたログを、in_tailで収集 正規表現でざっくり要素分解
  • カスタムパーサー(parser_monolog.rb)を追加、収集したログをパース
  • parserプラグインで、json部分をパース
  • record_transformerフィルタで、レコードを整形する
  • sqlite3にフィルタしたデータをINSERTする

StreamHandlerを使ってログをテキストファイルに出力

example/stream_handler.php
<?php
require_once __DIR__.'/../vendor/autoload.php';

use Monolog\Handler\StreamHandler;
use Monolog\Logger;

// Global settings
setlocale(LC_ALL, 'ja_JP.UTF-8');
date_default_timezone_set('Asia/Tokyo');

$logger = new Logger('example');
$logger->pushHandler(new StreamHandler(__DIR__.'/../var/log/example_stream.log', Logger::DEBUG, true, 0777));

$logger->debug('stream', ['message' => 'foo bar']);

出力されたログファイルは下記のようになります。

example_stream.log
[2016-07-06 19:02:28] example.DEBUG: stream {"message":"foo bar"} []

アプリから出力されたログを、in_tailで収集 正規表現でざっくり要素分解

/etc/td-agent/td-agent.conf
<source>
  @type tail
  path /data/var/log/example_stream.log
  #format /^\[(?<timestamp>[\d\-]+ [\d\:]+)\] (?<log_name>.+)\.(?<log_level>(DEBUG|INFO|NOTICE|WARNING|ERROR|CRITICAL|ALERT|EMERGENCY))\: (?<message>.*) (?<context>\{.+\}) \[(?<extra>.*)\]$/
  format monolog # parser_monolog.rb
  pos_file /var/log/td-agent/example_stream.log.pos
  tag example.stream
</source>

ポイント

  • contextはjsonとして出力されているが、結果、jsonとしてparseしてあげないとjsonとして扱えない
  • formatを正規表現から、monolog(カスタムパーサー)に変更

参考URL

カスタムパーサー(parser_monolog.rb)を追加、収集したログをパース

すべてテキストであれば、正規表現でパースできれば事足りたのだが、context部分はJSON文字列になるため、fluent内でJSONとして扱いたい。
そのために、fluent-plugin-parserを使っていたが、毎回monolog用の正規表現をコピーするのは非効率なので、カスタムパーサーを作ることにしました。

fluent/plugin/parser_monolog.rb
require 'fluent/parser'
require 'json'

module Fluent
  class TextParser
    class MonologParser < Parser
      Plugin.register_parser('monolog', self)

      REGEXP = /^\[(?<time>[\d\-]+ [\d\:]+)\] (?<channel>.+)\.(?<level>(DEBUG|INFO|NOTICE|WARNING|ERROR|CRITICAL|ALERT|EMERGENCY))\: (?<message>.*) (?<context>\{.+\}) \[(?<extra>.*)\]$/
      TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

      def initialize
        super
        @time_parser = TimeParser.new(TIME_FORMAT)
        @mutex = Mutex.new
      end

      def patterns
        {'format' => REGEXP, 'time_format' => TIME_FORMAT}
      end

      def parse(text)
        m = REGEXP.match(text)
        unless m
          yield nil, nil
          return
        end

        time = m['time']
        time = @mutex.synchronize { @time_parser.parse(time) }

        channel = m['channel']
        level = m['level']
        message = m['message']
        context = JSON.parse(m['context'])
        extra = m['extra']

        record = {
            "channel" => channel,
            "level" => level,
            "message" => message,
            "context" => context,
            "extra" => extra
        }
        record["time"] = m['time'] if @keep_time_key

        yield time, record
      end
    end
  end
end

ポイント

参考URL

parserプラグインで、json部分をパース

record_transformerフィルタで、レコードを整形する

/etc/td-agent/td-agent.conf
<filter example.stream>
  @type record_transformer
  renew_record true
  enable_ruby true
  <record>
    level ${level}
    message ${context["message"]}
    handler stream
    created_at ${time.strftime('%Y-%m-%d %H:%M:%S')}
  </record>
</filter>

ポイント

  • jsonにパースしているので、${context["message"]}のように、jsonの各フィールドを参照することができる

sqlite3にフィルタしたデータをINSERTする

前述した方法と同じなので、説明を割愛します。

結果

FluentHandlerとStreamHandlerそれぞれから出力したログが、ともにsqlite3に保存されています。

$ sqlite3 /var/db/td-agent/example.sqlite3 "select * from log;"
1|DEBUG|foo bar|fluent|2016-07-06 18:59:09
2|DEBUG|foo bar|stream|2016-07-06 19:02:28

imunew/monolog-fluentd-exampleで動作確認できます

imunew/monolog-fluentd-exampleをcloneし、READMEを見ながらセットアップすれば、sandbox的に動作確認することができます。

おわりに

  • fluentd自体は、大まかには、input、output、filterで構成されていて、色々な組み合わせが可能になっている
  • シンプルかつ疎結合なエコシステムなので、自分の用途にあったpluginを作成・公開してもいいかもしれない
  • monologはsymfony以外でも使われていると思うので、parser_monologは需要ありそう
6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6