はじめに
- とあるシステムからのログを、Linux Syslogサーバで受け取っていたが、それをAzure LogAnalyticsに転送するまでの手順を共有したい
- 従来は、Log Analytics レガシーエージェントがあったが、2024/8にサポート終了になるということで、Azure Monitor Agent (AMA) での転送が必要になる
とても複雑な手順になるので、ところどころ辛口コメントを入れさせていただきたい- とてつもなく苦労したため、この記事が誰かの役にたつことを願い。
前提となるシステム構成
取り込みたいカスタムログの概要
- CSV形式でAppサーバーからログが吐き出され、Syslogプロトコル経由でSyslogサーバーに転送され、Syslogサーバー内の、/var/log/remoteフォルダ内に蓄積がなされている。
- 以下の形式。Syslogフォーマットで、MSGフィールドはCSV形式となっている
root@AzMonitorAgent01:/var/log/remote# tail test02.log -n 1
Mar 13 15:49:36 AppServer "KDDI Taro","2024-03-13 15:49:10","Allowed","outlook.office365.com:443","CONNECT","200","hoge","fuga"
おおまかな流れ
- LogAnalyticsで仮のカスタムテーブルを定義する
- DCE(Data Collection Endpoint)を定義する
- DCEとSyslogサーバを紐づける
- Azure Firewallで適切にPolicyを入れる
- DCR(Data Collection Rule)を定義する
- いったんログを取り込んでみる
- データを分解するKQL(Kusto Query Language)を作る
- KQLの出力と整合のある本番カスタムテーブルを定義する
- DCRを修正する
- 完成
これだけ見ても複雑さが想像できると思う。
1. LogAnalyticsでカスタムテーブルを定義する
- Azure Portalに入り、以下のPSスクリプトを実行する。貼り付けて実行でOK
- テーブル名は、hogehoge_CLとしてあるので、適宜修正すること。
- AzureサブスクリプションID、リソースグループ名、LogAnalyticsワークスペース名も適宜修正すること。
$tableParams = @'
{
"properties": {
"schema": {
"name": "hogehoge_CL",
"columns": [
{
"name": "TimeGenerated",
"type": "DateTime"
},
{
"name": "RawData",
"type": "String"
}
]
}
}
}
'@
Invoke-AzRestMethod -Path "/subscriptions/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx/resourcegroups/<リソースグループ名>/providers/microsoft.operationalinsights/workspaces/<LogAnalyticsワークスペース名>/tables/hogehoge_CL?api-version=2021-12-01-preview" -Method PUT -payload $tableParams
2. DCEを定義する
3. DCEとSyslogサーバーを紐づける
4. AzureFirewallにLogAnalytics向けPolicyを入れる
- Syslogサーバーは、LogAnalyticsと通信できる必要があるため、Firewallなど設置している場合は、適宜Allowのポリシーを入れてやる必要がある。
- 今回は、LogAnalyticsをJapanEastリージョンに設置したため、JapanEast向けを設定する
- 以下の宛先、TCP:443宛ての通信をAllowにする。
global.handler.control.monitor.azure.com
japaneast.handler.control.monitor.azure.com ※VMの設置されているリージョン。
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx.ods.opinsights.azure.com ※宛先のLogAnalytics ワークスペースID
hoge-dce-xxxx.japaneast-1.ingest.monitor.azure.com ※設定したDCE
5. DCRを定義する
5-1. Resourceを指定する。Syslogサーバーを指定して、DCEをEnableする
5-2. Collect and Deliverを指定する
- データソースとして、Syslogサーバー内のログが保存されるフォルダ・ファイル名規則を入力する
- テーブル名は最初に定義したテーブル名。
- Destinationは取り込み先のLogAnalyticsワークスペースを選択するだけなので適宜。
途中確認:AMAがSyslogサーバーにインストールされたか確認する
- ここまで正常に終えると、SyslogサーバーにAMAがインストールされ、Heartbeat通信がLogAnalyticsに転送される
- その確認は、LogAnalyticsのAgentの項目から確認できる。
- 5項まで設定完了して20分経過しても、Heartbeatが流れてこない場合は、どこかで間違っているため、見直すこと。
6. いったんログを取り込む
- DCRで指定したフォルダにログを突っ込むと、LogAnalyticsに取り込みがなされる
- DCRのサーバーへの反映は20分ほどかかるので待つこと。
7. データを分解するKQLを作る
- カスタムテーブルのテーブル定義は、1項の通り、RawDataしか定義していないため、取り込んだログは、そっくりRawDataフィールドに格納される
- 以下の内容が、そっくりRawDataに入ることになる。
Mar 13 15:49:36 AppServer "KDDI Taro","2024-03-13 15:49:10","Allowed","outlook.office365.com:443","CONNECT","200","hoge","fuga"
- このRawDataから、CSV部分を切り出し、CSVをParseして、カスタムログの各フィールドに分解してやりたい。
- 以下の例となる。
- 2行目で、Syslogのヘッダ部分を削除している
- 3行目・4行目で、DCR内のKQLでは、なぜかparse_CSV関数が使えないため、仕方なくtrimとsplitを組み合わせて、CSVをParseしている。
- 6行目は、JSTでログが吐き出されているが、DCRで取り込まれる際、時刻がUTCと認識されるため、JSTをUTCに変換するため、マイナス9時間している
hogehoge_CL
| extend TrimData = trim_start(@"^((?:.*? ){4}.*?)", RawData)
| extend TrimData2 = trim('"',TrimData)
| extend CSVFields_CF = split(TrimData2, '","')
| extend User = tostring(CSVFields_CF[0])
| extend EventTime = datetime_add('hour', -9, todatetime(CSVFields_CF[1]))
| extend Action = tostring(CSVFields_CF[2])
| extend Host = tostring(CSVFields_CF[3])
| extend Method = tostring(CSVFields_CF[4])
| extend RespCode = tostring(CSVFields_CF[5])
| extend FileType = tostring(CSVFields_CF[6])
| extend HogeType = tostring(CSVFields_CF[7])
| project TimeGenerated, EventTime, User, Action, Host, Method, RespCode, FileType, HogeType
KQL作りのTips
8. KQLの出力結果に合わせて本番カスタムテーブルを定義する
- 7項で以下のフィールド定義としている。
project TimeGenerated, EventTime, User, Action, Host, Method, RespCode, FileType, HogeType
9. DCRを修正する
完成
- 反映までに20分ほどかかる
- ログがしかるべきテーブルカラムに分解されてLogAnalyticsに取り込まれることを確認しよう
- 仮のテーブルは適宜削除しよう