はじめに
この記事はDatadog Advent Calendar 2016の3日目の記事です。
※ほんとは4日目の枠の予約をしてたのだけど、前日に書き終わった時点で、まだ3日目の枠が空いてから前に詰めた。なのでこれを読んだ誰か4日目書いてね(/ω・\)チラッ
Datadogとログ監視
SSHログイン成功/失敗回数を記録/監視しようと思って、既にサーバ監視ツールとしてDatadogを使っていたので、Datadogでログ監視できないかと調べた。
Datadogにもログ監視機能はあるようなのだけど、標準で提供されている方法は以下のようなかんじで、公式ログフォーマットが決め打ちされており、貧弱すぎてシスログすら標準ではパースしてくれない。
Datadog Agent によるログの解析方法
例えばシスログをパースしようとすると、こんなかんじで、pythonでカスタムパーサーを自分で書かないといけないらしい。
Datadog ログ監視でログ本文を通知
うーん。動くかもしれないが、これはツライ(´・ω・`)
何が一番ツライかというとこのカスタムパーサーを監視対象サーバ全台に配布しないといけないし、パーサーのコードを書き換えたりすると配布し直さないといけない。
で、代替案として、ログのバックアップなどのために既にfluentdでログ転送していたので、集約しているログサーバのfluentdからDatadogに投げられないかと考えた。ログを特定の条件で抽出したり加工したりするのはfluentだと得意だし。プラグイン組み合わせたりもできるし。
で、ありがたいことに、既にfluentdからDatadogへの送信のプラグインを作ってる人がいた。
ryotarai/fluent-plugin-dogstatsdが良さそうだったので、これを使わせていただいた。
ちなみにこのプラグインはデータ送信部分はdogstatdを経由しているので、fluentdサーバにもdatadog-agentがインストールされてる必要があるんだけど、既にインストールしてたし問題なし。
またこのプラグインの使い方については、以下の記事でnginxのログを監視している例があったので、いろいろ参考にさせていただいた。
Dogstatsd で Datadog にカスタムメトリクスを送る方法 〜Fluentd との連携を添えて〜
Datadogのメトリクスとして送信できれば、あとはDatadog側でよしなに監視定義を設定すればよい。
というわけで、やってみよう。
環境
稼働確認した環境やバージョンなどは以下のとおり。
- Amazon Linux 2015.09
- td-agent-2.3.1-0.el2015.x86_64
- datadog-agent-5.8.3-1.x86_64
- fluentd: 0.12.20
- fluent-plugin-rewrite-tag-filter: 1.5.4
- fluent-plugin-dogstatsd: 0.0.5
設定
監視対象ログをfluentdで転送する
SSHログイン成功/失敗は/var/log/secureに記録されている。
監視対象サーバのログを投げる方は、この構成の場合、実はあんまり本題と関係ないんだけど、一応前提となる構成として書いておく。
実際には他の設定も入ってるんだけど、関連するところを抜粋。
とりあえず監視したいログ(この場合/var/log/secure)をfluentdで投げればOK。各自の環境で適宜読み替えて下さい。
<source>
type config_expander
<config>
type tail
format syslog
path /var/log/secure
pos_file /var/log/td-agent/secure.pos
tag hoge.staging.app.secure
</config>
</source>
<filter *.**>
@type record_transformer
enable_ruby true
<record>
@hostname ${hostname}
@timestamp ${time.strftime('%Y-%m-%dT%H:%M:%S%z')}
</record>
</filter>
<match *.**>
type forward
retry_limit 5
flush_interval 5s
<server>
host fluentd.local
port 24224
</server>
</match>
監視対象ログのタグを取り出す
以降は、ログを集約するfluentdサーバ上の設定です。
まず、処理の単位をわかりやすくするために **.secure
にマッチしたものを @secure
ラベルにルーティングしてます。特に要件ではないのでこの辺はご自由にどうぞ。
<source>
type forward
port 24224
</source>
<match **.secure>
type copy
# secureログの監視
<store>
type relabel
@label @secure
</store>
# コピーしてデフォルト処理にも流しておく
<store>
type relabel
@label @default
</store>
</match>
監視対象文字列のタグをリライトして調整
監視対象の文字列ごとにrewrite_tag_filterを使ってタグをリライトして調整する。
ここでやりたいことは、Datadogに送る前に、必要なログだけを正規表現でフィルタしつつ、ついでに正規表現のマッチでIPアドレスなどの引き継ぎたい情報を抽出したのを、あとで加工しやすいようにfluent内のタグに一旦退避する。
どの文字を引っ掛けるかはOSによって適宜調整が必要だと思うんだけど、SSHログイン成功/失敗した接続元のIPアドレスが分かる行を抽出しておく。
<label @secure>
# secureログをgrepする
<match **.secure>
type rewrite_tag_filter
# ログイン成功した場合に出力される
# Accepted publickey for XXXXX from XXX.XXX.XXX.XXX port XXXXX ssh2: RSA xx:xxx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx
rewriterule1 message Accepted\s+publickey\s+for\s+\S+\sfrom\s+(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3}) ${tag}.login_success.$1-$2-$3-$4
# ログイン失敗した場合に出力される
# Connection closed by XXX.XXX.XXX.XXX [preauth]
rewriterule2 message Connection\s+closed\s+by\s+(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})\s+\[preauth\] ${tag}.login_failure.$1-$2-$3-$4
</match>
...
</label>
Datadog送信用のレコードを作る
タグを分解して、Datadog送信用のレコードを作る。
ここで作ってるrecordのKey=ValueがDatadog上のタグのKey=Valueにマッピングされる。
<label @secure>
...
# grep条件にヒットした件数をカウント
<match **.secure.login_success.*>
type copy
<store>
type record_reformer
renew_record true
tag dogstatsd.increment
<record>
type "increment"
key hoge.fluentd.secure.login_success
service ${tag_parts[0]}
env ${tag_parts[1]}
role ${tag_parts[2]}
source_ip ${tag_suffix[-1]}
fluentd_source_host ${record["@hostname"]}
</record>
</store>
</match>
<match **.secure.login_failure.*>
type copy
<store>
type record_reformer
renew_record true
tag dogstatsd.increment
<record>
type "increment"
key hoge.fluentd.secure.login_failure
service ${tag_parts[0]}
env ${tag_parts[1]}
role ${tag_parts[2]}
source_ip ${tag_suffix[-1]}
fluentd_source_host ${record["@hostname"]}
</record>
</store>
</match>
## @dogstatsd にルーティングする
<match dogstatsd.**>
type relabel
@label @dogstatsd
</match>
</label>
source_ipに接続元IPアドレスをセットしてタグとして送るのだけど、Datadogで使えるタグの総数にはノード数に応じた上限があるので注意。
今回の用途みたいなSSHログインはアクセス元IPアドレスを絞ってるので、通常は不特定多数のIPが記録されることはないはずなのだけど、この方法を例えばWebサーバのアクセスログのクライアントIPとか不特定のIPが記録される用途には使うとタグの数が膨大になって上限に達する恐れがあるので、そういう使い方をしてはいけない。
あと、上記の設定ではfluentタグから監視対象サーバが所属するservice、env、roleなどの情報を抽出しているが、これはfluentタグの構造に依存しているので、Datadogのタグとして送信したいものは、なんらかの方法で、レコードに含めておく必要がある。
Datadogに送信する
最後にfluent-plugin-dogstatsdを使って、dogstatd経由でDatadogに送信します。
プラグインをインストールします。fluent-gemのパスは適宜読み替えて下さい。
# /opt/td-agent/embedded/bin/fluent-gem install fluent-plugin-dogstatsd
アウトプットで type dogstatsd
を指定します。
flat_tag true にすると fluentレコードの各フィールドをDatadogのメトリクスのタグにしてくれる。
<label @dogstatsd>
<match dogstatsd.**>
type copy
<store>
@id dogstatsd
type dogstatsd
flat_tag true
</store>
</match>
</label>
これでDatadogにメトリクスが飛ぶようになるので、あとは一定時間あたりのログイン成功/失敗回数とか、タグに付けたIPアドレスなどの条件を組み合わせて好きにDatadogのダッシュボードやアラート設定などを作ればよい。
まとめ
fluentdとDatadogを連携してIPアドレスごとのSSHログイン成功/失敗回数を記録/監視することができた。なんだかDatadogの使い方ではなくfluentdの設定方法の説明ばかりしてたような気もするが、致し方ない。
ちなみにこの方法だとログに特定の文字列が現れた回数をメトリクスとして記録できるが、メタ情報としてIPアドレスなどのタグは送信できるものの、オリジナルのログ本文を通知する手段がない。
ログ監視の目的として、特定の文字列が現れた場合、ログ本文を含むアラートを上げたい場合は、メトリクスではなくイベントとして送信するか、もしくは以前別件で書いたSlackに直接通知するのがよいんじゃなかろうかと思う。fluentdからSlackへの通知は以下を参照。
fluentdでログ監視して特定の文字列をgrepで検知したらSlackに通知する(バースト抑止付き)
ところで、ログ監視をfluentdの一箇所に集約できたのはよいんだけど、監視条件を追加するたびにfluentdの定義が肥大化していくので、色んな条件でログ監視をどんどん追加していくとメンテしづらそうな気がしている。
というわけでDatadogさんログ監視機能もうちょっとがんばってくれないかなぁ。。。
監視対象ログのパス、ログフォーマット、監視条件をWeb管理画面で設定するだけで、サーバサイドはエージェント入れとくだけぐらいになるとうれしいんだけれども。
あと冒頭にも書いたけど、これを書いてる時点で明日のDatadog Advent Calendar 2016の4日目がまだ空いてるので誰か埋めてね(/ω・\)チラッ