はじめに
このブログのシリーズでは、ElasticサンプルデータのKibana Flight Dataの飛行機の離発着履歴データを使い、Elastic WorkflowsとElastic Agent Builderを使ったフライト遅延の調査と原因分析の運用の自動化までの道のりをお伝えします。
前回はフライト遅延に関しての情報をCase(Elasticのチケット機能)に自動的に記録し、その原因分析をElastic Agent Builderで作ったエージェントにて行いました。
本ブログでは、この自動化をもう一歩完成形に近づけます。
- フライトデータ分析方法を追加し、もっと色々な情報をCaseに足した上で、原因分析します。これまではフライト遅延しているデータの検索だけでした。
本ブログはElastic Cloudでv9.3のElasticデプロイメントを使用しています。本ブログの内容を行うにはEnterpriseサブスクリプションである必要があります。フリートライアルでも試すことが可能です。
シリーズの他の記事はこちらから参照ください
第1回
第2回
第3回
第4回
第5回
第6回
このブログで作成するもの
-
Elastic ML Anomaly Detection job
flight_delay_anomalies
機械学習にてFlightDelayMinの数値の異常を検知するMLジョブです -
ワークフロー
write_flight_anomaly_to_case
MLジョブの検知結果データを検索し、存在すればそれをCaseにレポートします。 -
Agent Tool
write_flight_anomaly_to_case
上記ワークフローをエージェントが実行するためのToolです -
AIエージェント
check_flight_delay_agent
これまでのcheck_flight_delay_agentを更新し、write_flight_anomaly_to_caseも利用できるように設定変更します
ここまでの呼び出しフロー
| 今回の変更部分 | トリガー | 呼び出しフロー |
|---|---|---|
| チャットで特定の日付と空港の調査を依頼 | Agent check_flight_delay_agent --> Workflow write_flight_delay_to_case
|
|
| 今回作成 | チャットで特定の日付と空港のML異常のデータ調査を依頼 | Agent check_flight_delay_agent --> Workflow write_flight_anomaly_to_case
|
| チャットでオープンなCaseのリストの表示を依頼 | Agent rca_flight_case_agent_v4 --> Agent Tool platform.core.cases
|
|
| チャットで特定の日付と空港のCaseに対する原因分析を依頼 | Agent rca_flight_case_agent_v4 --> Workflow write_comment_to_case
|
|
| チャットで今回のと類似のレポートの検索を依頼 | Agent rca_flight_case_agent_v4 --> Workflow search_similar_rca_reports
|
|
| チャットで今回のレポートを保存を依頼 | Agent rca_flight_case_agent_v4 --> Workflow index_update_rca_reports
|
※ AgentがWorkflowを呼び出すには、間にそれぞれのWorkflow実行用のAgent Toolが必要ですが、冗長になるので上記からは省いています。
事前準備:Elastic MLの異常検知ジョブの作成
これまでのフライトデータから問題を見つける手段は、エージェント check_flight_delay_agentがフライト遅延データ (FlightDelayMin > 0でデータ検索)を見つけることでした。
今回は新しく別のエージェント「フライト異常検知エージェント(check_flight_anomalies)」を作り、Elasticのマシンラーニング異常検知の結果を見てCaseへ書き込むようにします。
まずは異常検知ジョブを作成します。
扱っているのがただのサンプルデータなので、どこまでリアルな異常検知ができるかはわかりませんがこのように設定します。

JOB IDをflight_delay_anomaliesとし、Groupsをkibana_sample_flights、Job descriptionを入れ、Use dedicated indexをオンにしておきます。

実装:ワークフローwrite_flight_anomaly_to_caseで異常検知ジョブを特定する
これまでのワークフローwrite_flight_delay_to_caseをCloneし、データ検索部分だけ変更し、上記の異常データを探しようにします。
version: "1"
name: write_flight_anomaly_to_case
description: Write Anomaly Detection result to a case comment
tags:
- draft
triggers:
- type: manual
inputs:
- name: city_name
type: string
- name: start_date
type: string
- name: end_date
type: string
consts:
search_parameter: "+{{ inputs.city_name }} +{{ inputs.start_date | date: '%Y/%m/%d' }}"
search_parameter_encoded: ${{consts.search_parameter}} | url_encode
steps:
- name: search_parameter_encoded
type: console
with:
message: |
{% assign keyword = inputs.city_name %}
{% assign date = inputs.start_date | date: '%Y/%m/%d' %}
{% assign query = "+" | append: keyword | append: "+" | append: '"' | append: date | append: '"' %}
{{ query | url_encode }}
- name: execute_esql
type: elasticsearch.esql.query
with:
query: |
FROM .ml-anomalies-custom-flight_delay_anomalies-*
| WHERE job_id == "flight_delay_anomalies" AND result_type == "influencer" AND influencer_field_name == "OriginCityName" AND OriginCityName == "{{ inputs.city_name }}"
| WHERE @timestamp >= "{{ inputs.start_date }}" AND @timestamp < "{{ inputs.end_date }}"
| KEEP @timestamp, job_id, result_type, anomaly_score, influencer_field_name, influencer_score
| SORT @timestamp ASC
| LIMIT 100
format: json
- name: write_to_case_if_data_exist
type: if
condition: "steps.execute_esql.output.documents_found > 0"
steps:
- name: check_existing_case
type: kibana.request
with:
method: "GET"
path: "/api/cases/_find?defaultSearchOperator=AND&searchFields=title&search={{ steps.search_parameter_encoded.output }}"
- name: create_new_case_if_none
type: if
condition: 'steps.check_existing_case.output.total: 0'
steps:
- name: create_new_case
type: kibana.createCaseDefaultSpace
with:
title: "{{ inputs.city_name }} {{ inputs.start_date | date: '%Y/%m/%d' }} のフライト遅延とキャンセル"
owner: observability
description: >
目的:フライトのキャンセル及び大幅な遅延をまとめ、日毎のレポートを作成し、後に参照できるように記録する
対象日付:{{ inputs.start_date | date: '%Y/%m/%d' }}
対象空港:{{ inputs.city_name }}
settings:
syncAlerts: true
severity: medium
tags: []
connector:
id: "none"
name: "none"
type: ".none"
fields:
- name: case_id
type: console
with:
message: "{% if steps.check_existing_case.output.total == 0 %}{{ steps.create_new_case.output.id }}{% else %}{{ steps.check_existing_case.output.cases[0].id }}{%endif%}"
- name: transform_output_by_ai
type: ai.agent
with:
agent_id: flight_analyzer_agent
message: "次のデータをCaseに追記するコメントに変換してください: {{steps.execute_esql.output | json}}"
- name: add_response_to_existing_case
type: kibana.request
with:
method: POST
path: /api/cases/{{ steps.case_id.output }}/comments
body:
type: user
owner: observability
comment: "{{ steps.transform_output_by_ai.output }}"
else:
- name: output_no_result
type: console
with:
message: "No data found for {{ inputs | json }}"
- name: output_succes
type: if
condition: "steps.execute_esql.output.documents_found > 0"
steps:
- name: log_output
type: console
with:
message: |
以下のレポートがCase Id: [{{steps.case_id.output}}]({{ kibanaUrl }}/app/observability/cases/{{steps.case_id.output}}) に追加されました。
{{ steps.transform_output_by_ai.output }}
上記のワークフローをテストします。異常データが存在する条件でテストします。

ワークフローを実行結果。見つけた異常データに対するAIのサマライズのOutput。

正しくCaseが新規に作成され、AIサマライズ済みの文章がコメント追加されています。

Agentツールwrite_flight_anomaly_to_caseを作成
第1回で作成したAgent Builder Toolのwrite_flight_delay_to_caseをCloneし、write_flight_anomaly_to_caseを作成し、次のように設定します。

Kibana Sample Data Flightsに対するフライト遅延時間の異常検知データを検索し、その結果をCaseに追加します。対象のCaseが存在しない場合、Caseを新規作成します。
このツールには、入力として以下の情報を与えてください。
* 空港の都市名 ... 頭文字大文字の英語で入力 (例: Tokyo)
* フライトデータ分析の開始日時 ... ISOフォーマットであること(例: 2026-02-04T00:00:00Z)
* フライトデータ分析の終了日時 ... ISOフォーマットであること(例: 2026-02-05T00:00:00Z)
念の為、第1回で作成したwrite_flight_delay_to_caseのDescriptionの文章を微調整し、用途を冒頭にするようにしました。

Kibana Sample Data Flightsのインデックスから入力された情報に関するフライトの遅延・キャンセルを検索し、その結果をCaseに追加します。対象のCaseが存在しない場合、Caseを新規作成します。
このツールには、入力として以下の情報を与えてください。
* 空港の都市名 ... 頭文字大文字の英語で入力 (例: Tokyo)
* フライトデータ分析の開始日時 ... ISOフォーマットであること(例: 2026-02-04T00:00:00Z)
* フライトデータ分析の終了日時 ... ISOフォーマットであること(例: 2026-02-05T00:00:00Z)
第一回で作成したエージェントcheck_flight_delay_agentに上記のToolを追加登録します。

動作確認
最初にフライト遅延を調べてというと、第1回で作成した方のツールを選択しました。

実装:原因分析エージェントrca_flight_case_agent_v4
第4回の記事でアップデートした原因調査用のエージェントrca_flight_case_agent_v3をCloneし、v4版を新しく作り、異常検知の情報もレポートに追加できるように、以下のようにプロンプトを更新します。
あなたは入力された情報を元に以下のことを実施するアシスタントです。
* Caseの検索
* Case内の情報を元に原因分析し、指定された報告書フォーマットでレポートを表示
* レポートをCaseに追記
* レポートを保存
* 類似のレポートを検索して表示
### 期待する動き
* Case Idが与えられたら、そのままそのCaseの分析に進んでください。
* Case Idが与えられなかったら、与えられた情報からOpenステータスのCaseのtitleに対してキーワード検索してください。Caseのtitleは次のような形式になっています。 "Paris 2026/01/31 のフライト遅延とキャンセル"。都市の名前は頭文字大文字の英語がルールです。日付のフォーマットもこれに沿ってください。
* Caseの検索結果としてCaseのIDとタイトルとそのリンクだけを表示してください。
* ユーザーにどのCaseを分析したいかを選らんでもらってください。
* Caseが扱っている対象の空港(一つ)を識別してください。
* その日のフライトの遅延・キャンセルが、当該空港で発生している問題に起因しているかを検討し、該当する場合は原因をレポートにまとめてください。
* 原因のカテゴリは次のものから一つだけ選んでください:天候、システムトラブル、機材トラブル、その他
* ユーザーから指示があった場合のみ、原因レポートをCaseに追加書き込みしてください。書き込みが成功したら、シンプルに「Case ID: xxx , タイトル: yyyy へレポートを追記しました。リンク zzz」のようなフォーマットでシンプルな返事で返してください。
### 役割
- 文章から事実情報を抽出し、決められたフィールドに整理してください。
- 推測や創作は行わず、コメントに含まれる情報のみを使用してください。
- 情報が不足している項目は「不明」と記載してください。
- 表現は簡潔で公的な報告書調にしてください。
### レポート出力形式
必ず以下にあるヘッダ名を出力してください。
必ず以下の「フィールド名: 値」の形式で出力してください。
- - -
# RCAレポート
対象空港:
対象日:
概要:
影響を受けた便名:
原因のカテゴリ:
異常検知の有無:
異常検知ジョブ名:
異常検知スコア:
- - -
動作確認: 原因分析エージェントrca_flight_case_agent_v4に対してチャット
意図通り、2つのツールを使ってフライトデータの問題を見つけ、それをRCAレポートでサマライズして報告に加えてくれました。

おわり







