ASIM (Advanced Security Information Model) とは?
Microsoft Sentinel では、ASIM (Advanced Security Information Model) (読み方は「エーシム」) というログデータの正規化モデルが提供されています。ASIM のパーサーで正規化されたログテーブルを利用することで、多くのデータソースのログを統一のスキーマで扱い、同じクエリで検索をかけることができます。
ASIM のプロセスについては、公式ドキュメントの下記図にてまとめられています。様々なログソースから入ってきたデータを、正規化されたスキーマにのっとったテーブルの形式にパースし、共通のクエリやダッシュボードを使って扱えるようにしていきます。
本記事について
ASIM は Sentinel を活用するにあたりとても便利な仕組みですが、公式ドキュメント以外であまり日本語の情報が出ていません。また、公式ドキュメントは詳細な情報が載っているものの、初めて ASIM に触れる方には少し難しめに書かれています。
ですので、本記事では、最初に ASIM に触れる方向けに、最低限知っておくべき点や躓きやすい点をメモしておこうと思います。
ASIM で使われるテーブルとパーサー (関数)
ASIM で扱われているスキーマ
公式ドキュメントにあるように下記11種類のスキーマが現在定義されています。
- 監査イベント
- 認証イベント
- DHCP アクティビティ
- DNS アクティビティ
- ファイル アクティビティ
- ネットワーク セッション
- プロセス イベント
- レジストリ イベント
- ユーザー管理
- Web セッション
それぞれのスキーマにおいて、利用されるフィールドの名前や入力値の型などが定義されています。たとえば、ネットワークセッションのスキーマは下記ドキュメントのとおりです。
ドキュメントを見てみると、全スキーマ共通のフィールドである、EventResultDetails や EventSeverity, ネットワークセッションスキーマ固有のフィールドである IP アドレスや FQDN などに関する複数のフィールドなどが明確に定義されているのがわかります。
そして、ネットワークに関わるログ、たとえばシスコ社やパロアルトネットワークス社のアプライアンスのログや、Azure Firewall/NSG のログ、Microsoft Defender for Endpoint や Azure Monitor VM Insights のログなどが、同一のスキーマで扱えるようになることで、同じクエリで一元的に検索をかけられるようになります。
ASIM のデフォルトのパーサー (関数)
Sentinel のワークスペースを作成すると、すでにデフォルトの ASIM のパーサーが関数として入っています。ログ検索の画面で、関数タブを開き、Microsoft Sentinel の欄へ進むと、下記スクリーンショットのように関数が入っていることがわかります。
なお、既定のテーブルの種類には大きく分けて下記の2種類があります。(頭に_がついているのが既定のテーブルであることを意味しています。)
-
_Im_<種類名>
- フィルターをかけるためのパラメータ (starttime, endtime, eventype など) を入力できるパーサー。OOB (Out Of Box) のコンテンツハブで提供されているクエリやダッシュボードは基本的にはこちらを利用。
-
_Asim_<種類名>
- フィルターをかけるためのパラメータがないパーサー。ログ検索画面で利用することが想定。
たとえば、NSG のログが入っている Sentinel Workspace で下記の検索をかけると、NSG ログが ASIM のテーブルに変換された形で出てきます。
_ASim_NetworkSession | take 10
パーサーの内部の処理の簡単な説明
少しだけ、パーサーの関数が実際にどういったことをしているかを見てみます。まず、_Im_NetworkSession() の関数のコードを読み込んでみます。
union isfuzzy=true
_Im_NetworkSessionBuiltIn(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, pack= pack),
Im_NetworkSessionSolutions(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, pack= pack),
Im_NetworkSessionCustom(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, pack= pack)
すると、いくつかのテーブルを union 句で合体させていることがわかります。そのひとつである、_Im_NetworkSessionBuiltIn() テーブルを見ると、また union 句で合成していました。
let DisabledParsers=materialize(_GetWatchlist('ASimDisabledParsers')
| where SearchKey in ('Any', 'Exclude_Im_NetworkSession')
| extend SourceSpecificParser=column_ifexists('SourceSpecificParser','')
| where isnotempty(SourceSpecificParser)
| summarize list = make_set(SourceSpecificParser));
let builtInDisabled = 0 != array_length(set_intersect(toscalar(DisabledParsers),dynamic(['Exclude_Im_NetworkSessionBuiltIn', 'Exclude_Im_NetworkSession', 'Any'])));
union isfuzzy=true
_Im_NetworkSession_AppGateSDPV02(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, disabled= (builtInDisabled or('Exclude_Im_NetworkSession_AppGateSDP' in (DisabledParsers)))),
_Im_NetworkSession_AWSVPCV03(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, disabled= (builtInDisabled or('Exclude_Im_NetworkSession_AWSVPC' in (DisabledParsers)))),
(以下略)
この関数では、具体的なソースからパースされたテーブルを合成しています。そのひとつである _Im_NetworkSession_AppGateSDPV02() は、文字通り、Appgate SDP からのログを ASIM で正規化したテーブルです。
let parser = (disabled:bool=false)
{
let DirectionLookup = datatable (direction:string, NetworkDirection:string)
[
'up', 'Inbound',
'down', 'Outbound'
];
let ActionLookup = datatable (DvcOriginalAction:string, DvcAction:string, EventSeverity:string, EventResult:string)
[
'allow', 'Allow', 'Informational', 'Success',
'drop', 'Drop', 'Low', 'Failure',
'reject', 'Deny', 'Low', 'Failure',
'block', 'Deny', 'Low', 'Failure',
'block_report', 'Deny', 'Low', 'Failure',
'allow_report', 'Allow', 'Informational', 'Success'
];
let tcpupd_success = Syslog
| where
ProcessName in ("cz-sessiond", "cz-vpnd")
(以下略)
ASIM のパーサーのカスタマイズ
既定のものではなく、カスタマイズされた ASIM のパーサーをご自身のワークスペースにデプロイすることもできます。パーサーの開発については本記事では詳細を割愛しますが、公式ドキュメントにて詳述されています。
また、GitHub のレポジトリにて、カスタムのパーサーをデプロイするためのテンプレートが載っています。こちらをベースにカスタマイズしていくのも良いかもしれません。
テンプレートをそのままデプロイすると、関数のワークスペース関数に、パーサーが展開されます。
Deploy to Azure をクリック
展開先を見るとわかる通り、Asim_<種類名> と vim\_<種類名> の関数があります。既定との違いとして、頭文字に \_ が付いていないです。それぞれ、Asim\_<種類名> はパラメータなし、vim\_<種類名>はパラメータありになっています。
ASIM ログテーブルの活用 - 分析ルールや Workbooks と OOTB コンテンツ
ASIM で正規化したテーブルに対して、標準化されたクエリを使うことができます。特に Sentinel のコンテンツハブでは、OOTB (Out Of The Box) のコンテンツとして、検知ルールや Workbooks が提供されています。
例えば、NetworkSession を使ったものとして、Network Session Essentials というマイクロソフト社から提供されたコンテンツがあります。これをワークスペースに追加することで、分析ルールのテンプレートや Workbooks のテンプレートなどにコンテンツが追加され、ユーザーがすぐに使い始めることができます。
たとえば、「Anomaly found in Network Session Traffic (ASIM Network Session schema)」という分析ルールをクリックすると、ASIM ベースのクエリで検知を行うテンプレートがあり、それをすぐに有効化できます。
let min_t = ago(14d);
let max_t = now();
let dt = 1d;
let fieldForDvcAction = "DvcAction";
let fieldForNetworkDirection = "NetworkDirection";
let fieldForNetworkProtocol = "NetworkProtocol";
let AnomalyThreshold = 2.5;
let eps = materialize (_Im_NetworkSession | project TimeGenerated | where TimeGenerated > ago(5m) | count | extend Count = Count/300);
let maxSummarizedTime = toscalar (
union isfuzzy=true
(
NetworkCustomAnalytics_protocol_CL
| where EventTime_t > min_t
| summarize max_TimeGenerated=max(EventTime_t)
| extend max_TimeGenerated = datetime_add('minute',10,max_TimeGenerated)
),
(
print(min_t)
| project max_TimeGenerated = print_0
)
| summarize maxTimeGenerated = max(max_TimeGenerated)
);
let nosummary = materialize(
union isfuzzy=true
(
NetworkCustomAnalytics_protocol_CL
| where EventTime_t > ago(1d)
| project v = int(2)
),
(
print int(1)
| project v = print_0
)
| summarize maxv = max(v)
| extend nosum = (maxv > 1)
);
let allData = union isfuzzy=true
(
(datatable(exists:int, nosum:bool)[1,false] | where toscalar(eps) > 1000 | join (nosummary) on nosum) | join (
_Im_NetworkSession(starttime=todatetime(ago(2d)), endtime=now())
| where TimeGenerated > maxSummarizedTime
| summarize Count=count() by NetworkProtocol, DstPortNumber, DstAppName, NetworkDirection, DvcAction, bin(TimeGenerated,10m)
| extend EventTime = TimeGenerated, Count = toint(Count), DstPortNumber = toint(DstPortNumber), exists=int(1)
) on exists
| project-away exists, maxv, nosum*
),
(
(datatable(exists:int, nosum:bool)[1,false] | where toscalar(eps) between (501 .. 1000) | join (nosummary) on nosum) | join (
_Im_NetworkSession(starttime=todatetime(ago(3d)), endtime=now())
| where TimeGenerated > maxSummarizedTime
| summarize Count=count() by NetworkProtocol, DstPortNumber, DstAppName, NetworkDirection, DvcAction, bin(TimeGenerated,10m)
| extend EventTime = TimeGenerated, Count = toint(Count), DstPortNumber = toint(DstPortNumber), exists=int(1)
) on exists
| project-away exists, maxv, nosum*
),
(
(datatable(exists:int, nosum:bool)[1,false] | where toscalar(eps) <= 500 | join (nosummary) on nosum) | join (
_Im_NetworkSession(starttime=todatetime(ago(4d)), endtime=now())
| where TimeGenerated > maxSummarizedTime
| summarize Count=count() by NetworkProtocol, DstPortNumber, DstAppName, NetworkDirection, DvcAction, bin(TimeGenerated,10m)
| extend EventTime = TimeGenerated, Count = toint(Count), DstPortNumber = toint(DstPortNumber), exists=int(1)
) on exists
| project-away exists, maxv, nosum*
),
(
NetworkCustomAnalytics_protocol_CL
| where EventTime_t > min_t
| project-rename NetworkProtocol=NetworkProtocol_s, DstPortNumber=DstPortNumber_d, DstAppName=DstAppName_s, NetworkDirection=NetworkDirection_s, DvcAction=DvcAction_s, Count=count__d, EventTime=EventTime_t
| extend Count = toint(Count),DstPortNumber = toint(DstPortNumber)
)
;
let findVolumneBasedAnomaly = allData
| make-series total=sum(Count) on EventTime from min_t to max_t step dt
| extend (anomalies, score, baseline) = series_decompose_anomalies(total, AnomalyThreshold, -1, 'linefit')
| mv-expand anomalies, score, baseline, EventTime, total
| extend anomalies = toint(anomalies), score = toint(score), baseline = toint(baseline), EventTime = todatetime(EventTime), total = tolong(total)
| where EventTime >= ago(1d)
| where score >= 2*AnomalyThreshold
;
let findAnomalies = (field:string){
allData
| where isnotempty(column_ifexists(field,""))
| make-series total=sum(Count) on EventTime from min_t to max_t step dt by column_ifexists(field,"")
| extend (anomalies, score, baseline) = series_decompose_anomalies(total, AnomalyThreshold, -1, 'linefit')
| mv-expand anomalies, score, baseline, EventTime, total
| extend anomalies = toint(anomalies), score = toint(score), baseline = toint(baseline), EventTime = todatetime(EventTime), total = tolong(total)
| where EventTime >= ago(1d)
| where score >= 2*AnomalyThreshold
};
union findAnomalies(fieldForDvcAction), findAnomalies(fieldForNetworkDirection), findAnomalies(fieldForNetworkProtocol), findVolumneBasedAnomaly
| extend anomalyFieldType = case (isnotempty(column_ifexists(fieldForDvcAction,"")), "DvcAction",
isnotempty(column_ifexists(fieldForNetworkDirection,"")), "NetworkDirection",
isnotempty(column_ifexists(fieldForNetworkProtocol,"")), "NetworkProtocol",
"TotalVolume"
),
anomalyFieldValue = case (isnotempty(column_ifexists(fieldForDvcAction,"")), column_ifexists(fieldForDvcAction,""),
isnotempty(column_ifexists(fieldForNetworkDirection,"")), column_ifexists(fieldForNetworkDirection,""),
isnotempty(column_ifexists(fieldForNetworkProtocol,"")), column_ifexists(fieldForNetworkProtocol,""),
"Overall"
)
ログソースを後から追加したとしても、ASIM 対応していれば、このクエリで脅威検知をすぐに実行することができます。そういった意味でも多種なソースが混在するワークスペースでは、ASIM をうまく活用していくメリットが出てくるかと思います。
最後に
本記事ではメモレベルではありますが、Microsoft Sentinel の ASIM の仕組みや使い方をまとめてみました。Sentinel ユーザーの方でこれから ASIM を使いこなしていきたいと思われた際の、最初の一歩を踏み出す一助になれば幸いです。
*本稿は、個人の見解に基づいた内容であり、所属する会社の公式見解ではありません。また、いかなる保証を与えるものでもありません。正式な情報は、各製品の販売元にご確認ください。