SDKを使いましょう
前回、こちらの記事で評価SQLセットをAPI経由で作成する方法をご紹介しました。
この時はshellスクリプトをちくちく作成する方針を採用していたのですが、
別途公開されているSDKを活用することで、より見通し良くAPIの利用が可能です。
考え方
Insight SQL Testing経由でCloudwatch Logsから生ログによる評価SQLセットの作成を行います。
1つのSQLセットに含まれるSQLの本数は1000万本以内であることを要件としており、これを実現する為に、
- 先に指定期間内の評価SQLセットを指定した間隔で分割して作成しきってからアセスメントを実行するか、
- 指定期間内の評価SQLセットを指定間隔で分割して作成しその後即アセスメント実行を繰り返すか、
のいずれかの手段を採用しています。暑苦しいですが、処理の流れのイメージとしては以下の通りです。
では、実際のコードを見てみます
import datetime
from helpers import generate_intervals_for_insight_sql_testing
from insight_sql_testing import InsightSQLTesting
INSIGHT_SQL_TESTING_SETTING = {
"EC2_IP_ADDRESS": "[[InsghtSqlTestingマネージャサーバのIPアドレス]]",
"URL_BASE": "http://[[InsghtSqlTestingマネージャサーバのIPアドレス]]:7777/idt/",
"SQL_TESTING_USER": "[[InsightSqlTestingにログインするためのユーザ名]]",
"SQL_TESTING_PASS": "[[InsightSqlTestingにログインするためのパスワード]]",
"TARGET_DB_NAME": "[[InsightSqlTestingにおけるターゲットDB名]]", # ex) AuroraPostgreSQL_14.6
"CMP_SOURCE_DB_NAME": "[[InsightSqlTestingにおけるソースDB名]]", # ex) AuroraPostgreSQL_12.9
"TARGET_DB_USER": "[[DBのユーザ名]]",
"TARGET_DB_PASS": "[[DBのパスワード]]",
"PROCESS_SEQUENTIALLY": True, # True: SQLセットの作成→アセスメント実行を繰り返す。False: SQLセットを作成しきったのちにアセスメント実行を繰り返す
"CONCURRENCY_FOR_ASSESSMENT": 4, # 同時セッション数
}
CLOUD_WATCH_SETTING = {
"REGION": "[[対象となるCloudWatchLogsのリージョン]]", # ex) ap-northeast-1
"LOG_GROUP_NAME": "[[CloudWatch Logsにおけるロググループ名]]", # ex)"/aws/rds/cluster/training-inoka-aurora-postgresql-v12-16/postgresql",
"LOG_FORMAT": "[[Log format]]", # 次のいずれか:"postgres" "mysql" "aurora2"
"DATABASE_NAME": "[[データベース名(上記で指定したものと同じ)]]",
"LOG_RETRIEVE_FROM": "[[CloudWatch Logsから取得するログの開始時刻(UTCで指定)]]", # ex) "2024-11-20 23:00:00",
"LOG_RETRIEVE_TO": "[[CloudWatch Logsから取得するログの終了時刻(UTCで指定)]]", # ex)"2024-11-20 23:00:30",
"LOG_RETRIEVE_DURATION": "[[ログの取得間隔]]", # ex)"30 minutes", # SQLセットの作成間隔。
}
def main():
sql_workload_name_template = "SqlWorkloadFromAPI_{:%Y%m%dT%H%M%S}_[{}]-[{}]"
assessment_name_template = "AssessmentFromAPI_{:%Y%m%dT%H%M%S}_[{}]-[{}]"
with InsightSQLTesting(
INSIGHT_SQL_TESTING_SETTING["URL_BASE"],
INSIGHT_SQL_TESTING_SETTING["SQL_TESTING_USER"],
INSIGHT_SQL_TESTING_SETTING["SQL_TESTING_PASS"],
) as sql_testing:
sql_workloads = {}
t_delta = datetime.timedelta(hours=9)
jst = datetime.timezone(t_delta, "JST")
now = datetime.datetime.now(jst)
retrieve_intervals = generate_intervals_for_insight_sql_testing(
CLOUD_WATCH_SETTING["LOG_RETRIEVE_FROM"],
CLOUD_WATCH_SETTING["LOG_RETRIEVE_TO"],
CLOUD_WATCH_SETTING["LOG_RETRIEVE_DURATION"],
)
for interval in retrieve_intervals:
# 評価SQLセット作成(from CloudWatch Logs)
sql_workload_info = sql_testing.create_sql_workload_cloudwatch_logs(
sql_workload_name=sql_workload_name_template.format(
now, interval[0], interval[1]
),
region=CLOUD_WATCH_SETTING["REGION"],
log_group_name=CLOUD_WATCH_SETTING["LOG_GROUP_NAME"],
log_format=CLOUD_WATCH_SETTING["LOG_FORMAT"],
database_name=CLOUD_WATCH_SETTING["DATABASE_NAME"],
log_start_time=interval[0],
log_end_time=interval[1],
is_unique=False,
)
sql_workloads[sql_workload_info["id"]] = assessment_name_template.format(
now, interval[0], interval[1]
)
# SQLセット作成後即アセスメント実行
if INSIGHT_SQL_TESTING_SETTING["PROCESS_SEQUENTIALLY"]:
assessment_info = sql_testing.execute_assessment(
assessment_name=sql_workloads[sql_workload_info["id"]],
sql_workload_id=sql_workload_info["id"],
db_users=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_USER"]],
db_user_passwords=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_PASS"]],
target_db_id=sql_testing.get_database_id_from_name(
INSIGHT_SQL_TESTING_SETTING["TARGET_DB_NAME"]
),
cmp_source_db_id=sql_testing.get_database_id_from_name(
INSIGHT_SQL_TESTING_SETTING["CMP_SOURCE_DB_NAME"]
),
concurrency=INSIGHT_SQL_TESTING_SETTING[
"CONCURRENCY_FOR_ASSESSMENT"
],
)
print(assessment_info)
# 分割して作成した評価SQLセット群に対して順次アセスメントを実行する。
if not INSIGHT_SQL_TESTING_SETTING["PROCESS_SEQUENTIALLY"]:
for sql_workload_id in sql_workloads:
assessment_info = sql_testing.execute_assessment(
assessment_name=sql_workloads[sql_workload_id],
sql_workload_id=sql_workload_id,
db_users=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_USER"]],
db_user_passwords=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_PASS"]],
target_db_id=sql_testing.get_database_id_from_name(
INSIGHT_SQL_TESTING_SETTING["TARGET_DB_NAME"]
),
cmp_source_db_id=sql_testing.get_database_id_from_name(
INSIGHT_SQL_TESTING_SETTING["CMP_SOURCE_DB_NAME"]
),
concurrency=INSIGHT_SQL_TESTING_SETTING[
"CONCURRENCY_FOR_ASSESSMENT"
],
)
print(assessment_info)
取得インターバルの調整
CLOUD_WATCH_SETTING["LOG_RETRIEVE_FROM"]
~CLOUD_WATCH_SETTING["LOG_RETRIEVE_TO"]
の間に存在するログをCLOUD_WATCH_SETTING["LOG_RETRIEVE_DURATION"]
に従い分割して評価SQLセットを作成します。
以下は、上記でそれぞれ分割する間隔をAPIに投げるためのフォーマットを作成するための関数群ですが、ご参考までに。
import re
from datetime import timedelta
from dateutil import parser
time_deltas = {
"microsecond": timedelta(microseconds=1),
"microseconds": timedelta(microseconds=1),
"millisecond": timedelta(milliseconds=1),
"milliseconds": timedelta(milliseconds=1),
"second": timedelta(seconds=1),
"seconds": timedelta(seconds=1),
"minute": timedelta(minutes=1),
"minutes": timedelta(minutes=1),
"hour": timedelta(hours=1),
"hours": timedelta(hours=1),
"day": timedelta(days=1),
"days": timedelta(days=1),
"week": timedelta(weeks=1),
"weeks": timedelta(weeks=1),
"month": timedelta(days=30),
"months": timedelta(days=30),
"year": timedelta(days=365),
"years": timedelta(days=365),
}
pattern = r"(([+-]?\d+)\s*(microsecond|microseconds|millisecond|milliseconds|second|seconds|minute|minutes|hour|hours|day|days|week|weeks|month|months|year|years))+"
def date_command_parser(expression):
amount = 0
match_all = re.findall(pattern, expression)
if match_all:
for match in match_all:
this_amount = int(match[1])
unit = match[2]
if unit in time_deltas:
amount += timedelta.total_seconds(this_amount * time_deltas[unit])
return amount
return None
def get_interval(
start_datetime_str,
end_datetime_str,
duration_expression,
result_format=None,
):
result = []
start_datetime = parser.parse(start_datetime_str)
end_datetime = parser.parse(end_datetime_str)
duration = date_command_parser(duration_expression)
current_datetime = start_datetime
result.append(
start_datetime.strftime(result_format) if result_format else start_datetime
)
while current_datetime < end_datetime:
current_datetime = current_datetime + timedelta(seconds=duration)
if current_datetime < end_datetime:
result.append(
current_datetime.strftime(result_format)
if result_format
else current_datetime
)
result.append(
end_datetime.strftime(result_format) if result_format else end_datetime
)
return result
def generate_intervals_for_insight_sql_testing(
start_datetime_str,
end_datetime_str,
duration_expression,
):
format = "%Y-%m-%dT%H:%M:%SZ"
result = []
intervals = get_interval(start_datetime_str, end_datetime_str, duration_expression)
for i in range(len(intervals)):
if i == len(intervals) - 1:
break
else:
result.append(
tuple(
map(
lambda dt: dt.strftime(format), [intervals[i], intervals[i + 1]]
)
)
)
return result
Insight SQL Testingでは1つの評価SQLセットに含まれるSQLの本数は1000万本以下であることを要件としており(あまりに大きい評価SQLセットでは処理時間が増大してしまうため)、
CLOUD_WATCH_SETTING["LOG_RETRIEVE_DURATION"]
を調整することで、1つの評価SQLセットのサイズを要件に適合することを目指します。
- SQLテストソフトウェア「Insight SQL Testing」
- https://www.insight-tec.com/products/sqltesting/