Help us understand the problem. What is going on with this article?

fluentdのplugin 「in_kinesis」を作ってみた

More than 3 years have passed since last update.

はじめに

とあるきっかけで、AWS kinesisからlogを取得する、fluentdのInput Plugin、
in_kinesis なるものを作ったので、その話を書きます。

ちなみに【in_kinesis】はこちらになります。
gem版はこちらになります。

きっかけ

今回、なぜPluginを作ったかというと、
会社で新たにリアルタイムバトルを導入する事になり、そのlog周りに携わる事になったのがきっかけでした。
そこで、今まで通りのアーキテクチャで良いのかどうかの検討から行う事にしました。
その時のlogの要件はこんな感じでした。

  • とにかくログがBigQueryに入れば良い。
  • ログの数の上限値が不明

fluentdのBigQuery pluginが有るからBigQueryに入れるのは簡単だね!
と思っていたのですが、
ここで問題となるのが「ログの数の上限値が不明」←これでした!!

  • もし秒間数千件とかだったら、fluentd1台でさばけるの?

    • 無理そう...
  • fluentdServerを増やせば良いんじゃない?

    • Serverを増やす際のコストは?工数とか手間は?増やすかどうかの判断は誰がする?
    • そもそも、増やすの間に合う?誰が捌ききれてるかを監視するの?
    • Server増やす運用って結構問題も多いし、手間じゃない??
  • じゃあGameServer側で、一旦logファイルに書き出して、それをfluentdで取り込む?

    • GameServerのI/O負荷が凄い事になった。
  • 他にもうちょっとマシな方法とかないのかな?

こんなやり取りがあり、白羽の矢がたったのがkinesisでした。

kinesisとはなんぞや?

まずAWSのkinesisというサービスをご存知ですか?
kinesisとはAWSが提供している、大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービスだそうです。

大量のデータを捌くパイプみたいなイメージです。
kinsis_image.png

kinesisのメリットとしてはこの2点が大きいかと思います。
・リアルタイムに大量のデータが捌けて、Shardによるスケールの容易な変更が可能。
・取り込んだデータを最大7日間保持する。

kinesisを検討してみる

そこでもう一度、問題点と照らし合わせます。

  • 問題点

    • 量が多いとfluentdServer1台じゃ捌けない。
    • かといって、Serverを増やす解決法は運用コストが大きい。
    • GameServer側で負荷を請け負うとI/O負荷が凄い事に。
  • kinesisで解決できる事

    • kinesisに投げられたデータは最大7日間保存されるため、バッファリングが可能になる事で、fluentdServerの負荷を抑えられる。
    • スケールの調整が簡単に出来るので、運用コストも低い。
    • kinesisは大量のデータをリアルタイムで受ける事ができるので、kinesisにlogを直接投げる事によって、GameServer側のI/O負荷が起きない。

なんという事でしょう。
色んな問題が一気に解決されたではありませんか。
と、いうわけでkinesisを使う事にしました。

構成はこんな感じです。
SQS (1).png

しかし!
肝心の、kineisに対応したinput pluginが無かったのです。
そこで、pluginは自作する事にしました。

in_kinesisの作成

自作の際に気をつけたい事は、kinesisの良さを損なわないプラグインにする事でした。
そこで、主に下記機能を備えました。

  • kinesisからlogを取得して、outputに渡す。
  • 各shard毎にマルチスレッドで並列処理をする。
  • shard毎に最後に取得したレコードの位置を記憶、続きから再会できる。
  • スケール調整による、shardの増減の自動追尾。
  • 使用するshardを意図的に指定できる。
  • logの形式はfluentdのdefaultのformatに準拠する。

shard毎にマルチスレッドで並列処理を行う事で、kinesisのリアルタイム処理を極力損なわない様にしています。

あと、1番の目玉は「shardの増減の自動追尾」です。
ここが自動じゃないと、結局人による管理が必要になり、運用コストがかかってしまうので、kinesisの良さを損なってしまうと考えました。

そこで、shardの増減の自動追尾を可能にする為に、各shardにアクセスするshardスレッドを管理する、「supervisorスレッド」を用意して各shardの状態を監視させる事にしました。

【in_kinesisの内部構成】
kinesis.png

supervisorスレッドはメインスレッドとjoinせずに独立して動きます。
そして、supervisorスレッドが定期的にkinesisの状態を取得し、それに合わせてshardスレッドを起動させたり終了させしりします。

それにより、shardの増減の自動追尾が可能になりました。

あとは、多様性を上げる為に、使用するshardを意図的に指定できる機能も備えました。
これはshard毎に用途を変えて使用する事を考慮して入れているのですが、この機能を使用した場合は自動追尾ができなくなってしまうので、ストリーム毎に管理する事をお勧めします。

最後に

in_kinesisを使う事でコンシューマーを手作りする必要が無く、fluentdでいろんなoutputに吐き出せるため、意外にも欲している人は多いのでは?と思い、今回初めてオープンソースとして作ってみることにしました。

しかし、初めてのRubyという事で、つたないRubyになってしまい、そういった面でも機能面でも、
まだまだ改良の余地あると思うので、気軽にpull request等送ってください^^ノ
優しい鉞、心よりお待ちしています。

【in_kinesis】はこちら
gem版はこちらになります。

kinpira
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした