データフィード / Datafeedとは
Elasticsearchではログやメトリクスなど時系列データを取り扱う場合、通常の振る舞いから外れる挙動を異常検知(Anomaly Detection)という仕組みを使って検出することができます。Anomaly Detectionの導入については以下の記事で紹介しています。
この異常検知のコアロジックはElasticsearchクラスター内のMLノードで動作する、C++で実装されたプロセスによって動作しています。しかしElasticsearch自体はこのプロセスとは別のJavaアプリケーションです。そこで、Elasticsearchの機能としてAnomaly Detectionを実現するためには、Anomaly Detectionジョブの対象となるデータをElasticsearchのデータノードから抽出し、C++のプロセスに受け渡す機能が必要となります。この機能がデータフィードです。
データフィードについての公式ドキュメントは以下です。
この記事ではこのデータフィードの仕組みと、一歩進んだデータフィードの設定方法について紹介します。
アーキテクチャー
ElasticsearchのAnomaly Detectionでは、「ジョブ」というリソースは実質的にElasticsearchの外部で動作するC++のプロセスを表します。つまり新しいAnomaly Detectionのジョブを作成すると、それに対応するC++のプロセスが立ち上がり、 MLノード上で常時起動した状態になります。
これに対して、Elasticsearchの同一MLノード上で動作し、Dataノードから対象のデータを取得してこのC++のプロセスにプロセス間通信でデータを送信するのがデータフィードです。これはJVM側で動作します。
UIのウィザードや、Create anomaly detection jobs APIを使うと、このジョブとデータフィードを同時に作成することができますが、内部的にはそれぞれ別のリソースが作成されているわけです。
Anomaly Detectionの画面を見ても、ジョブとデータフィードはそれぞれ別個に動作状態が表示されているのがわかります。ジョブ状態がOpenedのものは、ジョブのC++プロセスがMLノード上で立ち上がり、データを待ち受けていることを示しています。それに対してデータフィードがStartedになっているものは、ElasticsearchのMLノードが継続的にデータフィードで対応するジョブのプロセスにデータを送信していることを表しています。これは上記のような設計によるものなんですね。
このデータフィードはAnomaly Detectionの設定値であるfrequency
の間隔で起動されます。
シンプルな定義
では、具体的にAnomaly Detectionのジョブを作成しながらデータフィードの定義方法を確認してきましょう。
データフィードを細かく調整しながらジョブを作成するのにはAdvancedのウィザードを使う方法もありますが、おすすめは一旦大枠をウィザードで作った後、JSONを表示してそれをベースにコンソールから直接APIを呼び出すことです。そうするとどのような設定でジョブを作成したのかわかりますし、そのJSONをGitなどで管理すれば複数の環境に同じ定義をデプロイするのも安全です。ちなみにAPIで作成する場合、ウィザードにあるようなマルチメトリックとか集団(Population)とかいった違いはありません。それらの違いはDetectorの定義として表現することになります。Detectorについては別の記事を書いていますのでそちらも参照してください。
以下、Kibanaのサンプルデータ(kibana_sample_data_logs)に対してログ件数に対する異常検知のジョブをAPIで定義する例です。マルチメトリックのウィザードで雛形を作りました。シングルメトリックの方がUI的には簡単なのですが、ちょっと後で説明する事情があってマルチメトリックをベースにしています。
PUT _ml/anomaly_detectors/event_rate_logs
{
"analysis_config": {
"bucket_span": "15m",
"detectors": [
{
"function": "count"
}
],
"influencers": []
},
"data_description": {
"time_field": "timestamp"
},
"analysis_limits": {
"model_memory_limit": "11MB"
},
"datafeed_config": {
"indices": [
"kibana_sample_data_logs"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"datafeed_id": "datafeed-event_rate_logs"
}
}
最後にあるdatafeed_config
オブジェクトがデータフィードの設定ですね。見れば一目瞭然ですが、kibana_sample_data_logs
インデックスから全てのドキュメントを抽出していることがわかります。
もしこのジョブに送信するデータを特定の条件で絞り込みたい場合は、query
に適切な条件を指定すれば良いことがわかります。書式はQueryDSLになります。
この例ではジョブとデータフィードを同時に作成していますが、以下のように個別に作成することも可能です。
PUT _ml/anomaly_detectors/event_rate_logs2
{
"analysis_config": {
"bucket_span": "15m",
"detectors": [
{
"function": "count"
}
],
"influencers": []
},
"data_description": {
"time_field": "timestamp"
},
"analysis_limits": {
"model_memory_limit": "11MB"
}
}
PUT _ml/datafeeds/datafeed-event_rate_logs2
{
"indices": [
"kibana_sample_data_logs"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"job_id": "event_rate_logs2"
}
またこのようにして作成したデータフィードは、それ自体がどのようなデータを送信しているかPreview datafeeds APIでプレビューすることができます。
GET _ml/datafeeds/datafeed-event_rate_logs2/_preview
以下のような結果が得られます。
[
{
"timestamp": 1708216742912
},
{
"timestamp": 1708216742912
},
{
"timestamp": 1708226781326
},
{
"timestamp": 1708226781326
},
(略)
]
これを見るとtimestampの情報しかありません。これは対象のジョブがcountのみを行っており、異常検知を実施するにはイベントの時間とレコードの件数さえわかれば良いからです。データフィードは検索結果をそのまま送っているわけではなく、ジョブに必要なフィールドに絞って送信していることがわかります。従ってデータフィードは特定のジョブと完全にペアになって動作していて、そのジョブが必要とするフィールドが何かを調べてこのようなフィールドの取捨選択を行っているわけです。
クエリーによる絞り込みとランタイムフィールド
データフィードがどういうものかわかってきたところで、カスタマイズの方法や用途について見ていきましょう。ここではKibanaのeCommerceのサンプルデータを使います。以下のような要件があったとしましょう。
- 1時間ごとの売上高(taxful_total_price)のアノマリーを見つける
- ヨーロッパの国の注文に限定("geoip.continent_name": "Europe")
- 国(geoip.country_iso_code)及び性別(customer_gender)でパーティショニングして分析する
一つ目の要件は特に変わったことはないですね。バケットスパンを1時間に設定し、売上(taxful_total_price)の合計をdetectorに設定します。
二つ目については、対象のデータの絞り込みが必要になります。データフィードで対象となるデータの絞り込みができますので、datafeed_configに以下のようなQueryDSLを指定して絞り込みを行いましょう。
"query": {
"bool": {
"filter": [
{
"term": {
"geoip.continent_name": "Europe"
}
}
]
}
}
三つ目についてはどうでしょうか。ElasticのAnomaly Detectionではpartition_field_name
はひとつしか設定することができず、APIの仕様としては複数の要素でのパーティショニングはサポートされていないということになります。では実現不可能かというと、そういうわけでもありません。ランタイムフィールドを使って二つのフィールドの値をダイナミックに結合したフィールドを作成し、そのランタイムフィールドに対してpartition_field_name
を設定します。こうすることで単一のフィールドに対する分析として設定できるため、このような要件を実現することが可能になります。当然三つ以上のフィールドに対するパーティショニングも同様の戦略をとることで実現できます。
ここではcountry-gender
という名前で、{国名}-{性別}という形式の値を持つ以下のようなランタイムフィールドをデータフィードに定義します。
"runtime_mappings": {
"country-gender": {
"type": "keyword",
"script": {
"source": "emit(doc['geoip.country_iso_code'].value + '-' + doc['customer_gender'].value)"
}
}
}
以上を元に、高度な設定(Advanced)のウィザードを使ってジョブを作成した例が以下になります。後からの分析がわかりやすいように、インフルエンサー(影響)には国名と性別について個別のフィールドも指定しました。
分析結果を見て見ましょう。
これを見ると、イギリスの男性(GB-MALE)で比較的継続的に異常値が検出されていることがわかります。
アグリゲーションクエリーを使う
Anomaly Detectionでは、Bucket Spanをベースにログの件数やCPUの使用率などのメトリクスを集計した結果に対して異常検知の学習を実施します。通常この集計はC++側のプロセスで実行されますが、Elasticsearchにはアグリゲーションの機能があるため、Elasticsearch側でアグリゲーションをした結果をC++のジョブへ渡すようにすることが可能です。こうすることでC++のジョブ側での処理負荷を低減させることができます。この機能の詳細は以下の公式ドキュメントにまとまっていますので、詳細はこちらを参照してください。諸々の前提条件はここに記載されているので、特にRequirementsとLimitationsのセクションは必読です。
実はシングルメトリックのウィザードを利用すると、自動的にこのアグリゲーションがデータフィードに適用されるようになっています。最初の例でマルチメトリックのウィザードを使ってJSONを作成したのは、シングルメトリックを使ってしまうとむしろこのような少し高度な設定が自動的に入ってしまうからだったんですね。
シングルメトリックのウィザードで同様にKibanaのサンプルデータ(kibana_sample_data_logs)に対してログ件数に対する異常検知のジョブを定義し、高度な設定(Advanced)に変換した後、データフィードの定義をJSONで確認してみましょう。
ジョブの定義は以下のようになります。
PUT _ml/anomaly_detectors/event_rate_agg
{
"analysis_config": {
"bucket_span": "15m",
"detectors": [
{
"function": "count"
}
],
"influencers": [],
"summary_count_field_name": "doc_count"
},
"data_description": {
"time_field": "timestamp"
},
"analysis_limits": {
"model_memory_limit": "11MB"
},
"datafeed_config": {
"datafeed_id": "datafeed-event_rate_agg",
"job_id": "event_rate_agg",
"indices": [
"kibana_sample_data_logs"
],
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"aggregations": {
"buckets": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "900000ms"
},
"aggregations": {
"timestamp": {
"max": {
"field": "timestamp"
}
}
}
}
}
}
}
二つ大きなポイントがあります。一つ目はデータフィードの設定に"aggregations"が入っていることです。またトップレベルのアグリゲーションは(当然ながら)バケットを表すために時間を表す集計となっている必要があります。具体的には公式ドキュメントにある通りdate_histgram
アグリゲーションかcomposite
アグリゲーションで、どちらであっても時刻フィールドに対するmaxを返すアグリゲーションを内包する必要があることです。
二つ目は、ジョブの定義でフィールドとして指定している名前と、アグリゲーションの名前が一致していることです。例えばジョブの定義に以下があります。
"data_description": {
"time_field": "timestamp"
},
このtimestamp
はドキュメントに記載されたのフィールドではなく、アグリゲーションで集計したものを利用するため、バケットの時刻を表す情報を同名のアグリゲーション結果から取得することになります。従って、時刻が最終的に保存されるアグリゲーションの名前をtimestamp
となるように設定します。
"aggregations": {
"buckets": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "900000ms"
},
"aggregations": {
"timestamp": { # <------ ここがtimestampとなるようにする
"max": {
"field": "timestamp"
}
}
}
}
}
そのほか、例えば特定のフィールドをキーにパーティショニングをする場合やインフルエンサーを指定する場合なども、それらで指定されている値がアグリゲーションとして同名で取得できるようになっている必要があります。
ウィザードでジョブを作成する場合だと、データフィードの内容をUIで確認すると以下のような形になっています。
先ほどの単純な例に加えて、doc_count
がデータフィードに追加されています。doc_count
はアグリゲーションを実行するとElasticsearchが自動的に作成するフィールドですね。データフィードのそれぞれのオブジェクトがバケットを表しているため、そこに含まれるイベント件数がこのdoc_countとして表されているわけです。またここのtimestampは上記のmaxアグリゲーションの結果になっています。
またデータフィード内でアグリゲーションを実施する際、このバケット内にどれだけドキュメントが含まれていたのかはdetectorの処理に常に必要とされる情報であるため、その情報が含まれているフィールドを明示する必要があります。これはジョブを作成する際に analysis_config.summary_count_field_name
として以下のように指定します。
"analysis_config": {
(略),
"summary_count_field_name": "doc_count"
},
summary_count_field_name
は高度な設定のウィザードからジョブを作成する場合、以下のようなプルダウンで指定できます。
アグリゲーション済みのデータを使う
バケット単位での集計はElasticsearchのアグリゲーションで事前に処理することもできますが、そもそもインデックスに格納されているデータが、集計済みのデータであるようなケースがあり得ます。例えば上記の例のようにログの件数に対する異常検知を実装したい場合、バケットごとのログ件数を集計した結果をContinuousモードのTransformで常時集計しておき、この集計結果インデックスに対してAnomaly Detectionを仕掛けることなどが考えられるでしょう。こうすることによって、Anomaly Detectionとしての負荷の多くの部分をTransformにオフロードすることができます。Transformについては以下を参照してください。
トランスフォームの構成
具体的な例を見てみましょう。以下のようなpivotの設定を持つtransformがあったとします。
"pivot": {
"group_by": {
"event.action": {
"terms": {
"field": "event.action"
}
},
"@timestamp": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1h"
}
}
},
"aggregations": {
"event.count": {
"value_count": {
"field": "@timestamp"
}
}
}
}
このトランスフォームによって、event.action
単位で1時間ごとに何件のログが出力されているのかを集計しています。結果は以下のような構造のドキュメントとして出力されます。
{
"@timestamp": "2024-04-15T06:00:00.000Z",
"event": {
"count": 2,
"action": "logged-off"
}
}
ジョブの作成
では、この構造のインデックスに対してAnomaly Detectionジョブを作成してみましょう。設定内容を逐次説明すると長くなってしまうので、ここは手を抜いてスクリーンショットを貼っておきますので確認してください。
ポイントは以下です。
- TransformとAnomaly Detectionのバケットスパンを1時間で揃えている
- サマリーカウントフィールドにevent.countを指定している
- 今回の例ではバケット内に1ドキュメントのみ含まれており、そのevent.countに集計結果が入っている。ここではmax関数を利用しているが、averageやminでも結果は同じである。
- ジョブのパーティションとしてevent.actionを指定している(Transformでの分析と揃えている)
JSONでの表現は以下の通り。
結果として以下のような異常が検知されていました。問題なさそうです。
データフィードを使わずにジョブにデータを送信する
これまでに説明してきたように、ElasticsearchのAnomaly Detectionでは分析を実行するジョブと、分析対象のデータをジョブに送るデータフィードが分離されています。そのため実は(すでにDeprecatedのため今後削除予定ですが)作成したジョブに対して、データフィードではなくAPIから直接データを送信することも現時点では可能です。
これは想像ですが、この機能はおそらくElasticsearchのAnomaly Detection開発チームがジョブのテストなどのために用意したものなのではないかと感じています。このようなものなので正直通常のユースケースでの利用をお勧めすることはできないのですが、この機能を使うことによって実際にインデックスに保存されていないデータを使ってジョブの動作を確認できますので、複雑なジョブを作成し、かつそれに対するさまざまなパターンのテストをしたい、というような場合には便利かもしれません。
Deprecatedの機能のため具体的な例をここで上げることはしませんが、興味のある方は以下の公式ドキュメントを参考にしてください。
おわりに
この記事ではElasticsearchでAnomaly Detectionにおいて、分析対象のデータを収集するデータフィードの役割や設定方法について説明しました。どちらかというと上級者向けの内容ですが、この機能を使うことによって対象のデータを適切な形に絞り込んで性能を向上させたり、ランタイムフィールドを使ってウィザードだけでは実現できなかった集計を可能にしたりするようなこともできますので、いざ複雑なジョブを作成しようという際には参考にしてください。
また、より実践的な内容の記事も書きましたので、特にDatafeedでAggregationを利用したジョブを作成する場合にはそちらも参照にしてください。