42
38

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

fluentd pluginで集計処理をする(Norikra以外で)

Last updated at Posted at 2014-12-09

はじめに

世間的には「fluentdで集計 ≒ Norikra!!!!!」という流れで、それに対して一石を投じる気のかけらも私には無いわけですが、Norikraを用いるまでもない軽微な処理を実行する場合fluentdのプラグイン単体で処理を完結したいケースもあり、そしてNorikraが若干重厚に映るケースもあります(JRuby!! Esper!!!)

ということで、集計が行えるようなfluentd pluginについてまとめてみます。チョイスは僕の独断と偏見です。

ユースケース

fluentdの基本的なユースケースは、inputとして入力をしたデータをoutput先にrelayする、というものです。そして集計処理は、多くの場合output先のシステム内、もしくはシステムに蓄積されたデータを用いて別のシステムを用いて行う事が多いと思います。

(ex. HDFSに保存したログデータをHiveを用いて集計)
(ex. ElasticSearchに保存したログデータをKibanaを用いて集計&可視化)

そんな中、fluentdの中であえてデータを加工し、集計をする目的としては以下が挙げられるかなと思います。

システムに集計結果を素早くフィードバックしたい

datacounter や flowcounter で集計した結果を GrowthForecast に登録する、というのはまさにこのケースになります。ある単位時間内の集計結果を即時に別サービスにフィードバックしたい場合、fluentd 中にデータが流れたタイミングで集計し定期的に別システムにoutputをするという戦略は、有効です。

また、特定のデータ集計結果を即時サービスサイドにフィードバックさせたい場合も、このケースに当たると思います。例としては、何かしらのランキング結果、TopNの順位や集計値等をサービスに反映させる時、などです。

システムは同期的ではなく非同期処理にしたい

たとえばMySQLやDynamoDBのようなstorageをcounterとして使用し、同期的に値の更新をすることも可能と思いますが、count処理が同期的になるとサービス側がStorageのパフォーマンスに引きづられるため非同期にしたいケースもあります。また、データの逐次更新を行うとstorageの負荷も気になります。

もちろん自前で実装することも可能でしょうが、fluentdを用いれば、ただfluentdのノードにデータを流すだけである程度の非同期性を担保することができます。

Cons.

もちろん、fluentdはストリーム中に流れるデータを集計するために最適化されているとはいえず、必ずしも適してない部分もあります。

集計処理がスケールしない。

仕組みとして、fluentd ノードの各processがprocessをまたいで集計処理を行う事が難しいため、基本的には1台のserver / 1つのCpuコア / 1のFluentd ノードprocess を用いて集計を行う事になります。そのためCPUの側面でもメモリの側面でも処理がスケールしません。
たとえば、URLごとの出現回数を数えようとした場合、そのURLの異なり数が膨大になった場合、集計の可能・不可能はそのサーバーのメモリサイズに依存します。

(もちろん、工夫しだいで回避可能なことも多いと思いますが)

正確な集計処理ができない

以下に紹介するpluginの多くは、内部的にwatcherのようなスレッド or ループが定期的にデータを監視し、指定されたtime windowごとにデータをflushする、といった実装になっています。

このような実装の場合、「前回flush後からxx秒後にflush」という動作にはなっていますが、「xx時xx分xx秒から1分間のデータを集計」というような形にはなっていません。なので、我々が普段欲しいような集計処理をfluentdの中だけで正確に行う事は、多くのpluginでは実装的に難しいです。

長時間のデータ保持が求められる集計処理に向かない

端的にいうと、Unique Count系の処理には向きません。

集計結果をflushする前にprocessがkillされるとデータを失う

pluginの作り次第な部分もあると思いますが、基本は失われます。

つまり、Norikraを使え、、と。。。

なので、fluentdを用いた集計基盤として、fluentd外のシステムにデータを送り、そちらに集計処理を委譲するというモデルの方ががデータ集計処理としてはふさわしいケースが多いです。
つまりNorikraのようなモデルは正しいですよね、という話になります。

「じゃあこの記事書くことないだろ」といった声も聞こえてくる気がしますが、、、気にせずに以下記載します。

plugin各種

tagomoris family (datacounter / flowcounter / numeric-counter )

https://github.com/tagomoris/fluent-plugin-datacounter
https://github.com/tagomoris/fluent-plugin-flowcounter
https://github.com/tagomoris/fluent-plugin-numeric-counter

tagomorisさんはご自身のブログで非常にわかりやすい説明を書かれていますし、多くの採用事例もあり、語り尽くされている感がありますので、この場では割愛。

以下のようなブログ記事もありますので、皆さんNorikraを使いましょう。
http://blog.livedoor.jp/sonots/archives/37921050.html

datacalculator

https://github.com/muddydixon/fluent-plugin-datacalculator

すべてのレコードに対して、もしくはデータのkeyごとに単位時間辺りの集計結果を算出することができるpluginです。

このpluginの特徴的だなと思うところは、formulas や finalizerという項目を使って独自の計算式を定義することが可能で、自由度・汎用度が非常に高いところです。count upやaverage等の単純でかつ多くの集計pluginで対応されている計算処理以外に独自の処理を施したい場合、このpluginは非常に重宝すると思います。

<match payment.quest>
  type datacalculator
  tag result.quest
  count_interval 5s
  aggregate keys area_id, mission_id
  formulas sum = amount * price, cnt = 1, total = amount
  finalizer ave = cnt > 0 ? 1.00 * sum / cnt : 0
  <unmatched>
    type file
    path unmatched
  </unmatched>
</match>

詳しくは作者のブログにサンプル等が書かれているので、こちらをご参照ください。
http://d.hatena.ne.jp/muddydixon/20120712/1342107800

grepcounter

https://github.com/sonots/fluent-plugin-grepcounter

集計対象のレコードを、正規表現(regexp)で絞り込む事ができるplulginです。作者の方のブログに書かれているように、特定の文字列を含むレコードのみ集計対象にしたい、といった場合に非常に重宝すすると思います。

<match syslog.**>
  type grepcounter
  count_interval 60
  input_key message
  regexp WARN
  exclude favicon.ico
  threshold 1
  add_tag_prefix warn.count
</match>

詳しくい使い方は、作者のブログにまとめられていますので、こちらをご参照ください。
http://blog.livedoor.jp/sonots/archives/24432889.html

inline-classifier

https://github.com/yosisa/fluent-plugin-inline-classifier

こちらのpluginは、あらかじめ取りうる値の範囲を複数定義しておき、集計対象のデータをそれらの入れ物に分類していくのが特徴になっています。

行える事は tagomoris さんの numeric-counterに近しいですが、numeric-counter は定められたtime windowごとに数値を合算するようになっていますが、こちらのinline-classifierでは1レコードごとに分類処理を行うだけのシンプルな作りになっており、後続のchainでまた別の集計処理を重ねる事ができたりするのが特徴で、重宝する部分でもあります。

<match raw.**>
  type inline_classifier
  add_prefix classified
  remove_prefix raw
  <rule>
    key reqtime
    type range
    store speed
    class1 fast * 0.1
    class2 normal 0.1 0.5
    class3 slow 0.5 *
  </rule>
</match>

詳しくは作者の方がQiitaにまとめられているのでこちらをご参照ください。
http://qiita.com/yosisa/items/01be2d2e74858aae440e

lossycount

https://github.com/moaikids/fluent-plugin-lossycount

拙作のpluginをここで晒すのは恥ずかしいのですが、こちらのpluginの特徴としては、単純にkeyの異なり数ごとに集計処理をするのではなく、lossy countというアルゴリズムを使って集計を行うことにあります。

lossy countによる集計の特性としては、gamma値とepsilon値を適切に設定することでデータ総数Nに応じ下記のようなデータを必ず出力できる事を保証している事です。

  • 出現頻度が “gamma × N” 以上のデータはすべて出力される。
  • 出現頻度が “(gamma – epsilon) × N” 未満であるデータは出力されない。
  • 出力される頻度は近似値で、誤差は必ず “(正確な頻度) – (epsilon × N)” の間に収まる。

端的に言うと、数学的に保証された確率でTopNについては正確に集計し、それ以外については概算値を算出することができるアルゴリズムです。上位N件のみ正確な数字と順位が知りたい場合などに使うと有効です。

<match pattern>
    type lossycount
    gamma LOSSY_COUNT_GAMMA
    epsilon LOSSY_COUNT_EPSILON
    key_name INPUT_KEY_NAME
    time_windows SECONDS
    output_tag OUTPUT_TAG_NAME
    output_key_name OUTPUT_KEY_NAME
    output_timestamp_name OUTPUT_TIMESTAMP_NAME
    output_value_name OUTPUT_VALUE_NAME
    enable_metrics true
    metrics_tag METRICS_OUTPUT_TAG_NAME
    verbose false
</match>

詳しくは以下のブログの中でも触れられているので、こちらをご参照ください。
http://developer.smartnews.com/blog/2013/09/24/implementation-of-ranking-algorithm-using-fluentd-and-redis/

実際に使ってみたケース

上記のpluginのうち、私はlossycountをよく使います。実際にどのようなケースで使っているかを簡単にご説明します。

redisと組み合わせてランキング処理

あるデータの出現頻度についてlossycountで集計し、結果をredsのsorted setに保存することによって、簡易的なランキング処理を行っています。

イメージとしては、以下のような形です。
ランキング処理は1台のノードで行いたいため、workerノードでETL的な情報の加工・抽出をしたうえでrankingノードに処理を委譲しています。
case1.png

なお、redisへの出力については、こちらも拙作のpluginですが redisstore pluginを使用しています。

https://github.com/moaikids/fluent-plugin-redisstore

MySQLと組み合わせて集計処理

あるデータの出現頻度についてlossycountで集計し、結果をMySQLのレコードとして保存しています。
すべての生ログをRDBに保存するには量が多く、しかしdailyやhourlyのデータ投入では反映が遅くなるため、1分に1回fluentdで集計済みの値を保存することによってサービスになるべく早く反映し使用可能な状態にするのが目的です。

イメージとしては以下のような形です。
case2.png

こちらについては、

key count fluentd_host timestamp

のような形でデータを保存するようにしており、keyごと / timestamp ごと などいくつかの軸で集計が行えるようにしています。また、複数 fluentd ノードが並行して書き込んでもMySQL側で合算できるので、処理の並行性・スケーラビリティも多少は担保されています。

なお、MySQLへの出力にはmysql pluginを用いています。
https://github.com/tagomoris/fluent-plugin-mysql

まとめ

まあでも基本はNorikraを使いましょう。

http://norikra.github.io/

42
38
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
42
38

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?