更新履歴
2024.1.24: AUTO_BUCKETよりもDATE_TRUNCの方が良いので一部書き直しました。
はじめに
特定のIPアドレスからの大量の403ログが、複数時間にわたり続いているのかのを検知したい... という高度な検知をES|QLでどこまで対応できるかを確認しました。
ES|QLはv8.11から登場したElasticの新しいクエリー言語です。
機能として、ES|QLで任意の時間毎のログ件数の集計はきますが、それが複数時間にわたって連続しているかを判定するような関数はありません。ただ、例えば直近の3時間のデータを検索範囲にし、1時間毎の集計を見た場合、それは3行のデータとなるので、それに対して検知条件をフィルタリングして、結果3行であれば、3時間連続で検知条件を満たした状況が発生している、ということは確認できます。
検証手順
本検証はv8.11のElastic Cloud環境で行なっています。KibanaのDev ToolsとDiscoverだけを使って検証しました。AlertにはKibana Alertを使います。
1. 時間単位の集計方法
今回はシンプルなスキーマのインデックスで実験していきます。IPアドレスとHTTPレスポンスコードだけです。
PUT detection-test
{
"mappings": {
"properties": {
"ip_address": {
"type": "ip"
},
"response_code": {
"type": "integer"
}
}
}
}
以下のようにして、IP 192.168.1.1と192.168.1.2でそれぞれレスポンス403のログを入れていきます。
POST detection-test/_doc
{
"ip_address": "192.168.1.1",
"response_code": 403,
"@timestamp": "2024-01-22T14:30:00+09:00"
}
POST detection-test/_doc
{
"ip_address": "192.168.1.2",
"response_code": 403,
"@timestamp": "2024-01-22T14:30:00+09:00"
}
以下のような分布でログを入れていきました。
192.168.1.1 -> 12時台: 5件, 13時台:10件, 14時台:10件
192.168.1.2 -> 12時台: 10件, 13時台:10件, 14時台:10件
まず簡単な例から。IPアドレス毎の検索対象時間内の403のログの合計件数を表示する例です。
from detection-test
| where response_code == 403
| STATS count_403 = COUNT(*) BY ip_address
| KEEP ip_address, count_403
次、1時間毎の403ログの集計を確認したいと思います。DATE_TRUNCを使ってできます。
from detection-test
| where response_code == 403
| EVAL hour = DATE_TRUNC(1 hour, @timestamp)
| STATS count_403 = COUNT(*) BY ip_address, hour
| KEEP ip_address, hour, count_403
| SORT ip_address, hour desc
次、時刻の表示を日本時間にしたいと思います。残念ながら現在DATE_TRUNCを使った場合の時間表示に関しては現時点まだUTC表記にしかなりません。コード量が多くなりますが以下のようにして日本時間にすることができました。
from detection-test
| where response_code == 403
| EVAL hour = DATE_TRUNC(1 hour, @timestamp)
| STATS count_403 = COUNT(*) BY ip_address, hour
| KEEP ip_address, hour, count_403
| SORT ip_address, hour desc
//For display in JST
| EVAL dt_long = to_long(hour)
| EVAL TIMEZONE_DIFF = 9
| EVAL dt_timezone_long = to_long(dt_long + TIMEZONE_DIFF * 60 * 60 * 1000)
| EVAL hourly_403_count_bucket_jst_string = DATE_FORMAT("yyyy-MM-dd HH:mm:ss", to_datetime(dt_timezone_long))
| KEEP ip_address, hourly_403_count_bucket_jst_string, count_403
| SORT ip_address, hourly_403_count_bucket_jst_string desc
| RENAME hourly_403_count_bucket_jst_string as 日本時間
2. ES|QLでアラートさせるための条件を追加
基本が終わったところで、次はアラートをさせるためにどうするかを考えていきます。
後に紹介するKibanaアラートでES|QLをアラートの条件と利用する際は、ES|QL実行結果に一行でも結果が存在すれば、アラート発行となるという動きとなります。
まず、簡単な例から。同じIPで403が特定の時間枠で10件以上ログされた時に、アラートする場合。(For alertコメントのところに条件を入れてあります)
from detection-test
| where response_code == 403
| EVAL hour = DATE_TRUNC(1 hour, @timestamp)
| STATS count_403 = COUNT(*) BY ip_address, hour
//For alert
| where count_403 >= 10
//For display in JST
| EVAL dt_long = to_long(hour)
| EVAL TIMEZONE_DIFF = 9
| EVAL dt_timezone_long = to_long(dt_long + TIMEZONE_DIFF * 60 * 60 * 1000)
| EVAL hourly_403_count_bucket_jst_string = DATE_FORMAT("yyyy-MM-dd HH:mm:ss", to_datetime(dt_timezone_long))
| KEEP ip_address, hourly_403_count_bucket_jst_string, count_403
| SORT ip_address, hourly_403_count_bucket_jst_string desc
| RENAME hourly_403_count_bucket_jst_string as 日本時間
次、直近の3時間内で、3回(1時間 * 3)、それぞれの時間で10件以上ログされた時に、アラートする場合。これは、1時間単位に集計された数値を、さらに複数時間にわたって更に集計する形となるので、もっと複雑な条件です。
最初に、Where句で、NOW関数を使いつつ、過去3時間を範囲指定をします。これで例えば現在時刻が14:10の場合、14:10-17:10の間にある、集計バケット(14:00, 15:00, 13:00)に絞り込まれます。
from detection-test
| where response_code == 403
| EVAL hour = DATE_TRUNC(1 hour, @timestamp)
| STATS count_403 = COUNT(*) BY ip_address, hour
//For alert
| where hour >= (NOW() - 3 hour) and hour < NOW()
| where count_403 >= 10
//For display in JST
| EVAL dt_long = to_long(hour)
| EVAL TIMEZONE_DIFF = 9
| EVAL dt_timezone_long = to_long(dt_long + TIMEZONE_DIFF * 60 * 60 * 1000)
| EVAL hourly_403_count_bucket_jst_string = DATE_FORMAT("yyyy-MM-dd HH:mm:ss", to_datetime(dt_timezone_long))
| KEEP ip_address, hourly_403_count_bucket_jst_string, count_403
| SORT ip_address, hourly_403_count_bucket_jst_string desc
| RENAME hourly_403_count_bucket_jst_string as 日本時間
もし、直近の14:00のバケットを対象とせず、 13:00, 12:00, 11:00のの3つのバケットとしたい場合は、WHERE句を以下のようにと1時間前にずらせば良いでしょう。
| where hour >= (NOW() - 4 hour) and hour < (NOW() -1 hour)
次に、追加の集計を入れ、過去3時間を検索対象とした時に、IPアドレス毎に3行(=3時間分)カウントできれば、連続で条件を満たしていることになります。
from detection-test
| where response_code == 403
| EVAL hour = DATE_TRUNC(1 hour, @timestamp)
| STATS count_403 = COUNT(*) BY ip_address, hour
//For alert
| where hour >= (NOW() - 3 hour) and hour < NOW()
| where count_403 >= 10
//For alert looking on multiple timespans
| STATS cnt = COUNT(*) BY ip_address
| where cnt == 3
3. Kibanaアラートの作成
右上のAlertsからCreate earch threshold ruleを選択します。
表示中のES|QLは最初から条件としてセットされています。データ検索対象のSet the time windowはこの場合4時間以上であれば正しく動きますが、分かりやすく24時間にセットします。実際のアラート検知に使う時間範囲はES|QLに書いてあるので、問題ありません。(もし大量にデータがある場合は切り詰めたほうがレスポンス良くなるかも)
Test queryでテストしてみます。結果が一行でもあれば、アラートが出る条件を満たしています。
その他Actionsは必要に応じて設定ください。アラートの設定は以上です。
4. 攻撃者のIPアドレスが変わっていった場合の検知
さきほどは特定のIPアドレスごとに、複数時間で条件を満たしたかを確認しました。次はソースのIPアドレスが少しずつ変わるような、状況を想定した場合の検知です。最初の集計はIPアドレス毎に行いますが、最後の時間毎の確認では、IPアドレス関係なく件数条件を満たしているかを確認する方法を取りました。
以下のコメント//For alert looking on multiple timespansのところで、各時間毎MAXをとり、それが3つの時間で閾値を超えているかをみています。
from detection-test
| where response_code == 403
| EVAL hour = DATE_TRUNC(1 hour, @timestamp)
| STATS count_403 = COUNT(*) BY ip_address, hour
//For alert
| where hour >= (NOW() - 3 hour) and hour < NOW()
| where count_403 >= 10
//For alert looking on multiple timespans
| STATS max_access_in_hour = max(count_403) by hour
| STATS cnt = COUNT(hour)
| where cnt == 3
おわり
複数回の集計処理を必要とするような、検知もES|QLならなんとかできそうです。今までのElasticのクエリーではできませんでしたね。これによる負荷と速度は、実行する頻度とデータ量によっては気にした方がいいかもしれません。