はじめに
世間的には「fluentdで集計 ≒ Norikra!!!!!」という流れで、それに対して一石を投じる気のかけらも私には無いわけですが、Norikraを用いるまでもない軽微な処理を実行する場合fluentdのプラグイン単体で処理を完結したいケースもあり、そしてNorikraが若干重厚に映るケースもあります(JRuby!! Esper!!!)
ということで、集計が行えるようなfluentd pluginについてまとめてみます。チョイスは僕の独断と偏見です。
ユースケース
fluentdの基本的なユースケースは、inputとして入力をしたデータをoutput先にrelayする、というものです。そして集計処理は、多くの場合output先のシステム内、もしくはシステムに蓄積されたデータを用いて別のシステムを用いて行う事が多いと思います。
(ex. HDFSに保存したログデータをHiveを用いて集計)
(ex. ElasticSearchに保存したログデータをKibanaを用いて集計&可視化)
そんな中、fluentdの中であえてデータを加工し、集計をする目的としては以下が挙げられるかなと思います。
システムに集計結果を素早くフィードバックしたい
datacounter や flowcounter で集計した結果を GrowthForecast に登録する、というのはまさにこのケースになります。ある単位時間内の集計結果を即時に別サービスにフィードバックしたい場合、fluentd 中にデータが流れたタイミングで集計し定期的に別システムにoutputをするという戦略は、有効です。
また、特定のデータ集計結果を即時サービスサイドにフィードバックさせたい場合も、このケースに当たると思います。例としては、何かしらのランキング結果、TopNの順位や集計値等をサービスに反映させる時、などです。
システムは同期的ではなく非同期処理にしたい
たとえばMySQLやDynamoDBのようなstorageをcounterとして使用し、同期的に値の更新をすることも可能と思いますが、count処理が同期的になるとサービス側がStorageのパフォーマンスに引きづられるため非同期にしたいケースもあります。また、データの逐次更新を行うとstorageの負荷も気になります。
もちろん自前で実装することも可能でしょうが、fluentdを用いれば、ただfluentdのノードにデータを流すだけである程度の非同期性を担保することができます。
Cons.
もちろん、fluentdはストリーム中に流れるデータを集計するために最適化されているとはいえず、必ずしも適してない部分もあります。
集計処理がスケールしない。
仕組みとして、fluentd ノードの各processがprocessをまたいで集計処理を行う事が難しいため、基本的には1台のserver / 1つのCpuコア / 1のFluentd ノードprocess を用いて集計を行う事になります。そのためCPUの側面でもメモリの側面でも処理がスケールしません。
たとえば、URLごとの出現回数を数えようとした場合、そのURLの異なり数が膨大になった場合、集計の可能・不可能はそのサーバーのメモリサイズに依存します。
(もちろん、工夫しだいで回避可能なことも多いと思いますが)
正確な集計処理ができない
以下に紹介するpluginの多くは、内部的にwatcherのようなスレッド or ループが定期的にデータを監視し、指定されたtime windowごとにデータをflushする、といった実装になっています。
このような実装の場合、「前回flush後からxx秒後にflush」という動作にはなっていますが、「xx時xx分xx秒から1分間のデータを集計」というような形にはなっていません。なので、我々が普段欲しいような集計処理をfluentdの中だけで正確に行う事は、多くのpluginでは実装的に難しいです。
長時間のデータ保持が求められる集計処理に向かない
端的にいうと、Unique Count系の処理には向きません。
集計結果をflushする前にprocessがkillされるとデータを失う
pluginの作り次第な部分もあると思いますが、基本は失われます。
つまり、Norikraを使え、、と。。。
なので、fluentdを用いた集計基盤として、fluentd外のシステムにデータを送り、そちらに集計処理を委譲するというモデルの方ががデータ集計処理としてはふさわしいケースが多いです。
つまりNorikraのようなモデルは正しいですよね、という話になります。
「じゃあこの記事書くことないだろ」といった声も聞こえてくる気がしますが、、、気にせずに以下記載します。
plugin各種
tagomoris family (datacounter / flowcounter / numeric-counter )
https://github.com/tagomoris/fluent-plugin-datacounter
https://github.com/tagomoris/fluent-plugin-flowcounter
https://github.com/tagomoris/fluent-plugin-numeric-counter
tagomorisさんはご自身のブログで非常にわかりやすい説明を書かれていますし、多くの採用事例もあり、語り尽くされている感がありますので、この場では割愛。
以下のようなブログ記事もありますので、皆さんNorikraを使いましょう。
http://blog.livedoor.jp/sonots/archives/37921050.html
datacalculator
https://github.com/muddydixon/fluent-plugin-datacalculator
すべてのレコードに対して、もしくはデータのkeyごとに単位時間辺りの集計結果を算出することができるpluginです。
このpluginの特徴的だなと思うところは、formulas や finalizerという項目を使って独自の計算式を定義することが可能で、自由度・汎用度が非常に高いところです。count upやaverage等の単純でかつ多くの集計pluginで対応されている計算処理以外に独自の処理を施したい場合、このpluginは非常に重宝すると思います。
<match payment.quest>
type datacalculator
tag result.quest
count_interval 5s
aggregate keys area_id, mission_id
formulas sum = amount * price, cnt = 1, total = amount
finalizer ave = cnt > 0 ? 1.00 * sum / cnt : 0
<unmatched>
type file
path unmatched
</unmatched>
</match>
詳しくは作者のブログにサンプル等が書かれているので、こちらをご参照ください。
http://d.hatena.ne.jp/muddydixon/20120712/1342107800
grepcounter
https://github.com/sonots/fluent-plugin-grepcounter
集計対象のレコードを、正規表現(regexp)で絞り込む事ができるplulginです。作者の方のブログに書かれているように、特定の文字列を含むレコードのみ集計対象にしたい、といった場合に非常に重宝すすると思います。
<match syslog.**>
type grepcounter
count_interval 60
input_key message
regexp WARN
exclude favicon.ico
threshold 1
add_tag_prefix warn.count
</match>
詳しくい使い方は、作者のブログにまとめられていますので、こちらをご参照ください。
http://blog.livedoor.jp/sonots/archives/24432889.html
inline-classifier
https://github.com/yosisa/fluent-plugin-inline-classifier
こちらのpluginは、あらかじめ取りうる値の範囲を複数定義しておき、集計対象のデータをそれらの入れ物に分類していくのが特徴になっています。
行える事は tagomoris さんの numeric-counterに近しいですが、numeric-counter は定められたtime windowごとに数値を合算するようになっていますが、こちらのinline-classifierでは1レコードごとに分類処理を行うだけのシンプルな作りになっており、後続のchainでまた別の集計処理を重ねる事ができたりするのが特徴で、重宝する部分でもあります。
<match raw.**>
type inline_classifier
add_prefix classified
remove_prefix raw
<rule>
key reqtime
type range
store speed
class1 fast * 0.1
class2 normal 0.1 0.5
class3 slow 0.5 *
</rule>
</match>
詳しくは作者の方がQiitaにまとめられているのでこちらをご参照ください。
http://qiita.com/yosisa/items/01be2d2e74858aae440e
lossycount
https://github.com/moaikids/fluent-plugin-lossycount
拙作のpluginをここで晒すのは恥ずかしいのですが、こちらのpluginの特徴としては、単純にkeyの異なり数ごとに集計処理をするのではなく、lossy countというアルゴリズムを使って集計を行うことにあります。
lossy countによる集計の特性としては、gamma値とepsilon値を適切に設定することでデータ総数Nに応じ下記のようなデータを必ず出力できる事を保証している事です。
- 出現頻度が “gamma × N” 以上のデータはすべて出力される。
- 出現頻度が “(gamma – epsilon) × N” 未満であるデータは出力されない。
- 出力される頻度は近似値で、誤差は必ず “(正確な頻度) – (epsilon × N)” の間に収まる。
端的に言うと、数学的に保証された確率でTopNについては正確に集計し、それ以外については概算値を算出することができるアルゴリズムです。上位N件のみ正確な数字と順位が知りたい場合などに使うと有効です。
<match pattern>
type lossycount
gamma LOSSY_COUNT_GAMMA
epsilon LOSSY_COUNT_EPSILON
key_name INPUT_KEY_NAME
time_windows SECONDS
output_tag OUTPUT_TAG_NAME
output_key_name OUTPUT_KEY_NAME
output_timestamp_name OUTPUT_TIMESTAMP_NAME
output_value_name OUTPUT_VALUE_NAME
enable_metrics true
metrics_tag METRICS_OUTPUT_TAG_NAME
verbose false
</match>
詳しくは以下のブログの中でも触れられているので、こちらをご参照ください。
http://developer.smartnews.com/blog/2013/09/24/implementation-of-ranking-algorithm-using-fluentd-and-redis/
実際に使ってみたケース
上記のpluginのうち、私はlossycountをよく使います。実際にどのようなケースで使っているかを簡単にご説明します。
redisと組み合わせてランキング処理
あるデータの出現頻度についてlossycountで集計し、結果をredsのsorted setに保存することによって、簡易的なランキング処理を行っています。
イメージとしては、以下のような形です。
ランキング処理は1台のノードで行いたいため、workerノードでETL的な情報の加工・抽出をしたうえでrankingノードに処理を委譲しています。
なお、redisへの出力については、こちらも拙作のpluginですが redisstore pluginを使用しています。
https://github.com/moaikids/fluent-plugin-redisstore
MySQLと組み合わせて集計処理
あるデータの出現頻度についてlossycountで集計し、結果をMySQLのレコードとして保存しています。
すべての生ログをRDBに保存するには量が多く、しかしdailyやhourlyのデータ投入では反映が遅くなるため、1分に1回fluentdで集計済みの値を保存することによってサービスになるべく早く反映し使用可能な状態にするのが目的です。
こちらについては、
key | count | fluentd_host | timestamp |
---|
のような形でデータを保存するようにしており、keyごと / timestamp ごと などいくつかの軸で集計が行えるようにしています。また、複数 fluentd ノードが並行して書き込んでもMySQL側で合算できるので、処理の並行性・スケーラビリティも多少は担保されています。
なお、MySQLへの出力にはmysql pluginを用いています。
https://github.com/tagomoris/fluent-plugin-mysql
まとめ
まあでも基本はNorikraを使いましょう。