0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Elastic Stack v9.3新登場のWorkflowsでワークフローとAIエージェントによるデータ分析を実装する例(第5回)

0
Last updated at Posted at 2026-03-05

はじめに

このブログのシリーズでは、ElasticサンプルデータのKibana Flight Dataの飛行機の離発着履歴データを使い、Elastic WorkflowsとElastic Agent Builderを使ったフライト遅延の調査と原因分析の運用の自動化までの道のりをお伝えします。
前回はフライト遅延に関しての情報をCase(Elasticのチケット機能)に自動的に記録し、その原因分析をElastic Agent Builderで作ったエージェントにて行いました。

本ブログでは、この自動化をもう一歩完成形に近づけます。

  • フライトデータ分析方法を追加し、もっと色々な情報をCaseに足した上で、原因分析します。これまではフライト遅延しているデータの検索だけでした。

本ブログはElastic Cloudでv9.3のElasticデプロイメントを使用しています。本ブログの内容を行うにはEnterpriseサブスクリプションである必要があります。フリートライアルでも試すことが可能です。

シリーズの他の記事はこちらから参照ください
第1回
第2回
第3回
第4回
第5回
第6回

このブログで作成するもの

  • Elastic ML Anomaly Detection job flight_delay_anomalies
    機械学習にてFlightDelayMinの数値の異常を検知するMLジョブです

  • ワークフロー write_flight_anomaly_to_case 
    MLジョブの検知結果データを検索し、存在すればそれをCaseにレポートします。

  • Agent Tool write_flight_anomaly_to_case
    上記ワークフローをエージェントが実行するためのToolです

  • AIエージェント check_flight_delay_agent
    これまでのcheck_flight_delay_agentを更新し、write_flight_anomaly_to_caseも利用できるように設定変更します

ここまでの呼び出しフロー

今回の変更部分 トリガー 呼び出しフロー
チャットで特定の日付と空港の調査を依頼 Agent check_flight_delay_agent --> Workflow write_flight_delay_to_case
今回作成 チャットで特定の日付と空港のML異常のデータ調査を依頼 Agent check_flight_delay_agent --> Workflow write_flight_anomaly_to_case
チャットでオープンなCaseのリストの表示を依頼 Agent rca_flight_case_agent_v4 --> Agent Tool platform.core.cases
チャットで特定の日付と空港のCaseに対する原因分析を依頼 Agent rca_flight_case_agent_v4 --> Workflow write_comment_to_case
チャットで今回のと類似のレポートの検索を依頼 Agent rca_flight_case_agent_v4 --> Workflow search_similar_rca_reports
チャットで今回のレポートを保存を依頼 Agent rca_flight_case_agent_v4 --> Workflow index_update_rca_reports

※ AgentがWorkflowを呼び出すには、間にそれぞれのWorkflow実行用のAgent Toolが必要ですが、冗長になるので上記からは省いています。

事前準備:Elastic MLの異常検知ジョブの作成

これまでのフライトデータから問題を見つける手段は、エージェント check_flight_delay_agentがフライト遅延データ (FlightDelayMin > 0でデータ検索)を見つけることでした。
今回は新しく別のエージェント「フライト異常検知エージェント(check_flight_anomalies)」を作り、Elasticのマシンラーニング異常検知の結果を見てCaseへ書き込むようにします。

まずは異常検知ジョブを作成します。

CleanShot 2026-02-09 at 15.18.47@2x.png

扱っているのがただのサンプルデータなので、どこまでリアルな異常検知ができるかはわかりませんがこのように設定します。
CleanShot 2026-02-09 at 15.19.54@2x.png

JOB IDをflight_delay_anomaliesとし、Groupsをkibana_sample_flights、Job descriptionを入れ、Use dedicated indexをオンにしておきます。
CleanShot 2026-02-09 at 15.29.46@2x.png

精度はさておき、何かしらの異常検知データはできました。
CleanShot 2026-02-09 at 15.22.56@2x.png

異常データがどのIndexに入っているか確認します。
CleanShot 2026-02-09 at 15.36.46@2x.png

このシステムインデックスです。
CleanShot 2026-02-09 at 15.37.18@2x.png

実装:ワークフローwrite_flight_anomaly_to_caseで異常検知ジョブを特定する

これまでのワークフローwrite_flight_delay_to_caseをCloneし、データ検索部分だけ変更し、上記の異常データを探しようにします。

version: "1"
name: write_flight_anomaly_to_case
description: Write Anomaly Detection result to a case comment
tags:
  - draft
triggers:
  - type: manual
inputs:
  - name: city_name
    type: string
  - name: start_date
    type: string
  - name: end_date
    type: string
consts:
  search_parameter: "+{{ inputs.city_name }} +{{ inputs.start_date | date: '%Y/%m/%d' }}"
  search_parameter_encoded: ${{consts.search_parameter}} | url_encode
steps:    
  - name: search_parameter_encoded
    type: console
    with:
      message: |
        {% assign keyword = inputs.city_name %}
        {% assign date = inputs.start_date | date: '%Y/%m/%d' %}
        {% assign query = "+" | append: keyword | append: "+" | append: '"' | append: date | append: '"' %}
        {{ query | url_encode }}

  - name: execute_esql
    type: elasticsearch.esql.query
    with:
      query: |
        FROM .ml-anomalies-custom-flight_delay_anomalies-*
        | WHERE job_id == "flight_delay_anomalies" AND result_type == "influencer" AND influencer_field_name == "OriginCityName" AND OriginCityName == "{{ inputs.city_name }}"
        | WHERE @timestamp >= "{{ inputs.start_date }}" AND @timestamp < "{{ inputs.end_date }}" 
        | KEEP @timestamp, job_id, result_type, anomaly_score, influencer_field_name, influencer_score
        | SORT @timestamp ASC
        | LIMIT 100
      format: json
  - name: write_to_case_if_data_exist
    type: if
    condition: "steps.execute_esql.output.documents_found > 0"
    steps:
    - name: check_existing_case
      type: kibana.request
      with:
        method: "GET"
        path: "/api/cases/_find?defaultSearchOperator=AND&searchFields=title&search={{ steps.search_parameter_encoded.output }}"
    - name: create_new_case_if_none
      type: if
      condition: 'steps.check_existing_case.output.total: 0'
      steps:
      - name: create_new_case
        type: kibana.createCaseDefaultSpace
        with:
          title: "{{ inputs.city_name }} {{ inputs.start_date | date: '%Y/%m/%d' }} のフライト遅延とキャンセル"
          owner: observability
          description: >
            目的:フライトのキャンセル及び大幅な遅延をまとめ、日毎のレポートを作成し、後に参照できるように記録する
            対象日付:{{ inputs.start_date | date: '%Y/%m/%d' }}
            対象空港:{{ inputs.city_name }}
          settings:
            syncAlerts: true
          severity: medium
          tags: []
          connector:
            id: "none"
            name: "none"
            type: ".none"
            fields:

    - name: case_id
      type: console
      with:
        message: "{% if steps.check_existing_case.output.total == 0 %}{{ steps.create_new_case.output.id }}{% else %}{{ steps.check_existing_case.output.cases[0].id }}{%endif%}"

    - name: transform_output_by_ai
      type: ai.agent
      with:
        agent_id: flight_analyzer_agent
        message: "次のデータをCaseに追記するコメントに変換してください: {{steps.execute_esql.output | json}}"

    - name: add_response_to_existing_case
      type: kibana.request
      with:
        method: POST
        path: /api/cases/{{ steps.case_id.output }}/comments
        body:
          type: user
          owner: observability
          comment: "{{ steps.transform_output_by_ai.output }}"
      
    else:
    - name: output_no_result
      type: console
      with:
        message: "No data found for {{ inputs | json }}"
    
  - name: output_succes
    type: if
    condition: "steps.execute_esql.output.documents_found > 0"
    steps:
      - name: log_output
        type: console
        with:
          message: |
            以下のレポートがCase Id: [{{steps.case_id.output}}]({{ kibanaUrl }}/app/observability/cases/{{steps.case_id.output}}) に追加されました。
            {{ steps.transform_output_by_ai.output }}

上記のワークフローをテストします。異常データが存在する条件でテストします。
CleanShot 2026-02-09 at 15.43.20@2x.png

ワークフローを実行結果。見つけた異常データに対するAIのサマライズのOutput。
CleanShot 2026-02-09 at 15.44.39@2x.png

正しくCaseが新規に作成され、AIサマライズ済みの文章がコメント追加されています。
CleanShot 2026-02-09 at 15.45.08@2x.png

Agentツールwrite_flight_anomaly_to_caseを作成

第1回で作成したAgent Builder Toolのwrite_flight_delay_to_caseをCloneし、write_flight_anomaly_to_caseを作成し、次のように設定します。
CleanShot 2026-02-09 at 15.51.40@2x.png

Description
Kibana Sample Data Flightsに対するフライト遅延時間の異常検知データを検索し、その結果をCaseに追加します。対象のCaseが存在しない場合、Caseを新規作成します。

このツールには、入力として以下の情報を与えてください。
* 空港の都市名 ... 頭文字大文字の英語で入力 (例: Tokyo)
* フライトデータ分析の開始日時 ... ISOフォーマットであること(例: 2026-02-04T00:00:00Z)
* フライトデータ分析の終了日時 ... ISOフォーマットであること(例: 2026-02-05T00:00:00Z)

念の為、第1回で作成したwrite_flight_delay_to_caseのDescriptionの文章を微調整し、用途を冒頭にするようにしました。
CleanShot 2026-02-09 at 15.54.00@2x.png

Description
Kibana Sample Data Flightsのインデックスから入力された情報に関するフライトの遅延・キャンセルを検索し、その結果をCaseに追加します。対象のCaseが存在しない場合、Caseを新規作成します。

このツールには、入力として以下の情報を与えてください。
* 空港の都市名 ... 頭文字大文字の英語で入力 (例: Tokyo)
* フライトデータ分析の開始日時 ... ISOフォーマットであること(例: 2026-02-04T00:00:00Z)
* フライトデータ分析の終了日時 ... ISOフォーマットであること(例: 2026-02-05T00:00:00Z)

第一回で作成したエージェントcheck_flight_delay_agentに上記のToolを追加登録します。
CleanShot 2026-02-09 at 15.55.06@2x.png

動作確認

最初にフライト遅延を調べてというと、第1回で作成した方のツールを選択しました。
CleanShot 2026-02-09 at 15.57.26@2x.png

今回の新しいツールで異常検知データも探してもらいます。
CleanShot 2026-02-09 at 15.58.11@2x.png

CleanShot 2026-02-09 at 16.04.27@2x.png
(文字化けしてしまってますね)

Caseにも登録されました。
CleanShot 2026-02-09 at 16.05.17@2x.png

実装:原因分析エージェントrca_flight_case_agent_v4

第4回の記事でアップデートした原因調査用のエージェントrca_flight_case_agent_v3をCloneし、v4版を新しく作り、異常検知の情報もレポートに追加できるように、以下のようにプロンプトを更新します。

CleanShot 2026-02-09 at 16.12.26@2x.png

あなたは入力された情報を元に以下のことを実施するアシスタントです。
* Caseの検索
* Case内の情報を元に原因分析し、指定された報告書フォーマットでレポートを表示
* レポートをCaseに追記
* レポートを保存
* 類似のレポートを検索して表示

### 期待する動き
* Case Idが与えられたら、そのままそのCaseの分析に進んでください。
* Case Idが与えられなかったら、与えられた情報からOpenステータスのCaseのtitleに対してキーワード検索してください。Caseのtitleは次のような形式になっています。	"Paris 2026/01/31 のフライト遅延とキャンセル"。都市の名前は頭文字大文字の英語がルールです。日付のフォーマットもこれに沿ってください。
* Caseの検索結果としてCaseのIDとタイトルとそのリンクだけを表示してください。
* ユーザーにどのCaseを分析したいかを選らんでもらってください。
* Caseが扱っている対象の空港(一つ)を識別してください。
* その日のフライトの遅延・キャンセルが、当該空港で発生している問題に起因しているかを検討し、該当する場合は原因をレポートにまとめてください。
* 原因のカテゴリは次のものから一つだけ選んでください:天候、システムトラブル、機材トラブル、その他
* ユーザーから指示があった場合のみ、原因レポートをCaseに追加書き込みしてください。書き込みが成功したら、シンプルに「Case ID: xxx , タイトル: yyyy へレポートを追記しました。リンク zzz」のようなフォーマットでシンプルな返事で返してください。

### 役割
- 文章から事実情報を抽出し、決められたフィールドに整理してください。
- 推測や創作は行わず、コメントに含まれる情報のみを使用してください。
- 情報が不足している項目は「不明」と記載してください。
- 表現は簡潔で公的な報告書調にしてください。

### レポート出力形式

必ず以下にあるヘッダ名を出力してください。
必ず以下の「フィールド名: 値」の形式で出力してください。

- - -
# RCAレポート
対象空港: 
対象日: 
概要: 
影響を受けた便名: 
原因のカテゴリ: 
異常検知の有無:
異常検知ジョブ名:
異常検知スコア:
- - -

動作確認: 原因分析エージェントrca_flight_case_agent_v4に対してチャット

意図通り、2つのツールを使ってフライトデータの問題を見つけ、それをRCAレポートでサマライズして報告に加えてくれました。
CleanShot 2026-02-09 at 16.23.03@2x.png

おわり

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?