はじめに
前回、Microsoft365のログをS3に保存するLambdaを記事にしました。
今回は、それらファイル群をAthenaで見てみました。
ファイルの形式
今回用いるファイルの形式は以下のページに倣って、
- JSON文字列
- 1データ1行
- 改行がデータの区切り
としています。
当初は前回のログ抽出部分でExport-Csv
を使ってCSV出力していました。
ところがauditdata
カラムがJSON文字列であり、その中で使われている特殊文字のエスケープがAthenaと相性が悪く、検索できないテーブルになってしまったので、ConvertTo-Json
を使ったJSON文字列としています。
ファイルの場所はすべてs3://<<バケット名>>/sampleathena/nopart/
直下に保存しています。
CREATE文
CREATE EXTERNAL TABLE IF NOT EXISTS default.ms365auditlog_nopart(
pscomputername string ,
runspaceid string ,
psshowcomputername boolean ,
recordtype string ,
creationdate timestamp ,
userids string ,
operations string ,
auditdata string ,
resultindex int ,
resultcount int ,
identity string ,
isvalid boolean ,
objectstate string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<<バケット名>>/sampleathena/nopart/'
auditdata
がJSON文字列になっており、struct型で入れ子状態を定義しようとしたのですが、データごとに定義が異なると受け付けてくれないようです。
そのためここでは文字列として扱い、検索時にパースして使うこととしました。
SELECT文
auditdata
から特定の要素を取りたい場合はjson_extract_scalar(json_parse(auditdata),'$.<<特定の要素>>')
と書くようです。
例として、監査ログから「特定のIP以外からのログインについて、ユーザーと日時を抽出する」というSQLは以下になります。
-- 特定のIP以外からのログインを取得
WITH TEMP AS (
SELECT
userids
,json_extract_scalar(json_parse(auditdata),'$.Operation') AS Operation
,json_extract_scalar(json_parse(auditdata),'$.CreationTime') AS CreationTime
,json_extract_scalar(json_parse(auditdata),'$.ClientIP') AS ClientIP
,"auditdata"
FROM "default"."ms365auditlog_nopart"
)
SELECT * FROM TEMP
WHERE
Operation = 'UserLoggedIn'
AND ClientIP NOT IN ('11.11.11.11')
;
他にもいろいろな読ませ方があるようです。
おわりに
前回作ったLambdaで収集した監査ログをはAthenaで見る方法を紹介しました。
本運用の際には、色々なサービスと繋げてアラームとかも作れたら、と思っています。