0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Insight SQL TestingをAPIから利用する - 評価SQLセットの作成からアセスメントの実行まで

Last updated at Posted at 2025-03-03

SDKを使いましょう

前回、こちらの記事で評価SQLセットをAPI経由で作成する方法をご紹介しました。

この時はshellスクリプトをちくちく作成する方針を採用していたのですが、
別途公開されているSDKを活用することで、より見通し良くAPIの利用が可能です。

考え方

Insight SQL Testing経由でCloudwatch Logsから生ログによる評価SQLセットの作成を行います。
1つのSQLセットに含まれるSQLの本数は1000万本以内であることを要件としており、これを実現する為に、

  • 先に指定期間内の評価SQLセットを指定した間隔で分割して作成しきってからアセスメントを実行するか、
  • 指定期間内の評価SQLセットを指定間隔で分割して作成しその後即アセスメント実行を繰り返すか、

のいずれかの手段を採用しています。暑苦しいですが、処理の流れのイメージとしては以下の通りです。

  • 分割した評価SQLセットとアセスメントの作成を繰り返す
    20250220_Testingの概念説明(スクリプト用).png

  • 先に分割した評価SQLセットを作成しきる
    20250220_Testingの概念説明(スクリプト用) (1).png

では、実際のコードを見てみます

main.py
import datetime
from helpers import generate_intervals_for_insight_sql_testing
from insight_sql_testing import InsightSQLTesting

INSIGHT_SQL_TESTING_SETTING = {
    "EC2_IP_ADDRESS": "[[InsghtSqlTestingマネージャサーバのIPアドレス]]",
    "URL_BASE": "http://[[InsghtSqlTestingマネージャサーバのIPアドレス]]:7777/idt/",
    "SQL_TESTING_USER": "[[InsightSqlTestingにログインするためのユーザ名]]",
    "SQL_TESTING_PASS": "[[InsightSqlTestingにログインするためのパスワード]]",
    "TARGET_DB_NAME": "[[InsightSqlTestingにおけるターゲットDB名]]",  # ex) AuroraPostgreSQL_14.6
    "CMP_SOURCE_DB_NAME": "[[InsightSqlTestingにおけるソースDB名]]",  # ex) AuroraPostgreSQL_12.9
    "TARGET_DB_USER": "[[DBのユーザ名]]",
    "TARGET_DB_PASS": "[[DBのパスワード]]",
    "PROCESS_SEQUENTIALLY": True,  # True: SQLセットの作成→アセスメント実行を繰り返す。False: SQLセットを作成しきったのちにアセスメント実行を繰り返す
    "CONCURRENCY_FOR_ASSESSMENT": 4,  # 同時セッション数
}

CLOUD_WATCH_SETTING = {
    "REGION": "[[対象となるCloudWatchLogsのリージョン]]",  # ex) ap-northeast-1
    "LOG_GROUP_NAME": "[[CloudWatch Logsにおけるロググループ名]]",  # ex)"/aws/rds/cluster/training-inoka-aurora-postgresql-v12-16/postgresql",
    "LOG_FORMAT": "[[Log format]]",  # 次のいずれか:"postgres" "mysql" "aurora2"
    "DATABASE_NAME": "[[データベース名(上記で指定したものと同じ)]]",
    "LOG_RETRIEVE_FROM": "[[CloudWatch Logsから取得するログの開始時刻(UTCで指定)]]",  # ex) "2024-11-20 23:00:00",
    "LOG_RETRIEVE_TO": "[[CloudWatch Logsから取得するログの終了時刻(UTCで指定)]]",  # ex)"2024-11-20 23:00:30",
    "LOG_RETRIEVE_DURATION": "[[ログの取得間隔]]",  # ex)"30 minutes",  # SQLセットの作成間隔。
}

def main():
    sql_workload_name_template = "SqlWorkloadFromAPI_{:%Y%m%dT%H%M%S}_[{}]-[{}]"
    assessment_name_template = "AssessmentFromAPI_{:%Y%m%dT%H%M%S}_[{}]-[{}]"

    with InsightSQLTesting(
        INSIGHT_SQL_TESTING_SETTING["URL_BASE"],
        INSIGHT_SQL_TESTING_SETTING["SQL_TESTING_USER"],
        INSIGHT_SQL_TESTING_SETTING["SQL_TESTING_PASS"],
    ) as sql_testing:

        sql_workloads = {}

        t_delta = datetime.timedelta(hours=9)
        jst = datetime.timezone(t_delta, "JST")
        now = datetime.datetime.now(jst)

        retrieve_intervals = generate_intervals_for_insight_sql_testing(
            CLOUD_WATCH_SETTING["LOG_RETRIEVE_FROM"],
            CLOUD_WATCH_SETTING["LOG_RETRIEVE_TO"],
            CLOUD_WATCH_SETTING["LOG_RETRIEVE_DURATION"],
        )

        for interval in retrieve_intervals:
            # 評価SQLセット作成(from CloudWatch Logs)
            sql_workload_info = sql_testing.create_sql_workload_cloudwatch_logs(
                sql_workload_name=sql_workload_name_template.format(
                    now, interval[0], interval[1]
                ),
                region=CLOUD_WATCH_SETTING["REGION"],
                log_group_name=CLOUD_WATCH_SETTING["LOG_GROUP_NAME"],
                log_format=CLOUD_WATCH_SETTING["LOG_FORMAT"],
                database_name=CLOUD_WATCH_SETTING["DATABASE_NAME"],
                log_start_time=interval[0],
                log_end_time=interval[1],
                is_unique=False,
            )

            sql_workloads[sql_workload_info["id"]] = assessment_name_template.format(
                now, interval[0], interval[1]
            )

            # SQLセット作成後即アセスメント実行
            if INSIGHT_SQL_TESTING_SETTING["PROCESS_SEQUENTIALLY"]:
                assessment_info = sql_testing.execute_assessment(
                    assessment_name=sql_workloads[sql_workload_info["id"]],
                    sql_workload_id=sql_workload_info["id"],
                    db_users=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_USER"]],
                    db_user_passwords=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_PASS"]],
                    target_db_id=sql_testing.get_database_id_from_name(
                        INSIGHT_SQL_TESTING_SETTING["TARGET_DB_NAME"]
                    ),
                    cmp_source_db_id=sql_testing.get_database_id_from_name(
                        INSIGHT_SQL_TESTING_SETTING["CMP_SOURCE_DB_NAME"]
                    ),
                    concurrency=INSIGHT_SQL_TESTING_SETTING[
                        "CONCURRENCY_FOR_ASSESSMENT"
                    ],
                )
                print(assessment_info)

        # 分割して作成した評価SQLセット群に対して順次アセスメントを実行する。
        if not INSIGHT_SQL_TESTING_SETTING["PROCESS_SEQUENTIALLY"]:
            for sql_workload_id in sql_workloads:
                assessment_info = sql_testing.execute_assessment(
                    assessment_name=sql_workloads[sql_workload_id],
                    sql_workload_id=sql_workload_id,
                    db_users=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_USER"]],
                    db_user_passwords=[INSIGHT_SQL_TESTING_SETTING["TARGET_DB_PASS"]],
                    target_db_id=sql_testing.get_database_id_from_name(
                        INSIGHT_SQL_TESTING_SETTING["TARGET_DB_NAME"]
                    ),
                    cmp_source_db_id=sql_testing.get_database_id_from_name(
                        INSIGHT_SQL_TESTING_SETTING["CMP_SOURCE_DB_NAME"]
                    ),
                    concurrency=INSIGHT_SQL_TESTING_SETTING[
                        "CONCURRENCY_FOR_ASSESSMENT"
                    ],
                )
                print(assessment_info)

取得インターバルの調整

CLOUD_WATCH_SETTING["LOG_RETRIEVE_FROM"]CLOUD_WATCH_SETTING["LOG_RETRIEVE_TO"]の間に存在するログをCLOUD_WATCH_SETTING["LOG_RETRIEVE_DURATION"]に従い分割して評価SQLセットを作成します。

以下は、上記でそれぞれ分割する間隔をAPIに投げるためのフォーマットを作成するための関数群ですが、ご参考までに。

helpers.py
import re
from datetime import timedelta
from dateutil import parser

time_deltas = {
    "microsecond": timedelta(microseconds=1),
    "microseconds": timedelta(microseconds=1),
    "millisecond": timedelta(milliseconds=1),
    "milliseconds": timedelta(milliseconds=1),
    "second": timedelta(seconds=1),
    "seconds": timedelta(seconds=1),
    "minute": timedelta(minutes=1),
    "minutes": timedelta(minutes=1),
    "hour": timedelta(hours=1),
    "hours": timedelta(hours=1),
    "day": timedelta(days=1),
    "days": timedelta(days=1),
    "week": timedelta(weeks=1),
    "weeks": timedelta(weeks=1),
    "month": timedelta(days=30),
    "months": timedelta(days=30),
    "year": timedelta(days=365),
    "years": timedelta(days=365),
}

pattern = r"(([+-]?\d+)\s*(microsecond|microseconds|millisecond|milliseconds|second|seconds|minute|minutes|hour|hours|day|days|week|weeks|month|months|year|years))+"


def date_command_parser(expression):

    amount = 0
    match_all = re.findall(pattern, expression)
    if match_all:
        for match in match_all:
            this_amount = int(match[1])
            unit = match[2]
            if unit in time_deltas:
                amount += timedelta.total_seconds(this_amount * time_deltas[unit])
        return amount
    return None


def get_interval(
    start_datetime_str,
    end_datetime_str,
    duration_expression,
    result_format=None,
):

    result = []
    start_datetime = parser.parse(start_datetime_str)
    end_datetime = parser.parse(end_datetime_str)
    duration = date_command_parser(duration_expression)
    current_datetime = start_datetime
    result.append(
        start_datetime.strftime(result_format) if result_format else start_datetime
    )

    while current_datetime < end_datetime:
        current_datetime = current_datetime + timedelta(seconds=duration)
        if current_datetime < end_datetime:
            result.append(
                current_datetime.strftime(result_format)
                if result_format
                else current_datetime
            )
    result.append(
        end_datetime.strftime(result_format) if result_format else end_datetime
    )

    return result


def generate_intervals_for_insight_sql_testing(
    start_datetime_str,
    end_datetime_str,
    duration_expression,
):
    format = "%Y-%m-%dT%H:%M:%SZ"
    result = []

    intervals = get_interval(start_datetime_str, end_datetime_str, duration_expression)

    for i in range(len(intervals)):
        if i == len(intervals) - 1:
            break
        else:
            result.append(
                tuple(
                    map(
                        lambda dt: dt.strftime(format), [intervals[i], intervals[i + 1]]
                    )
                )
            )

    return result

Insight SQL Testingでは1つの評価SQLセットに含まれるSQLの本数は1000万本以下であることを要件としており(あまりに大きい評価SQLセットでは処理時間が増大してしまうため)、
CLOUD_WATCH_SETTING["LOG_RETRIEVE_DURATION"]を調整することで、1つの評価SQLセットのサイズを要件に適合することを目指します。


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?