Edited at
gumiDay 19

fluentdのplugin 「in_kinesis」を作ってみた

More than 3 years have passed since last update.


はじめに

とあるきっかけで、AWS kinesisからlogを取得する、fluentdのInput Plugin、

in_kinesis なるものを作ったので、その話を書きます。

ちなみに【in_kinesis】はこちらになります。

gem版はこちらになります。


きっかけ

今回、なぜPluginを作ったかというと、

会社で新たにリアルタイムバトルを導入する事になり、そのlog周りに携わる事になったのがきっかけでした。

そこで、今まで通りのアーキテクチャで良いのかどうかの検討から行う事にしました。

その時のlogの要件はこんな感じでした。


  • とにかくログがBigQueryに入れば良い。

  • ログの数の上限値が不明

fluentdのBigQuery pluginが有るからBigQueryに入れるのは簡単だね!

と思っていたのですが、

ここで問題となるのが「ログの数の上限値が不明」←これでした!!



  • もし秒間数千件とかだったら、fluentd1台でさばけるの?


    • 無理そう...




  • fluentdServerを増やせば良いんじゃない?


    • Serverを増やす際のコストは?工数とか手間は?増やすかどうかの判断は誰がする?

    • そもそも、増やすの間に合う?誰が捌ききれてるかを監視するの?

    • Server増やす運用って結構問題も多いし、手間じゃない??




  • じゃあGameServer側で、一旦logファイルに書き出して、それをfluentdで取り込む?


    • GameServerのI/O負荷が凄い事になった。



  • 他にもうちょっとマシな方法とかないのかな?


こんなやり取りがあり、白羽の矢がたったのがkinesisでした。


kinesisとはなんぞや?

まずAWSのkinesisというサービスをご存知ですか?

kinesisとはAWSが提供している、大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービスだそうです。

大量のデータを捌くパイプみたいなイメージです。

kinesisのメリットとしてはこの2点が大きいかと思います。

・リアルタイムに大量のデータが捌けて、Shardによるスケールの容易な変更が可能。

・取り込んだデータを最大7日間保持する。


kinesisを検討してみる

そこでもう一度、問題点と照らし合わせます。



  • 問題点


    • 量が多いとfluentdServer1台じゃ捌けない。

    • かといって、Serverを増やす解決法は運用コストが大きい。

    • GameServer側で負荷を請け負うとI/O負荷が凄い事に。




  • kinesisで解決できる事


    • kinesisに投げられたデータは最大7日間保存されるため、バッファリングが可能になる事で、fluentdServerの負荷を抑えられる。

    • スケールの調整が簡単に出来るので、運用コストも低い。

    • kinesisは大量のデータをリアルタイムで受ける事ができるので、kinesisにlogを直接投げる事によって、GameServer側のI/O負荷が起きない。



なんという事でしょう。

色んな問題が一気に解決されたではありませんか。

と、いうわけでkinesisを使う事にしました。

構成はこんな感じです。

しかし!

肝心の、kineisに対応したinput pluginが無かったのです。

そこで、pluginは自作する事にしました。


in_kinesisの作成

自作の際に気をつけたい事は、kinesisの良さを損なわないプラグインにする事でした。

そこで、主に下記機能を備えました。


  • kinesisからlogを取得して、outputに渡す。

  • 各shard毎にマルチスレッドで並列処理をする。

  • shard毎に最後に取得したレコードの位置を記憶、続きから再会できる。

  • スケール調整による、shardの増減の自動追尾。

  • 使用するshardを意図的に指定できる。

  • logの形式はfluentdのdefaultのformatに準拠する。

shard毎にマルチスレッドで並列処理を行う事で、kinesisのリアルタイム処理を極力損なわない様にしています。

あと、1番の目玉は「shardの増減の自動追尾」です。

ここが自動じゃないと、結局人による管理が必要になり、運用コストがかかってしまうので、kinesisの良さを損なってしまうと考えました。

そこで、shardの増減の自動追尾を可能にする為に、各shardにアクセスするshardスレッドを管理する、「supervisorスレッド」を用意して各shardの状態を監視させる事にしました。

【in_kinesisの内部構成】

supervisorスレッドはメインスレッドとjoinせずに独立して動きます。

そして、supervisorスレッドが定期的にkinesisの状態を取得し、それに合わせてshardスレッドを起動させたり終了させしりします。

それにより、shardの増減の自動追尾が可能になりました。

あとは、多様性を上げる為に、使用するshardを意図的に指定できる機能も備えました。

これはshard毎に用途を変えて使用する事を考慮して入れているのですが、この機能を使用した場合は自動追尾ができなくなってしまうので、ストリーム毎に管理する事をお勧めします。


最後に

in_kinesisを使う事でコンシューマーを手作りする必要が無く、fluentdでいろんなoutputに吐き出せるため、意外にも欲している人は多いのでは?と思い、今回初めてオープンソースとして作ってみることにしました。

しかし、初めてのRubyという事で、つたないRubyになってしまい、そういった面でも機能面でも、

まだまだ改良の余地あると思うので、気軽にpull request等送ってください^^ノ

優しい鉞、心よりお待ちしています。

【in_kinesis】はこちら

gem版はこちらになります。