概要
Azure SDK for Python を利用して、とある LogAnalyticsワークスペース に保存されている Activity-Log を、クエリ処理してログを抽出する Python プログラムです。
実行環境
macOS Monterey 12.1
python 3.8.3
前提条件
Activity-Log が LogAnalyticsワークスペースに宛先設定れており、そのワークスペースIDを取得できていること
実行プログラム
GetWorkspaceLogsQuery.py
import time
import argparse
import pandas as pd
from azure.core.exceptions import HttpResponseError
from azure.identity import AzureCliCredential, DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from datetime import datetime
# LogAnalyticsワークスペースIDの定義
LOG_WORKSPACE_ID = 'xxxxxxxx-0033-4040-9090-zzzzzzzzzzzz'
# LogAnalyticsワークスペースからLogsQueryを操作するオブジェクトの取得
def GetLogsQueryClient():
logs_client = LogsQueryClient(
credential=AzureCliCredential()
# credential=DefaultAzureCredential()
)
return logs_client
# LogAnalyticsワークスペースから指定するリソースグループのActivityログを取得
def GetWorkspaceLogsQuery(rg_name, start_date, end_date):
print("\n\n ##### From : {} To : {} #####".format(start_date, end_date))
# クエリ文の定義(特定のリソースグループを指定)
# query_str = "AzureActivity | where ResourceGroup == '{}' | take 1".format(str.upper(rg_name))
query_str = "AzureActivity | where ResourceGroup == '{}'".format(str.upper(rg_name))
print(query_str)
# LogsQueryを操作するオブジェクトの取得
logs_client = GetLogsQueryClient()
try:
response = logs_client.query_workspace(
workspace_id=LOG_WORKSPACE_ID,
query=query_str,
timespan=(start_date, end_date)
)
if response.status == LogsQueryStatus.PARTIAL:
error = response.partial_error
data = response.partial_data
print(error.message)
elif response.status == LogsQueryStatus.SUCCESS:
data = response.tables
for table in data:
df = pd.DataFrame(data=table.rows, columns=table.columns)
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.max_colwidth', -1):
print(df)
except HttpResponseError as err:
print("Something Fatal Happened")
print (err)
logs_client.close()
return df.ResourceGroup.count()
# パラメータの文字列を日付型に変換
def valid_date(s):
try:
return datetime.strptime(s, "%Y-%m-%d")
except ValueError:
msg = "Not a valid date: '{0}'.".format(s)
raise argparse.ArgumentTypeError(msg)
# メイン
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='LogAnalyticsワークスペースから指定するリソースグループのActivityログを取得する')
parser.add_argument('-g', '--rg', type=str, help='リソースグループ名', required=True)
parser.add_argument('-s', '--startdate', type=valid_date, help='From日付 yyyy-mm-dd', required=True)
parser.add_argument('-e', '--enddate', type=valid_date, help='To日付 yyyy-mm-dd', required=True)
args = parser.parse_args()
start = time.time()
cnt = GetWorkspaceLogsQuery(args.rg, args.startdate, args.enddate)
generate_time = time.time() - start
print("\n Activityログ数 : " + str(cnt))
print("\n 取得時間:{0}".format(generate_time) + " [sec] \n")
プログラムのHELP表示
## HELPの表示
$ python GetWorkspaceLogsQuery.py -h
usage: GetWorkspaceLogsQuery.py [-h] -g RG -s STARTDATE -e ENDDATE
LogAnalyticsワークスペースから指定するリソースグループのActivityログを取得する
optional arguments:
-h, --help show this help message and exit
-g RG, --rg RG リソースグループ名
-s STARTDATE, --startdate STARTDATE
From日付 yyyy-mm-dd
-e ENDDATE, --enddate ENDDATE
To日付 yyyy-mm-dd
プログラムの実行
## 実行状況
$ python GetWorkspaceLogsQuery.py -g rg_ituru_bricks01 -s 2021-11-11 -e 2022-01-11
##### From : 2021-11-11 00:00:00 To : 2022-01-11 00:00:00 #####
AzureActivity | where ResourceGroup == 'RG_ITURU_BRICKS01'
:
省略
:
Activityログ数 : 59
取得時間:1.656360149383545 [sec]
まとめ
上記の方法で Activityログ を取得し、あとは、煮るなり焼くなり、、、してみたいです、、、
参考記事
以下の記事を参考にさせていただきました。感謝申し上げます。
Azure Monitor Query client library for Python
AzureActivity