はじめに
先日以下の記事で紹介したアクセスログ解析スクリプトですが、設定が少し面倒で、解析対象のログファイルが多かったり、行数が多いと解析に時間がかかるため、少しリファクタリングしてみました。
解析結果自体は、上記記事のスクリプトと同じですので、修正理由と修正後のスクリプトを記載しておきたいと思います。
解析対象ディレクトリと解析結果出力先の指定方法の変更と修正方針
修正前は、各スクリプト毎に、ログ解析ディレクトリと解析結果出力ファイルパスをCSVファイルで指定していました。
ですが、ログ解析ディレクトリは全てのスクリプトで同じ場所を参照しますし、ログファイルのログフォーマット、ログファイルのファイル形式も同じであるため、Configファイルに設定をまとめ、全てのスクリプトで同じ設定を参照する事にしました。
スクリプト毎に異なるのは、解析結果出力先であるため、解析結果出力先ファイルパスは、スクリプト毎にセクションを切り指定するようにしました。
また、Configファイルを1つにした事で、Configファイルから共通設定を読み込む部分と解析結果をファイルに出力する部分の処理が共通化できるようになったため、共通処理を記述したUtilスクリプトを作成して共通化しました。
スクリプトの処理速度については、ログファイル毎の解析をマルチスレッドで処理するように修正しました。
Poolを利用し、マルチプロセスで処理する方法もあるようですが、ログファイル数、行数が多くなるとメモリ使用量が大きくなりそうなので、メモリを効率よく利用できるマルチスレッドで実装する事にしました。
修正後のConfigとスクリプト
Config
[AccessLog]セクションに、アクセスログ解析スクリプト共通設定を記載します。
それ以外のセクションでは、スクリプト毎に解析結果出力先ファイルパスを指定するようにしました。
#config.ini
[AccessLog]
Directories = C:/performance/XXXXXXXXXXX/SystemA/,
C:/performance/XXXXXXXXXXX/SystemB/,
C:/performance/XXXXXXXXXXX/SystemC/
FileName = access_*.log
LogFormat = host_ip_address,
remote_hostname,
date,
time,
method_uri,
http_status,
transfer(bytes),
res_time,
JSESSIONID
[AvgTransferPerRequest]
OutputFiles = C:/performance/XXXXXXXXXXX/avg_transfer_per_request_SystemA.csv,
C:/performance/XXXXXXXXXXX/avg_transfer_per_request_SystemB.csv,
C:/performance/XXXXXXXXXXX/avg_transfer_per_request_SystemC.csv
[HttpStatusCheck]
OutputFiles = C:/performance/XXXXXXXXXXX/http_status_SystemA.csv,
C:/performance/XXXXXXXXXXX/http_status_SystemB.csv,
C:/performance/XXXXXXXXXXX/http_status_SystemC.csv
[MaxRequestsPerSecondPerDay]
OutputFiles = C:/performance/XXXXXXXXXXX/max_requests_per_sec_SystemA.csv,
C:/performance/XXXXXXXXXXX/max_requests_per_sec_SystemB.csv,
C:/performance/XXXXXXXXXXX/max_requests_per_sec_SystemC.csv
[OutputHttpStatusNotEq200]
OutputFiles = C:/performance/XXXXXXXXXXX/http_status_not_eq200_requests_SystemA.csv,
C:/performance/XXXXXXXXXXX/http_status_not_eq200_requests_SystemB.csv,
C:/performance/XXXXXXXXXXX/http_status_not_eq200_requests_SystemC.csv
[OutputSlowReqOver3sec]
OutputFiles = C:/performance/XXXXXXXXXXX/slow_req_over_3sec_SystemA.csv,
C:/performance/XXXXXXXXXXX/slow_req_over_3sec_SystemB.csv,
C:/performance/XXXXXXXXXXX/slow_req_over_3sec_SystemC.csv
[PerformanceCheck]
OutputFiles = C:/performance/XXXXXXXXXXX/performance_check_SystemA.csv,
C:/performance/XXXXXXXXXXX/performance_check_SystemB.csv,
C:/performance/XXXXXXXXXXX/performance_check_SystemC.csv
■access_log_analyze_util.py
以下6つのスクリプトの共通処理を記述したスクリプトです。6つのスクリプトと同じディレクトリに存在する必要があります。
Configファイルの読み込みと、解析結果の出力を行っています。
import configparser
class AccessLogAnalyzeUtil:
def __init__(self):
self.config = configparser.ConfigParser(interpolation=None)
self.config.read("C:/performance/scripts/config/config.ini")
def get_config(self):
return self.config
def get_directories(self):
directories = self.config["AccessLog"]["Directories"]
return [directory.strip() for directory in directories.split(",")]
def get_file_name(self):
return self.config["AccessLog"]["FileName"]
def get_log_format(self):
log_format = self.config["AccessLog"]["LogFormat"]
return [format.strip() for format in log_format.split(",")]
def get_output_files(self, section):
output_files = self.config[section]["OutputFiles"]
return [file.strip() for file in output_files.split(",")]
def config_check(self, directories, output_files):
if len(directories) != len(output_files):
raise ValueError("Number of log directories and output files doesn't match")
return
def output_analyze_results(self, result_df):
output_files = result_df["Output File"].unique()
for output_file in output_files:
filtered_df = result_df[result_df["Output File"] == output_file]
filtered_df.to_csv(output_file, index=False)
print(f'解析結果は {output_file} に保存されました。')
■avg_transfer_per_request.py
アクセスログを解析し、日毎の1リクエストあたりの平均転送量をByte、KByte単位でCSVファイルに出力します。
import os
import glob
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from accesslog_analyze_util import AccessLogAnalyzeUtil # type: ignore
# アクセスログ解析共通処理を読み込み
autil = AccessLogAnalyzeUtil()
# 設定ファイルの読み込み
# アクセスログが存在するディレクトリ
directories = autil.get_directories()
# アクセスログファイル名
file_name = autil.get_file_name()
# アクセスログフォーマットの列名
format = autil.get_log_format()
# 解析結果の出力先
output_files = autil.get_output_files("AvgTransferPerRequest")
# config設定チェック
autil.config_check(directories, output_files)
def process_log_file(log_file, log_directory, output_file):
# ログファイル名から日付を抽出
date_str = os.path.basename(log_file).split(".")[0].split("_")[-1]
# アクセスログデータを読み込み
df = pd.read_csv(log_file, names=format, usecols=["transfer(bytes)"], header=None)
# "-"の値をNaNに変換
df["transfer(bytes)"] = pd.to_numeric(df["transfer(bytes)"], errors="coerce")
# 1リクエストあたりの平均転送量を算出
avg_transfer_per_request = df["transfer(bytes)"].mean()
# NaNの場合は平均算出対象外とする
if pd.notnull(avg_transfer_per_request):
avg_transfer_per_request_kb = avg_transfer_per_request / 1024 # 単位をKBに変換
# 解析結果を返す
return [log_directory, output_file, date_str, avg_transfer_per_request, avg_transfer_per_request_kb]
# マルチスレッドでログファイルの解析を実行
data = []
with ThreadPoolExecutor() as executor:
futures = []
for log_directory, output_file in zip(directories, output_files):
print(f'ディレクトリ {log_directory} を確認中。')
log_files = glob.glob(os.path.join(log_directory, "**", file_name), recursive=True)
for log_file in log_files:
future = executor.submit(process_log_file, log_file, log_directory, output_file)
futures.append(future)
for future in futures:
result = future.result()
if result:
data.append(result)
# 解析結果をDataFrameに変換
result_df = pd.DataFrame(data, columns=["Log Directory", "Output File", "Date", "Avg Transfer per Request (Bytes)", "Avg Transfer per Request (KB)"])
# 日付を基準に昇順ソート
result_df = result_df.sort_values("Date", ascending=True)
# 解析結果を各出力ファイルに出力
autil.output_analyze_results(result_df)
■max_requests_per_second_per_day.py
アクセスログを解析し、日毎に以下の情報をCSVファイルに出力します。
- 秒間最大同時リクエスト数
- 秒間最大同時リクエストがあった日時
- 10分間での最大リクエスト数
- 10分間での最大リクエストがあった時間帯
import concurrent
import os
import glob
import pandas as pd
from accesslog_analyze_util import AccessLogAnalyzeUtil # type: ignore
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# アクセスログ解析共通処理を読み込み
autil = AccessLogAnalyzeUtil()
# 設定ファイルの読み込み
# アクセスログが存在するディレクトリ
directories = autil.get_directories()
# アクセスログファイル名
file_name = autil.get_file_name()
# アクセスログフォーマットの列名
format = autil.get_log_format()
# 解析結果の出力先
output_files = autil.get_output_files("MaxRequestsPerSecondPerDay")
# config設定チェック
autil.config_check(directories, output_files)
def analyze_log_files(log_directory, output_file):
print(f'ディレクトリ {log_directory} を確認中。')
# ディレクトリ内のアクセスログファイルを取得
log_files = glob.glob(os.path.join(log_directory, "**", file_name), recursive=True)
# ログファイルごとに解析を実行
results = []
for log_file in log_files:
# ログファイル名から日付を抽出
date_str = os.path.basename(log_file).split(".")[0].split("_")[-1]
# アクセスログデータを読み込み
df = pd.read_csv(log_file, names=format, header=None)
# GETリクエストのみをフィルタリング
# df = df[df["method_uri"].str.startswith("GET")]
# 日時列を結合してDateTime型に変換
df["datetime"] = pd.to_datetime(df["date"] + " " + df["time"])
# 毎秒のリクエスト数を集計
requests_per_sec = df.groupby(pd.Grouper(key="datetime", freq="s")).size()
# 1秒あたりの最大同時リクエスト数を算出
max_requests_per_sec = requests_per_sec.max()
if max_requests_per_sec >= 1:
max_request_time = requests_per_sec.idxmax()
else:
max_requests_per_sec = 0
max_request_time = "nan"
# 10分毎のリクエスト数を集計
requests_per_10min = df.groupby(pd.Grouper(key="datetime", freq="10Min")).size()
# 10分間で最も多かったリクエスト数を取得
max_requests_per_10min = requests_per_10min.max()
if max_requests_per_10min >= 1:
# 最も多かったリクエスト数の時間帯を取得
max_count_time = requests_per_10min.idxmax().strftime("%Y/%m/%d %H:%M:%S")
else:
max_requests_per_10min = 0
max_count_time = "nan"
# 解析結果をリストに追加
results.append([log_directory, output_file, date_str, max_requests_per_sec, max_request_time, max_requests_per_10min, max_count_time])
return results
# マルチスレッドで解析を実行
results = []
with ThreadPoolExecutor() as executor:
# マルチスレッド処理を行うためのリスト
futures = []
for log_directory, output_file in zip(directories, output_files):
# マルチスレッド処理を行い、結果をリストに追加
analyze_partial = partial(analyze_log_files, output_file=output_file)
futures.append(executor.submit(analyze_partial, log_directory))
# マルチスレッド処理の結果を収集
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.extend(result)
# 解析結果をDataFrameに変換
result_df = pd.DataFrame(results, columns=["Log Directory", "Output File", "Date", "Max Requests per Second", "Max Request Time(sec)", "Max Requests per 10min", "Max Request Time(10min)"])
# 日付を基準に昇順ソート
result_df = result_df.sort_values("Date", ascending=True)
# 解析結果を各出力ファイルに出力
autil.output_analyze_results(result_df)
■performance_check.py
アクセスログを解析し、日毎に以下の情報をCSVファイルに出力します。
- 1日あたりのリクエスト数
- サーバー処理時間が3秒以上のリクエスト数
- サーバー処理時間が3秒以上のリクエストの平均値
- サーバー処理時間が3秒以上のリクエストの最大値
- サーバー処理時間が3秒以上のリクエストが多かった時間帯(10分間)
- 1日あたりのリクエストの内、サーバー処理時間が3秒以上だったリクエストの割合(%)
import concurrent
import os
import glob
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from accesslog_analyze_util import AccessLogAnalyzeUtil # type: ignore
# アクセスログ解析共通処理を読み込み
autil = AccessLogAnalyzeUtil()
# 設定ファイルの読み込み
# アクセスログが存在するディレクトリ
directories = autil.get_directories()
# アクセスログファイル名
file_name = autil.get_file_name()
# アクセスログフォーマットの列名
format = autil.get_log_format()
# 解析結果の出力先
output_files = autil.get_output_files("PerformanceCheck")
# config設定チェック
autil.config_check(directories, output_files)
def analyze_log_file(log_file, log_directory, output_file):
print(f'ファイル {log_file} を解析中。')
# ログファイル名から日付を抽出
date_str = os.path.basename(log_file).split(".")[0].split("_")[-1]
# アクセスログデータを読み込み
df = pd.read_csv(log_file, names=format, header=None)
# "-"をNaNに変換
df["res_time"] = pd.to_numeric(df["res_time"], errors="coerce")
# "res_time"の値が3以上のリクエストのみをフィルタリング
filtered_df = df[df["res_time"] >= 3]
# 1日あたりのリクエスト数
total_requests = len(df)
# "res_time"が3以上のリクエスト数
filtered_requests = len(filtered_df)
# "res_time"が3以上のリクエストの平均値
filtered_mean = filtered_df["res_time"].mean()
# "res_time"が3以上のリクエストの最大値
filtered_max = filtered_df["res_time"].max()
# "res_time"が3以上のリクエストが最も多かった時間帯(10分単位)
if filtered_df.size >= 1:
filtered_df["time"] = pd.to_datetime(filtered_df["time"])
filtered_df["time"] = filtered_df["time"].dt.floor("10min")
time_counts = filtered_df.groupby("time").size()
max_count_time = time_counts.idxmax().strftime("%H:%M:%S")
else:
max_count_time = "-"
# "res_time"が3以上のリクエストの割合(%)を求める
filtered_ratio = (filtered_requests / total_requests) * 100
# 解析結果をリストに追加
return [log_directory, output_file, date_str, total_requests, filtered_requests,
filtered_mean, filtered_max, max_count_time, filtered_ratio]
# マルチスレッドでログファイルの解析を実行
data = []
with ThreadPoolExecutor() as executor:
futures = []
for log_directory, output_file in zip(directories, output_files):
log_files = glob.glob(os.path.join(log_directory, "**", file_name), recursive=True)
for log_file in log_files:
futures.append(executor.submit(analyze_log_file, log_file, log_directory, output_file))
for future in concurrent.futures.as_completed(futures):
result = future.result()
data.append(result)
# 解析結果をDataFrameに変換
result_df = pd.DataFrame(data, columns=["Log Directory", "Output File", "Date", "Total Requests",
"Slow Requests(>=3sec)", "Slow Request Mean", "Slow Request Max",
"Max Count Time of Slow Requests(10 min interval)", "Slow Requests Ratio(%)"])
# 日付を基準に昇順ソート
result_df = result_df.sort_values("Date", ascending=True)
# 解析結果を各出力ファイルに出力
autil.output_analyze_results(result_df)
■http_status_check.py
アクセスログを解析し、日毎、HTTPステータス毎のリクエスト数ををCSVファイルに出力します。
import os
import glob
import pandas as pd
import concurrent.futures
from accesslog_analyze_util import AccessLogAnalyzeUtil # type: ignore
# アクセスログ解析共通処理を読み込み
autil = AccessLogAnalyzeUtil()
# 設定ファイルの読み込み
# アクセスログが存在するディレクトリ
directories = autil.get_directories()
# アクセスログファイル名
file_name = autil.get_file_name()
# アクセスログフォーマットの列名
format = autil.get_log_format()
# 解析結果の出力先
output_files = autil.get_output_files("HttpStatusCheck")
# config設定チェック
autil.config_check(directories, output_files)
# 各組み合わせごとに解析を実行
def analyze(log_directory, output_file):
print(f'ディレクトリ {log_directory} を確認中。')
# ディレクトリ内のアクセスログファイルを取得
log_files = glob.glob(os.path.join(log_directory, "**", file_name), recursive=True)
# 解析結果を格納するDataFrameを作成
results = []
# 各ログファイルを処理
for log_file in log_files:
# ログファイルのパスから日付を取得
date_str = os.path.basename(log_file).split(".")[0].split("_")[-1]
# print(date_str)
date = pd.to_datetime(date_str).strftime("%Y%m%d")
# アクセスログデータを読み込み
df = pd.read_csv(log_file, names=format, header=None)
# "-"をNaNに変換
df["http_status"] = pd.to_numeric(df["http_status"], errors="coerce")
# 200以外のHTTPステータスのみをフィルタリング
filtered_df = df[df["http_status"] != 200]
# HTTPステータスごとにリクエスト数を集計
status_counts = filtered_df["http_status"].value_counts()
# 解析結果をリストに追加
for status, count in status_counts.items():
results.append([log_directory, output_file, date, status, count])
return results
results = []
# マルチスレッドで解析を実行
with concurrent.futures.ThreadPoolExecutor() as executor:
# マルチスレッド処理を行い、結果をリストに追加
futures = [executor.submit(analyze, log_directory, output_file) for log_directory, output_file in zip(directories, output_files)]
# 各スレッドの結果を収集
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.extend(result)
# 解析結果をDataFrameに変換
result_df = pd.DataFrame(results, columns=["Log Directory", "Output File", "Date", "HTTP STATUS", "COUNT"])
# 日付とHTTPステータスを基準に昇順および降順ソート
result_df = result_df.sort_values(["Date", "HTTP STATUS"], ascending=[True, False])
# 解析結果をCSVファイルに出力
autil.output_analyze_results(result_df)
■output_http_status_not_eq200_requests.py
アクセスログを解析し、HTTPステータスが200以外のリクエストについて、以下の情報を日付の昇順、HTTPステータスの降順でCSVファイルに出力します。
- 日付
- 時間
- HTTPステータス
- データ転送量(byte)
- リクエスト(メソッド、URI)
import os
import glob
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from accesslog_analyze_util import AccessLogAnalyzeUtil # type: ignore
# アクセスログ解析共通処理を読み込み
autil = AccessLogAnalyzeUtil()
# 設定ファイルの読み込み
# アクセスログが存在するディレクトリ
directories = autil.get_directories()
# アクセスログファイル名
file_name = autil.get_file_name()
# アクセスログフォーマットの列名
format = autil.get_log_format()
# 解析結果の出力先
output_files = autil.get_output_files("OutputHttpStatusNotEq200")
# config設定チェック
autil.config_check(directories, output_files)
# 解析結果を一時的に格納するDataFrameを作成
result_df = pd.DataFrame(columns=["Log Directory", "Output File", "Date", "Time", "HTTP STATUS", "Transfer(bytes)", "Method URI"])
# 各組み合わせごとに解析を実行
for i in range(len(directories)):
log_directory = directories[i]
output_file = output_files[i]
print(f'ディレクトリ {log_directory} を確認中。')
# ディレクトリ内のアクセスログファイルを取得
log_files = glob.glob(os.path.join(log_directory, "**", file_name), recursive=True)
# ログファイルの一時的な結果を格納するリスト
file_data = []
# 各ログファイルを並列実行で処理
with ThreadPoolExecutor() as executor:
# ログファイルごとに処理を実行する関数
def process_log_file(log_file):
# ログファイルのパスから日付を取得
date_str = os.path.basename(log_file).split(".")[0].split("_")[-1]
# print(date_str)
date = pd.to_datetime(date_str).strftime("%Y%m%d")
# アクセスログデータを読み込み
df = pd.read_csv(log_file, names=format, header=None)
# "-"をNaNに変換
df["http_status"] = pd.to_numeric(df["http_status"], errors="coerce")
# 200以外のHTTPステータスのみをフィルタリング
filtered_df = df[df["http_status"] != 200]
# 解析結果をリストに追加
for _, row in filtered_df.iterrows():
file_data.append([log_directory, output_file, date, row["time"], row["http_status"], row["transfer(bytes)"], row["method_uri"]])
# 各ログファイルを並列実行
executor.map(process_log_file, log_files)
# ログファイルごとの一時的な結果をDataFrameに追加
file_df = pd.DataFrame(file_data, columns=["Log Directory", "Output File", "Date", "Time", "HTTP STATUS", "Transfer(bytes)", "Method URI"])
result_df = pd.concat([result_df, file_df], ignore_index=True)
# 日付とHTTPステータスを基準に昇順および降順ソート
result_df = result_df.sort_values(["Date", "HTTP STATUS"], ascending=[True, False])
# 解析結果をCSVファイルに出力
autil.output_analyze_results(result_df)
■output_slow_req_over_3sec.py
アクセスログを解析し、サーバー処理時間が3秒以上のリクエストについて、以下の情報をサーバー処理時間の降順でCSVファイルに出力します。
- 日付
- 時間
- HTTPステータス
- サーバー処理時間
- データ転送量(byte)
- リクエスト(メソッド、URI)
import os
import glob
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from accesslog_analyze_util import AccessLogAnalyzeUtil # type: ignore
# アクセスログ解析共通処理を読み込み
autil = AccessLogAnalyzeUtil()
# 設定ファイルの読み込み
# アクセスログが存在するディレクトリ
directories = autil.get_directories()
# アクセスログファイル名
file_name = autil.get_file_name()
# アクセスログフォーマットの列名
format = autil.get_log_format()
# 解析結果の出力先
output_files = autil.get_output_files("OutputSlowReqOver3sec")
# config設定チェック
autil.config_check(directories, output_files)
# 解析結果を一時的に格納するDataFrameを作成
result_df = pd.DataFrame(columns=["Log Directory", "Output File", "Date", "Time", "HTTP STATUS", "Response Time(Sec)", "Transfer(bytes)", "Method URI"])
# 各組み合わせごとに解析を実行
for i in range(len(directories)):
log_directory = directories[i]
output_file = output_files[i]
print(f'ディレクトリ {log_directory} を確認中。')
# ディレクトリ内のアクセスログファイルを取得
log_files = glob.glob(os.path.join(log_directory, "**", file_name), recursive=True)
# ログファイルの一時的な結果を格納するリスト
file_data = []
# 各ログファイルを並列実行で処理
with ThreadPoolExecutor() as executor:
# ログファイルごとに処理を実行する関数
def process_log_file(log_file):
# ログファイルのパスから日付を取得
date_str = os.path.basename(log_file).split(".")[0].split("_")[-1]
# print(date_str)
date = pd.to_datetime(date_str).strftime("%Y%m%d")
# アクセスログデータを読み込み
df = pd.read_csv(log_file, names=format, header=None)
# "-"をNaNに変換
df["res_time"] = pd.to_numeric(df["res_time"], errors="coerce")
# 200以外のHTTPステータスのみをフィルタリング
filtered_df = df[df["res_time"] >= 3]
# 解析結果をリストに追加
for _, row in filtered_df.iterrows():
file_data.append([log_directory, output_file, date, row["time"], row["http_status"], row["res_time"], row["transfer(bytes)"], row["method_uri"]])
# 各ログファイルを並列実行
executor.map(process_log_file, log_files)
# ログファイルごとの一時的な結果をDataFrameに追加
file_df = pd.DataFrame(file_data, columns=["Log Directory", "Output File", "Date", "Time", "HTTP STATUS", "Response Time(Sec)", "Transfer(bytes)", "Method URI"])
result_df = pd.concat([result_df, file_df], ignore_index=True)
# サーバー処理時間を基準に昇順ソート
result_df = result_df.sort_values("Response Time(Sec)", ascending=False)
# 解析結果を各出力ファイルに出力
autil.output_analyze_results(result_df)
おわりに
アクセスログ解析スクリプトをリファクタリングしてみました。
他にも色々と直したい所はありますが、キリがないため一旦ここまでにしておこうと思います。
生成AIの力を借りて十数年ぶりにしているプログラミングですが、楽しく、業務の効率化もできています。
判断は人間がしつつ、実装や作業は生成AIをうまく活用しないと損ですね。