2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kusto (KQL) でスキーマ変更に強いクエリを書く:未知の列も自動集計するテクニック

Last updated at Posted at 2025-07-28

はじめに

下図のようなサイトごとのセキュリティの統計データがあります。

image.png

このような表構造に対して、攻撃カテゴリごとの合計件数を算出し、重要度や傾向を円グラフで視覚化することで、セキュリティ運用の指針や優先順位づけに活用できます。これを今回の KQL クエリの目的とします。

セキュリティ運用では、攻撃手法の多様化により、ログデータのスキーマが頻繁に変化します。静的なクエリで実装すると確かに目的は達成できますが、攻撃カテゴリが増えるたびにクエリの改修が必要になります。

image.png

そこで本投稿では、未知の列も自動で集計できる "強い動的クエリの書き方" にフォーカスします。

静的クエリの課題

まずは従来の静的クエリです。特定の攻撃カテゴリ (Bad Bots, XSS, DDoS など) を手動で指定し、それぞれ合計件数を算出しています。

静的なクエリ
// データソース
_GetWatchlist('SecurityStatistics')
// 攻撃カテゴリごとの発生件数の合計を算出
| summarize 
    BadBots = sum(todouble(['Bad Bots'])),
    XSS = sum(todouble(['Cross Site Scripting'])),
    DDoS = sum(todouble(['DDoS Incidents'])),
    IllegalAccess = sum(todouble(['Illegal Resource Access'])),
    RemoteFile = sum(todouble(['Remote File Inclusion'])),
    SQLi = sum(todouble(['SQL Injection']))
// 攻撃カテゴリ名と発生件数の合計をペアにし、それらを配列形式にまとめる
| project CategoryValue = pack_array(
    pack('Category', 'Bad Bots', 'Count', BadBots),
    pack('Category', 'Cross Site Scripting', 'Count', XSS),
    pack('Category', 'DDoS Incidents', 'Count', DDoS),
    pack('Category', 'Illegal Resource Access', 'Count', IllegalAccess),
    pack('Category', 'Remote File Inclusion', 'Count', RemoteFile),
    pack('Category', 'SQL Injection', 'Count', SQLi)
  )
// ペア配列を行ごとに展開し、攻撃カテゴリごとのレコードに変換
| mv-expand CategoryValue
// 展開したデータから攻撃カテゴリ名と発生件数の合計を抽出し、それぞれの型にキャスト
| project Category = tostring(CategoryValue.Category), Count = todouble(CategoryValue.Count)
// 発生件数の合計の降順にソート
| order by Count desc 
// 最終結果を円グラフで表示
| render piechart 

このアプローチの最大の問題点は、攻撃カテゴリ (集計列) が追加されるたびに修正が必要になることです。データスキーマが頻繁に変化する運用環境では、この方法は管理・保守コストを上昇させてしまいます。

カイゼン: 動的列処理によるスマートな可視化

次に、動的に対象の攻撃カテゴリ (集計列) を抽出し、変更があってもメンテナンス不要なクエリをステップごとに紹介します。

Step 1: データソースを取得

(今回は) ウォッチリストからデータを取得して source に格納します

データソースを取得
// データソース
let source = _GetWatchlist('SecurityStatistics');

Step 2: 集計対象列の名前を抽出

まず、集計しない列を除外するための列名の一覧を定義します。次に、getschema 演算子を使ってデータソースのスキーマ情報 (列名) を取得し、先ほど定義した「除外列」を除いた列名のみを columnList に格納します。これにより、集計対象となる数値データ列 (攻撃カテゴリ) の選定を動的に実行します。

対象列の名前を抽出
// 除外列の指定
let excluded = dynamic([
    '_DTItemId', 'LastUpdatedTimeUTC', 'SearchKey', 'Site ID', 'Site Name', 'WAF Total'
]);
// 集計対象列の名前の抽出
let columnList = toscalar(
    source
    | getschema
    | where ColumnName !in (todynamic(excluded))
    | summarize make_list(ColumnName)
);

Step 3: キー・値ペアの展開と数値化

まず、pack_all() 関数により、データソースの各行のすべての列をキー・値のペア形式に変換し、mv-expand 演算子で各ペアを行ごとに展開します。そして、キー名 (攻撃カテゴリ) と値 (発生件数) を個別に抽出します。

データソースのキー・値ペア展開と数値化
// データソースのキー・値ペア展開と数値化
source
| extend all = pack_all()
| mv-expand pair = all
| extend kvp = parse_json(pair)
| extend Category = tostring(bag_keys(kvp)[0])
| extend val = todouble(kvp[Category])

Step 4: フィルター & 集計して可視化

Step 2」で抽出した集計対象列 (攻撃カテゴリ) のみに絞り込み、攻撃カテゴリごとに発生件数の合計を算出し、降順に並べ替えて、集計結果を円グラフとして表示します。

フィルター & 集計して可視化
| where Category in (columnList)
| where isnotnull(val)
| summarize Count = sum(val) by Category
| order by Count desc 
| render piechart 

カイゼンのポイント

  • 動的スキーマ抽出 によって再利用性が高く、データソースの構造変更にも強いクエリになりました
  • データの整形・集計・可視化までワンストップで処理でき、運用効率が向上します
カイゼンしたクエリ
// データソース
let source = _GetWatchlist('SecurityStatistics');
// 除外列の指定
let excluded = dynamic([
    '_DTItemId', 'LastUpdatedTimeUTC', 'SearchKey', 'Site ID', 'Site Name', 'WAF Total'
]);
// 集計対象列 (攻撃カテゴリ) の名前の抽出
let columnList = toscalar(
    source
    | getschema
    | where ColumnName !in (todynamic(excluded))
    | summarize make_list(ColumnName)
);
// データソースのキー・値ペア展開と数値化
source
// 各行のすべての列をキー・値のペア形式に変換
| extend all = pack_all()
// ペア配列を行ごとに展開して攻撃カテゴリごとのレコードに変換し、JSON として解析
| mv-expand pair = all
| extend kvp = parse_json(pair)
// キー名 (攻撃カテゴリ) と値 (発生件数) を個別に抽出
| extend Category = tostring(bag_keys(kvp)[0])
| extend val = todouble(kvp[Category])
// 集計対象列 (攻撃カテゴリ) で null でない数値データを抽出
| where Category in (columnList)
| where isnotnull(val)
// 攻撃カテゴリごとに発生件数の合計を算出
| summarize Count = sum(val) by Category
// 発生件数の合計の降順にソート
| order by Count desc 
// 最終結果を円グラフで表示
| render piechart 

まとめ

本投稿では、KQL を用いたセキュリティ統計データの可視化において、静的クエリの限界と、それを克服するための動的クエリの書き方について紹介しました。

  • 静的クエリは明示的でわかりやすい反面、スキーマ変更に弱く、保守性に課題がある
  • 動的クエリでは getschemapack_all() を活用することで、未知の列にも柔軟に対応でき、運用効率が大幅に向上
  • 特に、攻撃カテゴリの追加・変更が頻繁に発生する環境では、今回のアプローチが非常に有効

KQL の柔軟性を活かすことで、よりスマートなセキュリティ運用が可能になります。ぜひ、皆さんの環境でも応用してみてください。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?