0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DynamoDBのGSI(疎なインデックス)を活用した社内RAGチャット履歴参照機能

Posted at

DynamoDBのGSI(疎なインデックス)を活用したチャットログ設計

1. はじめに

背景:大手化粧品企業の社内業務規則検索チャットボット

大手化粧品企業では、膨大な社内業務規則(労務管理マニュアル、コンプライアンス規定、製造ガイドラインなど)を、従業員が必要なときに素早く参照できる環境が求められています。従来は数百ページに及ぶPDFマニュアルから該当箇所を探す必要があり、時間がかかっていました。

そこで、**AWSのナレッジベース(Amazon Bedrock Knowledge Bases)**を活用したRAG(Retrieval-Augmented Generation)チャットボットを開発することになりました。従業員が自然言語で質問すると、関連する社内規則を自動で検索し、正確な回答を生成するシステムです。

チャットログ管理の課題

このようなAIチャットボットを開発する際、ユーザーとの会話ログをどのように保存・管理するかは重要な課題です。ログは以下の目的で必要となります:

  • 📊 利用状況の分析: どの部署でどんな質問が多いのか
  • 🔍 監査対応: コンプライアンス上、会話履歴の保存が必要
  • 💡 サービス改善: よくある質問の傾向を把握し、ナレッジベースを改善

しかし、全従業員が日々利用すれば会話データは膨大になり、スケーラビリティとコストの両立が求められます。

本記事では、AWSのDynamoDBを採用し、以下の要件を満たすスケーラブルなチャットログのデータベース設計を行った際の考え方をまとめます。

要件

  • 要件1: 特定の会話(conversation_id)を指定すると、その会話のメッセージ全体を時系列で高速に取得したい
  • 要件2: 特定のユーザー(user_id)を指定すると、そのユーザーの会話一覧を時系列で取得したい
  • 要件3: コストは可能な限り抑え、将来的なデータ量増加にも耐えられる(スケーラブルである)こと

2. ログデータの基本スキーマ

保存するメッセージデータの基本的なJSON構造は以下のように定義しました。実際のプロジェクトで使用している構造をベースにしています。

{
  "message_id": "uuid",
  "conversation_id": "uuid",
  "user_id": "string",
  "timestamp": "datetime",
  "message_type": "user|assistant",
  "message_content": "text",
  "metadata": {
    "company_id": "string",
    "working_status_id": "string",
    "genre_id": "string",
    "rag_sources": [
      {
        "source_pdf_name": "string",
        "snippet": "string"
      }
    ]
  }
}

各フィールドの説明

  • message_id: メッセージの一意識別子(UUID)
  • conversation_id: 会話セッションID(UUID)
  • user_id: ユーザーID(Microsoft Entra IDから取得)
  • timestamp: メッセージのタイムスタンプ(ISO 8601形式: 2024-01-02T06:04:05.000Z
  • message_type: user または assistant(ユーザーの質問か、AIの回答か)
  • message_content: メッセージ本文
  • metadata: 分析・監査用の追加情報
    • company_id: 所属会社ID(グループ企業が複数ある場合の識別用)
    • working_status_id: 雇用形態ID(正社員/契約社員/派遣社員など)
    • genre_id: 質問ジャンルID(労務/コンプライアンス/製造など、フロントエンドで選択)
    • rag_sources: RAG(Amazon Bedrock Knowledge Bases)が参照したソース情報
      • source_pdf_name: 参照した規則マニュアルのPDF名(例: 労務管理規定_2024.pdf
      • snippet: 実際に参照したテキストの抜粋(トレーサビリティ確保)

3. なぜDynamoDBか?

今回、いくつかのAWSデータベースサービス(RDS, OpenSearchなど)を比較検討しましたが、最終的にDynamoDBを採用しました。

DynamoDBの主な特徴

  • 無限にスケール: 完全マネージド型のサーバレスDBで、テーブルサイズが10GBを超えると自動でパーティションが分割・拡張されます
  • 高速なキー検索: プライマリキー(PK, SK)に基づいた検索が非常に高速です
  • 低コスト(特にサーバレス): オンデマンドキャパシティの場合、リクエスト量とストレージ量に応じた従量課金であり、利用が少ない場合のコストを非常に低く抑えられます

ただし、DynamoDBはプライマリキーの設計が命です。キーに基づかない検索(例: message_contentでのテキスト検索)は非常に苦手としています。

💡 将来の拡張性について

本プロジェクトでは、現時点では「特定の会話を取得」「ユーザーの会話一覧を取得」というキーベースの検索のみを要件としていますが、将来的にチャット内容の全文検索(「過去に○○について質問したメッセージを検索」など)が必要になった場合は、DynamoDB + OpenSearchのハイブリッド構成への移行を検討しています。

DynamoDB Streamsを使えば、既存のDynamoDBテーブルからOpenSearchへデータを非同期で同期できるため、後からの追加も容易です。プライマリストレージはDynamoDBのまま維持できるため、データの永続性と信頼性を保ちながら、全文検索機能を追加できます。

4. 設計(1): メインテーブル(要件1の実現)

まず、**要件1: 「特定の会話を時系列で取得」**を実現するためのメインテーブルを設計します。

DynamoDBでは、パーティションキー(PK)とソートキー(SK)の組み合わせでデータを効率的に格納・取得します。

  • Partition Key (PK): conversation_id
  • Sort Key (SK): timestamp

この設計により、conversation_idが同じメッセージ(=特定の会話)は、すべて同じパーティションにtimestamp順でソートされて格納されます。conversation_idを指定してQueryを実行するだけで、コスト効率よく(1回のQueryで)特定の会話ログを時系列で取得できます。

5. 設計(2): GSIと「疎なインデックス」(要件2の実現)

次に、**要件2: 「ユーザーごとの会話一覧を取得」**を考えます。

🚨 やってはいけない「Scan」

user_idはメインテーブルのキーになっていないため、単純に探そうとするとScan(テーブル全件検索)になってしまいます。

Scanを避けるべき理由:

  1. 高コスト: Scanはテーブル内の全アイテムを読み込むため、データが100万件あれば100万件分の読み込み料金がかかります
  2. パフォーマンス低下: Scanはテーブルの読み込み能力(スループット)を大量に消費し、本来高速なはずの他のQuery(要件1の取得など)を妨害(スロットリング)する可能性があります

💡 GSI (グローバルセカンダリインデックス) の活用

この問題を解決するのがGSIです。GSIは、メインテーブルとは異なるPK/SKを持った、データの「コピー」(インデックス)を作成する機能です。

しかし、単純にuser_idをPKにしたGSIを作ると、すべてのメッセージがGSIにコピーされてしまい、ストレージコストが単純に2倍になってしまいます。

✨ 「疎なインデックス (Sparse Index)」テクニック

私たちが本当に欲しいのは「ユーザーごとの会話一覧(=各会話の最初のメッセージ)」であり、全メッセージではありません。

そこで、「会話の最初のメッセージにだけ、特定の目印(属性)を付与する」というテクニックを使います。

DynamoDBのGSIは、GSIのキーとして定義された属性が存在するアイテムだけをインデックスの対象とします。これを利用し、「疎な(まばらな)インデックス」を作成します。

具体的な実装:

  1. アプリケーション側で、各会話の最初のメッセージを書き込む時だけ、gsi_pk_userという属性を追加します
  2. このgsi_pk_userをPK、timestampをSKとするGSIを作成します
  • GSI Partition Key (GSI-PK): gsi_pk_user (値には user_id を入れる)
  • GSI Sort Key (GSI-SK): timestamp

データ格納イメージ

例えば、ユーザーU456が2つの会話(A, B)を行った場合、データは以下のようになります。

▼ 会話Aの「最初の」メッセージ

{
  "conversation_id": "A123",
  "timestamp": "2025-11-05T09:00:00Z",
  "user_id": "U456",
  "message_content": "こんにちは",
  "gsi_pk_user": "U456"  // <-- 目印!
}

▼ 会話Aの「2番目以降の」メッセージ

{
  "conversation_id": "A123",
  "timestamp": "2025-11-05T09:00:05Z",
  "user_id": "U456",
  "message_content": "調子はどう?"
  // "gsi_pk_user" 属性は「存在しない」
}

▼ 会話Bの「最初の」メッセージ

{
  "conversation_id": "B789",
  "timestamp": "2025-11-05T10:30:00Z",
  "user_id": "U456",
  "message_content": "DynamoDBについて教えて",
  "gsi_pk_user": "U456"  // <-- 目印!
}

結果:

  • メインテーブルには、会話AとBの全てのメッセージが格納されます
  • GSIには、gsi_pk_user属性を持つ**「最初のメッセージ」2件だけ**がコピーされます

これにより、GSIに対して gsi_pk_user = "U456"Queryを実行するだけで、**要件2: 「ユーザー(U456)の会話一覧(の先頭メッセージ)を時系列で」**を、低コストかつ高速に取得できるようになりました。

GSIのプロジェクション設定でさらにコスト削減

GSIには「プロジェクション」という設定があり、どの属性をGSIにコピーするかを制御できます。

  • KEYS_ONLY: PKとSKのみをコピー(最小コスト)
  • INCLUDE: 指定した属性のみをコピー
  • ALL: すべての属性をコピー(最大コスト)

今回のユースケースでは、会話一覧を取得する際に必要なのは conversation_idtimestamp だけです。メッセージ本文は不要なため、プロジェクション設定を KEYS_ONLY または INCLUDEconversation_id のみを含めることで、GSIのストレージコストをさらに削減できます。

詳しいメッセージ内容が必要な場合は、取得した conversation_id を使って、メインテーブルに対して追加のQueryを実行すればよいのです。

「最初のメッセージ」の判定ロジック

アプリケーション側で「最初のメッセージ」を判定する方法はいくつかあります。

方法1: フロントエンドからフラグを送る

新しい会話を開始する際、フロントエンドで新しい conversation_id (UUID等) を生成し、最初のメッセージ送信時に is_first_message: true フラグを付けて送信します。

方法2: バックエンドで既存会話をチェック

メッセージ書き込み前に、DynamoDBで該当する conversation_id が既に存在するかを確認し、存在しない場合のみ gsi_pk_user を付与します。

推奨アプローチ: 方法1の方がシンプルで効率的です。追加のQuery(既存会話チェック)が不要なため、書き込みコストと遅延を削減できます。

6. 他のDBサービスとの比較

参考までに、今回比較検討した他のAWSサービスとの使い分けをまとめます。

サービス データベース種別 主な強み 検索の柔軟性 コスト(低利用時) 適したユースケース
Amazon DynamoDB NoSQL (KVS/Doc) 高速な読み書き、スケーラビリティ △ (キー検索は◎) ◎ (非常に安い) 時系列ログ保存、ユーザー別データ取得
Amazon RDS RDB (SQL) SQLによる柔軟な集計・分析 ○ (SQLは得意) △ (中) 複雑な集計、レポート生成、JOIN処理
Amazon OpenSearch 検索エンジン 全文検索、あいまい検索 ◎ (検索特化) ✕ (高い) ログ全文検索、ダッシュボード分析
Amazon DocumentDB NoSQL (Doc) JSONの柔軟な操作 (MongoDB互換) ○ (JSONクエリ) ✕ (高い) MongoDB移行、複雑なJSON操作

ユースケース別の推奨

  • DynamoDB単体:
    今回のような「ログの確実な保存」と「低コストな時系列取得」が最優先の場合に最適です。データ量が増えても自動でスケールし、コストも従量課金で無駄がありません。

  • DynamoDB + OpenSearchのハイブリッド構成:
    「ログは確実に保存したいが、キーワード検索もしたい」という場合は、DynamoDBをプライマリストレージとし、DynamoDB StreamsでOpenSearchに非同期でデータを流す構成が一般的です。障害時もDynamoDBにデータは残るため、データロストのリスクを最小化できます。

  • RDSを選ぶべきケース:
    もし「部署ごとや日ごとのメッセージ数を集計したい」「ユーザー属性とJOINして分析したい」といったSQLによる複雑な集計・分析がメイン要件であれば、RDSが適しています。ただし、最小インスタンスでも常時起動コストがかかります。

  • OpenSearchの活用場面:
    過去の質問と回答をキーワードで全文検索したい場合や、OpenSearch Dashboardsで以下のような分析を行いたい場合に適しています:

    • ワードクラウド(質問に含まれる単語の頻出度を可視化)
    • よく聞かれる質問ランキング
    • カテゴリ別の円グラフ

7. DynamoDBの制約事項と注意点

DynamoDBは優れたデータベースですが、すべてのケースに適しているわけではありません。設計前に以下の制約事項を理解しておくことが重要です。

複雑な集計クエリが苦手

DynamoDBは「特定のキーで高速に取得」することに最適化されているため、以下のようなSQLで簡単に実現できる操作は困難です:

  • 部署別の月間メッセージ数の集計
  • ユーザーテーブルとJOINした分析
  • GROUP BY、AVG、SUMなどの集計関数

このような要件がある場合は、分析用にRDSやAthena(S3 + SQLクエリ)との併用を検討してください。

全文検索は不可

メッセージ本文のキーワード検索(例: 「契約に関する質問を検索」)はDynamoDBだけでは実現できません。全文検索が必要な場合は、OpenSearchとの併用が必須です。

ホットパーティション問題

特定のユーザーが極端に多くの会話を開始する場合、そのユーザーのGSIパーティション(gsi_pk_user)が「ホット」になり、スロットリング(リクエスト制限)が発生する可能性があります。

対策: 非常に高トラフィックの場合は、gsi_pk_user に日付をサフィックスとして追加(例: U456#2025-11-05)することで、パーティションを分散できます。

TTL(Time to Live)の活用

チャットログを永久に保存する必要がない場合、DynamoDBのTTL機能を使えば、一定期間経過後のログを自動削除でき、長期的なストレージコストを削減できます。

例えば、監査要件に応じて2年後に自動削除する、といった設定が可能です。TTL設定は ttl 属性に削除したいタイムスタンプ(Unixエポック秒)を設定するだけで実現できます。

8. 実装例

ここでは、Python(boto3)を使った実装例を示します。

テーブル作成(Terraform/CloudFormation推奨)

実際の運用では、Terraformやcloudformationでテーブルを作成することを推奨しますが、参考までにboto3での作成例を示します。

import boto3

dynamodb = boto3.client('dynamodb', region_name='ap-northeast-1')

# テーブル作成
response = dynamodb.create_table(
    TableName='ChatMessages',
    KeySchema=[
        {'AttributeName': 'conversation_id', 'KeyType': 'HASH'},   # Partition Key
        {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}         # Sort Key
    ],
    AttributeDefinitions=[
        {'AttributeName': 'conversation_id', 'AttributeType': 'S'},
        {'AttributeName': 'timestamp', 'AttributeType': 'S'},
        {'AttributeName': 'gsi_pk_user', 'AttributeType': 'S'}
    ],
    GlobalSecondaryIndexes=[
        {
            'IndexName': 'UserConversationsIndex',
            'KeySchema': [
                {'AttributeName': 'gsi_pk_user', 'KeyType': 'HASH'},
                {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}
            ],
            'Projection': {
                'ProjectionType': 'INCLUDE',
                'NonKeyAttributes': ['conversation_id', 'message_content']
            }
        }
    ],
    BillingMode='PAY_PER_REQUEST',  # オンデマンドモード
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'  # OpenSearch連携用
    }
)

メッセージの書き込み

from datetime import datetime
import boto3

dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('ChatMessages')

def save_message(conversation_id, user_id, message_content, message_type, is_first_message=False):
    """
    メッセージをDynamoDBに保存
    
    Args:
        conversation_id: 会話ID
        user_id: ユーザーID
        message_content: メッセージ本文
        message_type: "user" or "assistant"
        is_first_message: 会話の最初のメッセージかどうか
    """
    item = {
        'conversation_id': conversation_id,
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'user_id': user_id,
        'message_type': message_type,
        'message_content': message_content
    }
    
    # 最初のメッセージの場合のみ、GSI用のキーを追加(疎なインデックス)
    if is_first_message:
        item['gsi_pk_user'] = user_id
    
    table.put_item(Item=item)
    print(f"Message saved: {conversation_id}")

# 使用例:新しい会話の開始
save_message(
    conversation_id='conv_20251105_001',
    user_id='U456',
    message_content='DynamoDBについて教えて',
    message_type='user',
    is_first_message=True  # 最初のメッセージ
)

# 使用例:同じ会話の2番目以降のメッセージ
save_message(
    conversation_id='conv_20251105_001',
    user_id='U456',
    message_content='DynamoDBは...',
    message_type='assistant',
    is_first_message=False  # 2番目以降
)

特定の会話のメッセージを時系列で取得

def get_conversation_messages(conversation_id):
    """
    特定の会話のメッセージ全体を時系列で取得
    
    Args:
        conversation_id: 会話ID
        
    Returns:
        メッセージのリスト
    """
    response = table.query(
        KeyConditionExpression='conversation_id = :conv_id',
        ExpressionAttributeValues={
            ':conv_id': conversation_id
        },
        ScanIndexForward=True  # 時系列順(昇順)
    )
    
    return response['Items']

# 使用例
messages = get_conversation_messages('conv_20251105_001')
for msg in messages:
    print(f"[{msg['timestamp']}] {msg['message_type']}: {msg['message_content']}")

ユーザーの会話一覧を取得(GSI使用)

def get_user_conversations(user_id, limit=20):
    """
    特定ユーザーの会話一覧を最新順で取得
    
    Args:
        user_id: ユーザーID
        limit: 取得する会話数の上限
        
    Returns:
        会話一覧(各会話の最初のメッセージ)
    """
    response = table.query(
        IndexName='UserConversationsIndex',  # GSIを使用
        KeyConditionExpression='gsi_pk_user = :user_id',
        ExpressionAttributeValues={
            ':user_id': user_id
        },
        ScanIndexForward=False,  # 最新順(降順)
        Limit=limit
    )
    
    return response['Items']

# 使用例
conversations = get_user_conversations('U456')
for conv in conversations:
    print(f"会話ID: {conv['conversation_id']}")
    print(f"開始時刻: {conv['timestamp']}")
    print(f"最初のメッセージ: {conv['message_content']}\n")

フロントエンドでの会話ID生成例(TypeScript)

import { v4 as uuidv4 } from 'uuid';

// 新しい会話を開始
function startNewConversation() {
  const conversationId = `conv_${Date.now()}_${uuidv4().slice(0, 8)}`;
  return conversationId;
}

// 最初のメッセージ送信時
const conversationId = startNewConversation();
await sendMessage({
  conversationId,
  userId: currentUser.id,
  messageContent: userInput,
  messageType: 'user',
  isFirstMessage: true  // フラグを明示的に送る
});

このように、アプリケーション側で「最初のメッセージかどうか」を判定し、gsi_pk_user属性を条件付きで追加することで、疎なインデックスを実現しています。

9. まとめ

本記事では、大手化粧品企業の社内業務規則検索チャットボットの開発経験をもとに、DynamoDBのGSI(グローバルセカンダリインデックス)における「疎なインデックス」テクニックを活用した、スケーラブルなチャットログ設計を紹介しました。

記事のポイント

  1. プライマリキーの適切な設計

    • メインテーブルでは conversation_id(PK)と timestamp(SK)で会話ごとの時系列取得を実現
  2. 疎なインデックスによるコスト最適化

    • 会話の最初のメッセージにのみ gsi_pk_user 属性を付与
    • GSIには全メッセージではなく、必要なデータ(会話一覧)のみをコピー
    • 結果として、ストレージコストを大幅に削減
  3. 実用的な実装方法

    • フロントエンドで会話IDを生成し、is_first_message フラグで判定
    • boto3を使った具体的なコード例で、すぐに実装可能
  4. 制約事項の理解

    • 複雑な集計や全文検索にはRDSやOpenSearchとの併用を検討
    • ホットパーティション問題への対策(日付サフィックスなど)
    • TTLでの自動削除によるコスト削減

適用できる他のユースケース

この「疎なインデックス」パターンは、チャットログ以外にも応用できます:

  • 社内ヘルプデスク: 全サポートメッセージをチケット別に保存し、未解決チケットの最初のメッセージのみGSIで管理
  • 顧客サポートチャット: 全会話をセッション別に保存し、エスカレーション案件の会話のみGSIで抽出
  • IoTセンサーデータ: 全データはdevice_id別に保存し、アラート時のデータのみGSIに格納
  • Eコマース: 全注文履歴はuser_id別に保存し、返品・クレーム案件のみGSIで管理
  • タスク管理: 全タスクをproject_id別に保存し、未完了タスクのみGSIで効率的に取得

DynamoDBは、プライマリキー設計とGSIの活用次第で、非常に強力かつコスト効率の良いデータベースとなります。特に「疎なインデックス」は、コスト最適化の重要なテクニックです。

Amazon Bedrock Knowledge Basesを使ったRAGアプリケーションや、その他の会話型AIシステムを開発する際には、ぜひこの設計パターンを検討してみてください!

10. 参考リンク

公式ドキュメント

AWS技術記事

コミュニティ記事

その他

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?