0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PostgreSQLで正規化が必要ない時も中間テーブルを作るべきか - パフォーマンスの観点から

Posted at

主題
正規化をしない場合でも、一定の共通性のある多対多の場合は中間テーブルを作成した方が良いのか?

Postgesqlの設計において、中間テーブルを作成するかどうかは悩ましいところだ。
中間テーブルを作成するメリットとしては、下記のようなものがある。

多対多(N:M)関係の表現

一般的によく使われる場面がこれです。
ユーザーの居住地の都道府県を管理する場合など、多対多の関係性の表現で用いられることが多い。

正規化によるデータの整合性向上

都道府県の情報など、値の種類が固定されている場合はハードコーディングせずにデータベースで管理することでデータが荒れるのを防ぐことができる。
また変更に対しても強くなる。

パフォーマンスの最適化
↑↑↑↑↑↑↑
今回のテーマがこれ

さて今回のテーマが最後のパフォーマンスの最適化です。
企業のデータベースを作ることをイメージして頂きたいです。企業の情報をスクレイピングで集めてくると、サイトごとに企業についている多種多様なタグがあります(例:積極採用中、最近資金調達、他)。

サイトごとにタグにある程度の法則はあったりするので、関係性としては多対多になります。
しかし、タグは正規化されていないので、固定の種類があるわけではなりません。

このような状況において、中間テーブルを使用するべきなのか、中間テーブルを作成せず企業IDとタグ名のテーブルを作成すれば良いのか。それをパフォーマンスの面から検証してみました。

注:テーブル作成やダミーデータの作成、分析などのコードは最後に載せています。

テーブル定義書

companies テーブル

カラム名 データ型 NULL許可 主キー 外部キー 備考
id Integer × 自動インクリメント
name String × 企業名

リレーション

  • all_in_tags: AllInTag テーブルとの1対多の関係
  • l_company_tags: LCompanyTag テーブルとの1対多の関係

all_in_tag テーブル(企業ごとにタグを直接持つ)

カラム名 データ型 NULL許可 主キー 外部キー 備考
id Integer × 自動インクリメント
company_id Integer × companies.id CASCADE削除
tag_name String × タグ名

リレーション

  • company: Company テーブルとの多対1の関係

tags テーブル(正規化したタグ管理)

カラム名 データ型 NULL許可 主キー 外部キー 備考
id Integer × 自動インクリメント
tag String × ユニーク制約

リレーション

  • l_company_tags: LCompanyTag テーブルとの1対多の関係

l_company_tag テーブル(中間テーブル:企業とタグの多対多関係)

カラム名 データ型 NULL許可 主キー 外部キー 備考
id Integer × 自動インクリメント
company_id Integer × companies.id CASCADE削除
tag_id Integer × tags.id CASCADE削除

リレーション

  • company: Company テーブルとの多対1の関係
  • tag: Tag テーブルとの多対1の関係

ダミーデータの作成

今回の検証の前提
企業数:300万
タグ数:1万(アルファベット小文字3文字をランダムで生成、ランダム生成の結果の重複を許容)
1つのタグが何件の企業に付与されるか:平均2,000 標準偏差1,000 範囲50~10000で生成したランダムな数字を作成し、その件数分の企業をランダムで抽出、タグを付与した。

上記で生成したデータベースの結果は下記のようになった(Dockerのvolumesが6GBくらい)。

企業数: 3,000,000(300万社)
タグ数: 7,583種類
1企業あたりのタグ数:
平均: 約 6.73
標準偏差: 約 2.58(タグのばらつきの度合い)
最頻値: 6(最も多くの企業が持つタグの数)
中央値: 7(全企業のタグ数の中央値)

検索のテスト

今回はインデックスの設定を行なっていない。

1回目は適当にtag名のtdの部分一致で検索した。
結果は下記の通りで、中間テーブルがある方が約3倍高速に処理ができた。

実行回数: 10 回
検索一致件数:68696 件

# 中間テーブルなし
平均処理時間: 1.73651 秒
最小処理時間: 1.62792 秒
最大処理時間: 1.92060 秒

# 中間テーブルあり
平均処理時間: 0.58138 秒
最小処理時間: 0.52922 秒
最大処理時間: 0.71784 秒

次に検索にヒットする件数が減るように、sssの部分一致で検索した。
検索にヒットする件数が減った結果、中間テーブルがある方が約5倍高速になった。

実行回数: 10 回
検索一致件数:1543 件

# 中間テーブルなし
平均処理時間: 1.34004 秒
最小処理時間: 1.30966 秒
最大処理時間: 1.56466 秒

# 中間テーブルあり
平均処理時間: 0.25989 秒
最小処理時間: 0.24791 秒
最大処理時間: 0.32355 秒

件数が少ない方が処理が高速になったことから、さらにヒットする件数を増やしてテストを行う。
s1文字での検索を行った。
さらに差が縮まる結果となった。

実行回数: 10 回
検索一致件数:1488132 件

# 中間テーブルなし
平均処理時間: 7.87023 秒
最小処理時間: 7.60600 秒
最大処理時間: 8.35773 秒

# 中間テーブルあり
平均処理時間: 7.10237 秒
最小処理時間: 6.80466 秒
最大処理時間: 7.49426 

考察

なぜ検索ヒット件数が増えると差が縮まるのか?
PostgreSQLの根本的な仕組みについて熟知しているわけではないので、生成AIとやりとりしながら考察しました。

データ量が増えると、インデックスのフィルタリング効果が減少

中間テーブルは タグと企業のマッピングを事前に最適化 することで、検索対象を減らしている。
しかし、検索ヒット件数が多すぎる場合、結局すべてのレコードをスキャンするため、速度差が縮まる。

PostgreSQL の処理コスト

検索結果が少ない場合は、フィルタリングが効果的で、不要なデータを早く除外できる。
検索結果が多くなると、データ取得や結合処理のオーバーヘッドが増加 し、中間テーブルを使っても速度差が小さくなる。

I/O 負荷の増加

LIKE '%s%' のように検索ヒット数が 100万件を超えると、インデックスを使用してもディスクI/O の影響が大きくなる。
最終的に全データをスキャンする時間が支配的になり、中間テーブルのメリットが相対的に小さくなる。

データの追加

次はデータの追加についてテストしてみます。
今回の例では、企業情報のスクレイピングなどで取得して企業名とtagで保存するようなことを考えます。

ランダムに企業を選び、ランダムに作成したタグをその企業に付与していきます。
1000件分テストを行った。その結果は下記の表の通りである(同一タグは503件あった)。

指標 中間テーブルなし の処理速度 中間テーブルあり の処理速度
平均 0.002419 0.003352
標準偏差 0.000515 0.001233
最頻値 0.002043 0.002438
中央値 0.002354 0.002954
最小 0.001317 0.001293
最大 0.006293 0.011290

やはり、既存の値の確認や挿入がない分、中間テーブルがない方が処理が高速になります。
1万件のデータがあり、タグの同一度も今回と同一と仮定すると、中間テーブルを使うと、約 1.39 倍(38.57% )遅くなることが想定される。タグの種類が増えるとさらに差が開く可能性もある。

処理方式 1万件の処理時間 (秒)
中間テーブルなし 24.19 秒
中間テーブルあり 33.52 秒

このことから、検索に強い中間テーブルと追加に強い中間テーブルなしという構図が見えてくる。

データの総量の増加

ここまでの検証から、タグの数よりもタグと企業の結びつきの数が増えることでさらに差が出るような気がするので、「1つのタグが何件の企業に付与されるか」の部分を大きく増やして再度テストを行ってみる。

企業数:300万
タグ数:1万(アルファベット小文字3文字をランダムで生成、ランダム生成の結果の重複を許容)
1つのタグが何件の企業に付与されるか:平均10,000 標準偏差5,000 範囲1~50000で生成したランダムな数字を作成し、その件数分の企業をランダムで抽出、タグを付与した。

作成した結果が下記の通りである(Dockerのvolumesが15GBくらいありました)。
企業数: 3,000,000(300万社)
タグ数: 7,631種類
1企業あたりのタグ数:
平均: 33.35
標準偏差: 5.77
最頻値: 33
中央値: 33.0

企業数とタグ数はあまり変わりませんでした。値をいじっていないので、再現性が高くて良かったです。
1企業あたりのタグ数が爆増したのでこれでまたテストを行ってみます。

まずは最も検索結果の多いsの部分一致で検索します。

実行回数: 10 回
検索一致件数:2610907 件

# 中間テーブルなし
平均処理時間: 33.29583 秒
最小処理時間: 31.93184 秒
最大処理時間: 35.94520 秒

# 中間テーブルあり
実行回数: 10 回
平均処理時間: 28.00889 秒
最小処理時間: 26.89056 秒
最大処理時間: 30.29725 秒

次はtdの部分一致で検索します。

実行回数: 10 回
検索一致件数: 299060件

# 中間テーブルなし
平均処理時間: 8.01372 秒
最小処理時間: 7.36693 秒
最大処理時間: 9.56331 秒

# 中間テーブルあり
平均処理時間: 3.07468 秒
最小処理時間: 2.49493 秒
最大処理時間: 6.40200 秒

再度にsssの部分一致で検索します。

実行回数: 10 回
検索一致件数:0 件

# 中間テーブルなし
平均処理時間: 6.64658 秒
最小処理時間: 6.28652 秒
最大処理時間: 7.95311 秒

# 中間テーブルあり
平均処理時間: 0.09830 秒
最小処理時間: 0.09444 秒
最大処理時間: 0.11451 秒

今回はsssの結果が0件だったので、bdmで再度テストしてみます。

実行回数: 10 回
検索一致件数:15268 件

# 中間テーブルなし
平均処理時間: 6.76783 秒
最小処理時間: 6.22868 秒
最大処理時間: 8.73820 秒

# 中間テーブルあり
平均処理時間: 1.54840 秒
最小処理時間: 1.19722 秒
最大処理時間: 4.10174 秒

前回同様、検索でヒットする件数が減る方が、差が顕著に大きくなります。
中間テーブルを用いない場合は、毎回全ての行を調べる必要がありますが、中間テーブルを用いて検索対象の重複を事前に減らすことができるので、検索処理が早くなっていることが想像できます。

これは逆を言えば、タグの種類が増えてしまうと差が出づらい可能性が高いということです。
それについてもテストをしてみます。

タグの種類が多い場合

今回はタグの種類を増やしたかったので、下記のような設定でデータを作成しました。
企業数:300万
タグ数:1万(アルファベット小文字6文字をランダムで生成、ランダム生成の結果の重複を許容)
1つのタグが何件の企業に付与されるか:平均2,000 標準偏差1,000 範囲50~10000で生成したランダムな数字を作成し、その件数分の企業をランダムで抽出、タグを付与した。

作成結果
企業数: 3,000,000(300万社)
タグ数: 10,000種類
1企業あたりのタグ数:

平均: 6.70
標準偏差: 2.58
最頻値: 6
中央値: 7.0

タグが全て異なるものになりました。
tdの部分一致で検索してみました。
こちらでも中間テーブルありの方が平均処理時間が約2.8倍早く処理しています。

実行回数: 10 回
検索一致件数:126579 件

# 中間テーブルなし
平均処理時間: 1.98278 秒
最小処理時間: 1.88219 秒
最大処理時間: 2.63487 秒

# 中間テーブルあり
平均処理時間: 0.71855 秒
最小処理時間: 0.68573 秒
最大処理時間: 0.90634 秒

この後も、1タグあたりに紐づく企業数を減らしたデータでも検証しましたが、一貫して中間テーブルありの方が検索が2.5倍程度早かったです。

まとめ

タグに全く同じ値が入らない場合は別ですが、タグが複数の企業に紐づくような場面においては中間テーブルを使用する方が良さそうです。

ただこれは検索をすることを前提とした結論です。滅多に検索をしないような場面では、データの追加だけをする中間テーブルなしの方が処理が最適化されると思います。

結局「場合による」という結論になって気持ち悪いですが、データベースの構造を考える際に、そのデータをどのように使う可能性があるのか?ということを考えるのは必須だと思います。

主題
正規化をしない場合でも、一定の共通性のある多対多の場合は中間テーブルを作成した方が良いのか?

結論

中間テーブルを作成する方が、良い場合が多そう。

  • 検索パフォーマンスを考えるならば、中間テーブルあり
  • データの追加を考えるならば、中間テーブルなし

今後の展望

今回テストを行う中で、Postgresqlにtsvector型というものがあることを知りました。全文検索などの場合はtsvector型が高速であり中間テーブルの有無とは別軸で最適化ができる可能性があります。
ただ、tsvector型はPostgresqlの機能だけでは日本語対応していない様なので、DB手前のバックエンドの処理を入れる必要があり、検証が複雑になりそうです。

また今回は、企業とタグの組み合わせの重複を許容していました。しかし実際の運用を考えると、組み合わせでの重複は単にデータが増えるだけで良いことがなさそうなので、基本的に重複はしなようにした方がいいと思います。これは中間テーブルの有無に関わらず検討すべきことです。
INSERT時に組み合わせでのユニーク制約みたいな機能は使ったことがありませんが、そちらも試してみたいです。

ソースコード

今回、検証に使用したコードが下記です。

テーブルの作成

from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

# PostgreSQLの接続設定(適宜変更してください)
DATABASE_URL = "postgresql://postgres:password@localhost:5432/test_db"


# SQLAlchemyのエンジン作成
engine = create_engine(DATABASE_URL, echo=True)

# Baseクラスの作成
Base = declarative_base()


# companies テーブル
class Company(Base):
    __tablename__ = "companies"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False)

    # リレーションシップ
    all_in_tags = relationship(
        "AllInTag", back_populates="company", cascade="all, delete-orphan"
    )
    l_company_tags = relationship(
        "LCompanyTag", back_populates="company", cascade="all, delete-orphan"
    )


# all_in_tag テーブル(企業ごとにタグを直接持つ)
class AllInTag(Base):
    __tablename__ = "all_in_tag"

    id = Column(Integer, primary_key=True, autoincrement=True)
    company_id = Column(
        Integer, ForeignKey("companies.id", ondelete="CASCADE"), nullable=False
    )
    tag_name = Column(String, nullable=False)

    # リレーションシップ
    company = relationship("Company", back_populates="all_in_tags")


# tags テーブル(正規化したタグ管理)
class Tag(Base):
    __tablename__ = "tags"

    id = Column(Integer, primary_key=True, autoincrement=True)
    tag = Column(String, unique=True, nullable=False)

    # リレーションシップ
    l_company_tags = relationship(
        "LCompanyTag", back_populates="tag", cascade="all, delete-orphan"
    )


# l_company_tag テーブル(中間テーブル:企業とタグの多対多関係)
class LCompanyTag(Base):
    __tablename__ = "l_company_tag"

    id = Column(Integer, primary_key=True, autoincrement=True)
    company_id = Column(
        Integer, ForeignKey("companies.id", ondelete="CASCADE"), nullable=False
    )
    tag_id = Column(Integer, ForeignKey("tags.id", ondelete="CASCADE"), nullable=False)

    # リレーションシップ
    company = relationship("Company", back_populates="l_company_tags")
    tag = relationship("Tag", back_populates="l_company_tags")


# データベースにテーブルを作成
Base.metadata.create_all(engine)

# セッション作成
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

print("テーブル作成完了!")

ダミーデータの作成

import csv
import logging
import os
import random
import string
import time
from tempfile import NamedTemporaryFile

import numpy as np
import psycopg2


# ロガー設定
def get_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger


logger = get_logger()


# タグのダミーデータを生成する
def generate_tags(tag_length):
    return "".join(random.choices(string.ascii_lowercase, k=tag_length))


# ランダム数の生成
def generate_random_integer(mean=200, std_dev=100, low=50, high=1000, size=1):
    values = np.random.normal(mean, std_dev, size)
    values = np.clip(values, low, high)
    return np.round(values).astype(int)


# PostgreSQLに接続
def make_connection():
    return psycopg2.connect(
        dbname="test_db",
        user="postgres",
        password="password",
        host="localhost",
        port="5432",
    )


# 企業データの挿入(COPY FROMを使用)
def insert_companies():
    conn = make_connection()
    cursor = conn.cursor()

    # 企業名のダミーデータを生成
    start_time = time.time()
    company_names = [("株式会社" + generate_tags(5),) for _ in range(3_000_000)]
    logger.info(f"企業名ダミーデータ生成_処理時間: {time.time() - start_time:.5f}")

    # 一時CSVファイルを作成
    with NamedTemporaryFile(mode="w", delete=False, newline="") as tmpfile:
        writer = csv.writer(tmpfile)
        writer.writerows(company_names)
        temp_file_path = tmpfile.name

    # COPY FROMを実行
    start_time = time.time()
    with open(temp_file_path, "r") as f:
        cursor.copy_from(f, "companies", sep=",", columns=("name",))

    conn.commit()
    logger.info(f"企業データ挿入(COPY)_処理時間: {time.time() - start_time:.5f}")

    cursor.close()
    conn.close()


# タグデータの挿入(COPY FROMを使用)
def insert_tags():
    conn = make_connection()
    cursor = conn.cursor()

    # companiesの全IDを取得
    cursor.execute("SELECT id FROM companies")
    company_ids = [row[0] for row in cursor.fetchall()]

    # ダミーデータ生成
    tag_data = []
    for _ in range(10000):
        # 1万個のタグの生成
        start_time = time.time()
        tag_name = generate_tags(6)
        how_many = generate_random_integer(size=1)[0]
        selected_company_ids = random.sample(company_ids, how_many)

        print(f"{_}回目 - {how_many}件の企業に{tag_name}を付与")

        ## all_in_tag テーブルへの挿入
        tag_data = []
        tag_data.extend([(company_id, tag_name) for company_id in selected_company_ids])
        # 一時CSVファイルを作成
        with NamedTemporaryFile(mode="w", delete=False, newline="") as tmpfile:
            writer = csv.writer(tmpfile)
            writer.writerows(tag_data)
            temp_file_path = tmpfile.name

        # COPY FROMを実行
        start_time = time.time()
        with open(temp_file_path, "r") as f:
            cursor.copy_from(
                f, "all_in_tag", sep=",", columns=("company_id", "tag_name")
            )

        conn.commit()
        os.remove(temp_file_path)
        logger.info(
            f"中間テーブルなし_処理時間{len(tag_data)}: {time.time() - start_time:.5f}"
        )

        ## tags テーブルへの挿入
        # tagsテーブルにタグが登録されているかを確認する
        start_time = time.time()
        cursor.execute("SELECT id, tag FROM tags WHERE tag = %s", (tag_name,))
        tag_id = cursor.fetchone()
        if tag_id is None:
            cursor.execute(
                "INSERT INTO tags (tag) VALUES (%s) RETURNING id", (tag_name,)
            )
            tag_id = cursor.fetchone()[0]
        else:
            tag_id = tag_id[0]
        tag_id = int(tag_id)
        # l_company_tag テーブルへの挿入
        l_company_tag_data = []
        for company_id in selected_company_ids:
            l_company_tag_data.append((company_id, tag_id))
        # 一時CSVファイルを作成
        with NamedTemporaryFile(mode="w", delete=False, newline="") as tmpfile:
            writer = csv.writer(tmpfile)
            writer.writerows(l_company_tag_data)
            temp_file_path = tmpfile.name

        # COPY FROMを実行
        with open(temp_file_path, "r") as f:
            cursor.copy_from(
                f, "l_company_tag", sep=",", columns=("company_id", "tag_id")
            )
        conn.commit()
        logger.info(
            f"中間テーブルあり_処理時間{len(l_company_tag_data)}: {time.time() - start_time:.5f}"
        )

    cursor.close()
    conn.close()


# 実行
def main():
    insert_companies()
    insert_tags()


main()

生成データの基本情報の確認

from make_table import Company, LCompanyTag, Tag
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

# PostgreSQL に接続(適宜変更)
DATABASE_URL = "postgresql://postgres:password@localhost:5432/test_db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

# セッションを作成
session = SessionLocal()

# 企業数
company_count = session.query(func.count(Company.id)).scalar()

# タグ数(ユニークなタグの数)
tag_count = session.query(func.count(Tag.id)).scalar()

# 1企業あたりのタグ数を計算
tag_counts = (
    session.query(LCompanyTag.company_id, func.count(LCompanyTag.tag_id))
    .group_by(LCompanyTag.company_id)
    .all()
)

# タグ数のリストを作成
tag_count_list = [count for _, count in tag_counts]

# 平均、標準偏差、最頻値、中央値の計算
from collections import Counter

import numpy as np

if tag_count_list:
    mean_tags_per_company = np.mean(tag_count_list)
    std_tags_per_company = np.std(tag_count_list)
    median_tags_per_company = np.median(tag_count_list)
    most_common_tags_per_company = Counter(tag_count_list).most_common(1)[0][0]
else:
    mean_tags_per_company = std_tags_per_company = median_tags_per_company = (
        most_common_tags_per_company
    ) = 0

# タグの種類(ユニークなタグのリスト)
unique_tags = session.query(Tag.tag).distinct().all()
unique_tags = [tag[0] for tag in unique_tags]

# セッションを閉じる
session.close()

# 結果を表示
result = {
    "企業数": company_count,
    "タグ数": tag_count,
    "1企業あたりのタグ数": {
        "平均": mean_tags_per_company,
        "標準偏差": std_tags_per_company,
        "最頻値": most_common_tags_per_company,
        "中央値": median_tags_per_company,
    },
}

print(result)

検索テスト

import time

from make_table import AllInTag, Company, LCompanyTag, Tag
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

# PostgreSQL に接続
DATABASE_URL = "postgresql://postgres:password@localhost:5432/test_db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)


def search_companies_by_tag(session: Session, tag_keyword: str):
    """
    指定した部分一致タグを持つ企業の名前を取得する (all_in_tag 経由)
    :param session: SQLAlchemy セッション
    :param tag_keyword: 検索するタグの部分文字列
    :return: 企業名のリスト
    """
    query = (
        session.query(Company.name)
        .join(AllInTag, Company.id == AllInTag.company_id)
        .filter(AllInTag.tag_name.ilike(f"%{tag_keyword}%"))  # 部分一致
        .distinct()  # 重複を防ぐ
    )
    return [company[0] for company in query.all()]


def search_companies_by_ltabl(session: Session, tag_keyword: str):
    """
    指定した部分一致タグを持つ企業の名前を取得する (l_company_tag 経由)
    :param session: SQLAlchemy セッション
    :param tag_keyword: 検索するタグの部分文字列
    :return: 企業名のリスト
    """
    query = (
        session.query(Company.name)
        .join(LCompanyTag, Company.id == LCompanyTag.company_id)  # 中間テーブルを結合
        .join(Tag, LCompanyTag.tag_id == Tag.id)  # タグテーブルを結合
        .filter(Tag.tag.ilike(f"%{tag_keyword}%"))  # 部分一致検索 (大文字小文字を無視)
        .distinct()  # 重複を防ぐ
    )
    return [company[0] for company in query.all()]


def measure_query_time(
    session: Session, query_function, tag_keyword: str, num_trials: int = 10
):
    """
    クエリの処理時間を測定して出力する
    :param session: SQLAlchemy セッション
    :param query_function: 実行するクエリ関数
    :param tag_keyword: 検索するタグの部分文字列
    :param num_trials: 試行回数
    """
    execution_times = []

    for _ in range(num_trials):
        start_time = time.perf_counter()
        result = query_function(session, tag_keyword)
        end_time = time.perf_counter()

        execution_times.append(end_time - start_time)
        print(len(result), "件の企業が見つかりました")

    print(f"\n[{query_function.__name__}] の検索結果")
    print(f"実行回数: {num_trials}")
    print(f"平均処理時間: {sum(execution_times) / num_trials:.5f}")
    print(f"最小処理時間: {min(execution_times):.5f}")
    print(f"最大処理時間: {max(execution_times):.5f}\n")


# セッションを作成
session = SessionLocal()

# タグ "td" を含む企業名の検索と処理時間の計測
key_word = "td"
measure_query_time(session, search_companies_by_tag, key_word)
measure_query_time(session, search_companies_by_ltabl, key_word)

# セッションを閉じる
session.close()

データ追加テスト

import random
import string
import time
from collections import Counter

import numpy as np
from make_table import AllInTag, Company, LCompanyTag, Tag
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

# PostgreSQL の接続設定
DATABASE_URL = "postgresql://postgres:password@localhost:5432/test_db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

# 追加回数
NUM_INSERTS = 1000


# タグを生成する関数
def generate_tag():
    return "".join(random.choices(string.ascii_lowercase, k=3))


# データ追加のスピード測定
def insert_all_in_tag(session, tag_name, company_id):
    """all_in_tag にデータを追加"""
    start_time = time.perf_counter()

    # タグを追加
    new_tag = AllInTag(company_id=company_id, tag_name=tag_name)
    session.add(new_tag)
    session.commit()

    elapsed_time = time.perf_counter() - start_time
    return elapsed_time


def insert_l_company_tag(session, tag_name, company_id):
    """l_company_tag にデータを追加"""
    start_time = time.perf_counter()

    # タグがすでに存在するか確認
    tag = session.query(Tag).filter_by(tag=tag_name).first()
    if not tag:
        tag = Tag(tag=tag_name)
        session.add(tag)
        session.commit()

    # 中間テーブルにデータを追加
    new_entry = LCompanyTag(company_id=company_id, tag_id=tag.id)
    session.add(new_entry)
    session.commit()

    elapsed_time = time.perf_counter() - start_time
    return elapsed_time


# セッションを作成
session = SessionLocal()

# 記録用リスト
all_in_tag_times = []
l_company_tag_times = []
existing_tag_count = 0

# 1000件のデータ追加
for _ in range(NUM_INSERTS + 1):
    new_tag = generate_tag()

    # 企業IDをランダムで1つ取得
    company_id = session.query(Company.id).order_by(func.random()).limit(1).scalar()

    # all_in_tag に追加
    all_in_tag_times.append(insert_all_in_tag(session, new_tag, company_id))

    # l_company_tag に追加
    existing_tags_before = session.query(Tag.id).filter_by(tag=new_tag).count()
    l_company_tag_times.append(insert_l_company_tag(session, new_tag, company_id))
    existing_tags_after = session.query(Tag.id).filter_by(tag=new_tag).count()

    if existing_tags_after > existing_tags_before:
        existing_tag_count += 1  # 過去と同じタグがあった場合

# セッションを閉じる
session.close()


# 統計情報を計算
def compute_statistics(times):
    return {
        "平均": np.mean(times),
        "標準偏差": np.std(times),
        "最頻値": Counter(times).most_common(1)[0][0],
        "中央値": np.median(times),
        "最小": np.min(times),
        "最大": np.max(times),
    }


result = {
    "all_in_tag の処理速度": compute_statistics(all_in_tag_times),
    "l_company_tag の処理速度": compute_statistics(l_company_tag_times),
    "過去と同じタグの件数": existing_tag_count,
}
print(result)

# 結果を出力
import pandas as pd

df = pd.DataFrame(result)
print(df)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?