はじめに
以前、以下の記事でElasticsearchのAnomaly DetectionでDatafeedを定義して利用する方法について紹介しました。
上記の記事では基本的な使い方についての紹介でしたが、ここではより実践的なAggregationを使ったDatafeedを利用する方法について紹介したいと思います。
この記事では例題として、以下のようなAnomaly Detectionのジョブを作ってみることにします。
- テストデータとしてkibana_sample_data_logsを利用
- エラーコードごとの出現割合の変化を元に異常検知を行う
割合を元にしたAnomaly Detection
ElasticsearchのAnomaly Detectionが実際に行っているのは、ターゲットとなる値(例えばCPUの使用率)に対して平均、最大、合計などの集計処理を施した値が、通常とどの程度乖離しているのかを調べる処理です。
この集計処理は通常MLノードのAnomaly Detectionジョブで実行されます。ちなみにこのジョブはElasticsearchの動作するJVMではなく外部のC++プロセスとして動作します。
しかしこの集計処理はElasticsearchのAggregationを利用することも可能です。そうすることによって以下の利点があります。
- Elasticsearchの持つ全てのAggregationを利用できる
- ElasticsearchのAggregationは高速である
この方法を採用することで、Elasticsearchの高速なAggregation機能を活用しつつ、柔軟な異常検知を行うことが可能になります。以下の手順で進めていきます。
- Normalize Aggregationを用いたクエリの作成と検証
- Anomaly Detectionジョブの設定
- Datafeedの作成と動作確認
これらの手順を順に説明していきます。
実装方針
Elasticsearchでエラーコードの出現割合を計算するためには、Nomalize Aggregationを利用します。Nomalize Aggregationは、指定したフィールドの値の出現割合を計算するAggregationです。このAggregationを利用して、エラーコードの出現割合を計算し、異常検知を行うことができます。
Anomaly Detectionで任意のAggregationを利用するためには、カスタムのDatafeedを利用します。DatafeedはAnomaly Detectionのためのデータを取得するためのクエリを定義するもので、通常のクエリの他にAggregationのクエリを指定することもできます。
独自のDatafeedを利用する場合、KibanaのMachine LearningのUIを利用してDatafeedを作成することが難しいため、REST APIを利用してAnomaly Detectionのジョブを作成することにします。
クエリーの作成と検証
まず、エラーコードの出現割合を計算するためのクエリを作成しましょう。Normalize Aggregationを利用して、エラーコードの出現割合を計算するクエリは以下のようになります。
GET kibana_sample_data_logs/_search
{
"size": 0,
"aggs": {
"timestamp": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "900000ms"
},
"aggs": {
"timestamp": {
"max": {
"field": "@timestamp"
}
},
"response": {
"terms": {
"field": "response.keyword",
"size": 10
},
"aggs": {
"response_count": {
"value_count": {
"field": "@timestamp"
}
},
"response_percentage": {
"normalize": {
"buckets_path": "response_count",
"method": "percent_of_sum"
}
}
}
}
}
}
}
}
このクエリは、@timestampフィールドを15分間隔で集計し、エラーコードごとの出現割合を計算します。このクエリを実行すると、以下のような結果が得られます。
{
"took" : 150,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 10000,
"relation" : "gte"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"timestamp" : {
"buckets" : [
{
"key_as_string" : "2025-03-09T00:30:00.000Z",
"key" : 1741480200000,
"doc_count" : 2,
"src" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "IN",
"doc_count" : 2,
"response" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "200",
"doc_count" : 2,
"response_count" : {
"value" : 2
},
"response_percentage" : {
"value" : 1.0
}
}
]
}
}
]
},
"timestamp" : {
"value" : 1.741480742912E12,
"value_as_string" : "2025-03-09T00:39:02.912Z"
}
},
...
]
}
}
}
Anomaly Detectionの設定
次に、Anomaly Detectionの設定を行います。Anomaly Detectionの設定には、以下の手順が必要となります。
Anomaly Detector(ジョブ)の作成
Anomaly Detectorは、異常検知を行うための設定を定義します。この時、Datafeedで利用するAggregationの中から、集計対象となるフィールドをsummary_count_field_name
として指定する必要があります。今回の場合、エラーコードの出現割合を元に異常検知を行うため、上記のアグリゲーションクエリーのレスポンス中でエラーコードの出現割合を表しているresponse_percentage
フィールドを指定します。
PUT _ml/anomaly_detectors/response_percentage_job
{
"analysis_config": {
"bucket_span": "1h",
"detectors": [
{
"function": "mean",
"field_name": "response_percentage",
"by_field_name": "response",
"use_null": true
}
],
"influencers": ["response"],
"summary_count_field_name": "response_percentage"
},
"data_description": {
"time_field": "timestamp"
},
"analysis_limits": {
"model_memory_limit": "11MB"
},
"model_plot_config": {
"enabled": true
},
"results_index_name": "test-job-anomalies",
"results_retention_days": 14
}
各パラメーターの意味は以下の表の通りです。
パラメーター | 説明 |
---|---|
analysis_config |
異常検知の設定を定義する。 |
bucket_span |
データを集計する間隔を指定する。 |
detectors |
異常検知を行うためのDetectorを定義する。 |
function |
Detectorで利用する関数を指定する。 |
field_name |
Detectorで利用するフィールドを指定する。 |
by_field_name |
Detectorで利用するフィールドを指定する。 |
use_null |
フィールドの値がnullの場合に異常検知を行うかどうかを指定する。 |
influencers |
異常検知の結果に影響を与えるフィールドを指定する。 |
summary_count_field_name |
Datafeedで利用するAggregationの中から、集計対象となるフィールドを指定する。 |
data_description |
データの時間フィールドを指定する。 |
analysis_limits |
モデルのメモリ制限を指定する。 |
model_plot_config |
モデルのプロットを有効にするかどうかを指定する。 |
results_index_name |
異常検知の結果を保存するインデックス名を指定する。 |
results_retention_days |
異常検知の結果を保存する日数を指定する。 |
Datafeedの作成
次に、Datafeedを作成します。Datafeedでは、Anomaly Detectorで利用するデータを取得するためのクエリを定義します。したがってここでは、前のセクションで検証したクエリーを利用します。
PUT _ml/datafeeds/response_percentage_feed
{
"job_id": "response_percentage_job",
"indices": [
"kibana_sample_data_logs"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"aggs": {
"timestamp": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1h"
},
"aggs": {
"timestamp": {
"max": {
"field": "@timestamp"
}
},
"response": {
"terms": {
"field": "response.keyword",
"size": 10
},
"aggs": {
"response_count": {
"value_count": {
"field": "@timestamp"
}
},
"response_percentage": {
"normalize": {
"buckets_path": "response_count",
"method": "percent_of_sum"
}
}
}
}
}
}
}
}
作成したDatafeedの動作を確認するために、Preview APIを利用してDatafeedの結果を確認します。
GET _ml/datafeeds/test-feed/_preview
このクエリを実行すると、以下のような結果が得られます。
[
{
"timestamp" : 1741480742912,
"response" : "200",
"response_percentage" : 1.0
},
{
"timestamp" : 1741490781326,
"response" : "200",
"response_percentage" : 1.0
},
{
"timestamp" : 1741491424863,
"response" : "200",
"response_percentage" : 0.6666666666666666
},
{
"timestamp" : 1741491424863,
"response" : "503",
"response_percentage" : 0.3333333333333333
},
...
]
ちなみに、厳密にはDatafeedとJobは別のリソースですが、結局二つ合わせて一つのAnomaly Detectionとして動作するので、1回のAPI呼び出しで作成することもできます。この場合、datafeed_config
にDatafeedの設定をそのまま記載します。以前のバージョンではサポートされていなかったのですが、サポートされているバージョンを使っている場合はこちらの方法を使った方が間違いがなくおすすめです。設定内容は同じです。
PUT _ml/anomaly_detectors/response_percentage_job
{
"analysis_config": {
"bucket_span": "1h",
"detectors": [
{
"function": "mean",
"field_name": "response_percentage",
"by_field_name": "response",
"use_null": true
}
],
"influencers": [
"response"
],
"summary_count_field_name": "response_percentage"
},
"data_description": {
"time_field": "timestamp"
},
"analysis_limits": {
"model_memory_limit": "11MB"
},
"model_plot_config": {
"enabled": true
},
"results_index_name": "test-job-anomalies",
"results_retention_days": 14,
"datafeed_config": {
"datafeed_id": "response_percentage_feed",
"indices": [
"kibana_sample_data_logs"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"aggs": {
"timestamp": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1h"
},
"aggs": {
"timestamp": {
"max": {
"field": "@timestamp"
}
},
"response": {
"terms": {
"field": "response.keyword",
"size": 10
},
"aggs": {
"response_count": {
"value_count": {
"field": "@timestamp"
}
},
"response_percentage": {
"normalize": {
"buckets_path": "response_count",
"method": "percent_of_sum"
}
}
}
}
}
}
}
}
}
ジョブの起動
正しくDatafeedが設定できたら、ジョブを開始しましょう。この操作はKibana上から実行可能です。
ジョブを実行してDatafeedを開始するには、KibanaのAnomaly detection jobsの画面にて、作成したジョブのメニューから「Start datafeed」をクリックします。
もちろん必要があれば、以下のようなOpen anomaly detection jobs APIおよびStart datafeeds APIを利用してジョブを開始することもできます。
POST _ml/anomaly_detectors/response_percentage_job/_open
POST _ml/datafeeds/response_percentage_feed/_start
実行結果の確認
ジョブを起動後は、通常のAnomaly detectionと同様Single Metric ViewerおよびAnomaly Explorerにて検出した異常を閲覧できます。
まとめ
ElasticsearchのAnomaly DetectionでAggregationを利用する方法について紹介しました。独自のDatafeedを定義することで、Elasticsearchの持つ全てのAggregationを利用できるため、柔軟な異常検知の実装が可能になります。また、集計処理をElasticsearchのAggregationにオフロードできるため、MLノードの負荷低減が期待できます。
Anomaly Detectionの設定は少し複雑ですが、Datafeedを利用することで柔軟な異常検知が可能になります。ぜひ試してみてください。