0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

今更だからこそRAGを作ってみよう~GCPオンリー編~

Posted at

みなさんこんにちは。

本当に今更ながらRAGを作っていきます。
今回はGCP上で完結する作りにします。

ただ、やってみて今更作るからこそ得られる利点というのもあるのかなと感じました。
そこをお伝えできれば嬉しいです。

ざっくり背景

社内でAI活用の流れ->実装楽で使いやすい->RAG
という感じです。

実際には使ってもらうところが難しいのですが、情報検索は業務中によくやることだと思うので、アプローチの一つとして社内で知ってもらえたらいいなと思ってます。

使用ツールはSlackとNotionです。
Notion上のデータをSlackから検索できるようにするが目的です。
APIさえあれば、任意のツールで置き換え可能だと思います。(SDKがあるとなお良し)

それでは参ります。

構成

今回は二段組です。
データ取得部分とデータ検索部分です。

データ取得

データ取得はCloud Schedulerによって定期的にJobを実行し、NotionのDBをベクトル化する流れです。
overview_exploit.png

データ検索

データ検索はSlackのメンションを受けてCloud Run側で結果をとってくる形にしてます。
うho.png

下準備

  1. Bigquery
    1. データセット作る
    2. NotionのDBと対応するテーブルを作っておく

データ取得部分の作成

外部モデルの追加

まずはBigQueryにVertexAIとの接続情報を設定する必要があります。

エクスプローラの横3点リーダーからデータを追加します。

Screenshot 2025-05-14 at 11.23.52.png

次にリソースのところからvertex AIを検索します。

Screenshot 2025-05-14 at 11.24.15.png

接続情報を入力して完了です。

Screenshot 2025-05-14 at 11.24.30.png

リージョンを指定すると各リージョンでのデータを結合したりできないが、コストが計算しやすく、レイテンシも低めらしいですよ!
ちなみにリージョンはus-central1がおすすめです。理由は生成系の言語モデルを使いやすいリージョンだからです。asia-northeast1とかだと、一部言語モデルが使えなかったりします。

接続設定に存在するサービスアカウントに対してVertex AIの権限を付与

gcloud projects add-iam-policy-binding <プロジェクト> \
[--member="serviceAccount:<BIG QUERYのサービスアカウント>](mailto:--member=%22serviceAccount:<BIG QUERYのサービスアカウント>)" \
--role="roles/aiplatform.user"

新規で埋め込みモデルを追加 on BigQuery

今回は日本語使いたいのでmultilingualにしちゃいます。

CREATE OR REPLACE MODEL `<プロジェクト>.<データセット>.<保存したい名前>`
REMOTE WITH CONNECTION `projects/<プロジェクト>/locations/us-central1/connections/<作った接続名>`
OPTIONS (ENDPOINT = 'text-multilingual-embedding-002')

ベクトル作成部分

BigQuery ML で on-the-fly 埋め込み

BigQuery ML の組み込み関数 ML.GENERATE_EMBEDDING を使います。

軽く特徴や注意点をまとめます。

  • content列がembeddingされる
  • content列は一つのレコードでもNULLまたは空文字だと全処理が失敗
  • ML.GENERATE_EMBEDDINGはテーブルをレスポンスする関数なのでFROM句で使う
  • output_dimensionalityを低めに設定しておかないと文章量に依存して時間もお金もかかる(注意)
    • いうてもそこまで課金されない(はず)
CREATE OR REPLACE TABLE `プロジェクトID.データセット.pages_embedded` AS
SELECT
  page_id,
  section,
  event_date,
  content,
  ml_generate_embedding_result AS embedding
FROM
  ML.GENERATE_EMBEDDING(
    MODEL `プロジェクトID.データセット.REMOTE_MODEL_ID`,
    (
      -- サブクエリで content 列を指定
      SELECT
        page_id,
        section,
        event_date,
        content_md AS content
      FROM
        `プロジェクトID.データセット.pages`
			WHERE content_md IS NOT NULL
      AND TRIM(content_md) != ''
    ),
    STRUCT(
      TRUE AS flatten_json_output,
      16 AS output_dimensionality
    )
  );

(補足)ベクターインデックス作成

インデックス作るなら最低5000件のドキュメントがいるらしいです!
場合によっては作りましょう。私は今回スキップしてますので補足的に紹介。

  • num listsはクラスターの数的な意味
  • 多い方が精度は上がるが速度が下がるトレードオフを持つ

chatGPT曰く…

何か参考になるやも。

データ件数(embeddingの行数) 推奨 num_lists
〜10,000件 10〜50
10,000〜100,000件 50〜200
100,000件〜 200〜1000

一応クエリも載せときます。
distanceはユークリッド距離と内積とコサイン距離があるみたいです。

CREATE OR REPLACE VECTOR INDEX pages_vec_idx
  ON `プロジェクトID.データセット.pages_embedded`(embedding)
  OPTIONS(
    index_type   = "IVF",
    distance_type= "COSINE",
    ivf_options  = '{"num_lists":10}'
  );

データを取得するスクリプト

こやつをCloud Run Jobで待機させます。
secret managerのデータ取得とか、クエリのところとかちょっとだせえですけど。

テーブル構成がこうなってます。

Screenshot 2025-05-14 at 11.40.59.png

ちなみにregionはus-central1をお勧めします。
言語モデル使えないことがあるので!!

import os, re, json
from notion2md.exporter.block import StringExporter
from notion_client import Client
from google.cloud import bigquery, secretmanager

PROJECT     = os.environ["GCP_PROJECT"]
DATASET     = os.environ.get("BQ_DATASET", "<データセット>")
TABLE       = os.environ.get("BQ_TABLE",   "pages")
EMB_MODEL   = os.environ.get("EMB_MODEL", "emb")
DATABASE_ID = os.environ["NOTION_DATABASE_ID"]
TOKEN_SECRET= os.environ["NOTION_TOKEN_SECRET"]
LOCATION    = os.environ.get("BQ_LOCATION", "us-central1")

bq = bigquery.Client(project=PROJECT, location=LOCATION)
sm = secretmanager.SecretManagerServiceClient()

def get_notion_client():
    secret = sm.access_secret_version(
        name=f"projects/{PROJECT}/secrets/{TOKEN_SECRET}/versions/latest"
    ).payload.data.decode()
    return Client(auth=secret), secret

def truncate_table():
    print("❗️ 既存テーブルのレコード削除中...")
    bq.query(f"TRUNCATE TABLE `{PROJECT}.{DATASET}.{TABLE}`").result()

def fetch_and_insert_pages(notion: Client, secret: str):
    print("📥 ページの取得・挿入を開始...")
    cursor = None
    while True:
        resp = notion.databases.query(
            database_id=DATABASE_ID,
            start_cursor=cursor,
            page_size=100,
            sorts=[{"timestamp":"last_edited_time","direction":"ascending"}],
        )

        rows = []
        for page in resp["results"]:
            md_text = StringExporter(
                token=secret,
                block_id=page["id"],
                output_path="./md_pages",
                download=False
            ).export()

            # 画像リンク除去
            img_pat = re.compile(r'!\[.*?\]\(.*?\)')
            cleaned = [img_pat.sub('', l) for l in md_text.splitlines() if img_pat.sub('', l).strip()]
            md_text = "\n".join(cleaned)

            p = page["properties"]
            raw = p["日時"]["date"]["start"]
            if raw and len(raw)==10: raw += "T00:00:00+09:00"

            rows.append({
                "page_id":   page["id"],
                "title":     p["事例"]["title"][0]["plain_text"] if p["事例"]["title"] else None,
                "section":   [s["name"] for s in p[""]["multi_select"]],
                "author":    [a["name"] for a in p["作成者"]["people"]],
                "event_date": raw,
                "content_md": md_text,
                "updated_at": page["last_edited_time"],
            })

        if rows:
            errors = bq.insert_rows_json(
                f"{PROJECT}.{DATASET}.{TABLE}",
                rows,
                row_ids=[r["page_id"] for r in rows],
            )
            if errors:
                raise RuntimeError(errors)

        if not resp["has_more"]:
            break
        cursor = resp["next_cursor"]

def run_post_ingest_query():
    query = f"""
        CREATE OR REPLACE TABLE `{PROJECT}.{DATASET}.{TABLE}_embedded` AS
            SELECT
                page_id,
                section,
                event_date,
                content,
                ml_generate_embedding_result AS embedding
            FROM
                ML.GENERATE_EMBEDDING(
                    MODEL `{PROJECT}.{DATASET}.{EMB_MODEL}`,
                    (
                        SELECT
                            page_id,
                            section,
                            event_date,
                            content_md AS content
                        FROM
                            `{PROJECT}.{DATASET}.{TABLE}`
                        WHERE content_md IS NOT NULL
                        AND TRIM(content_md) != ''
                    ),
                    STRUCT(
                        TRUE AS flatten_json_output,
                        16 AS output_dimensionality
                    )
                ); 
    """
    print("📊 クエリ実行中...")
    result = bq.query(query).result()
    for row in result:
        print(dict(row))

def main():
    notion, secret = get_notion_client()
    truncate_table()
    fetch_and_insert_pages(notion, secret)
    run_post_ingest_query()

if __name__ == "__main__":
    try:
        main()
        print("✅ sync 完了")
    except Exception as e:
        print(f"❌ エラー発生: {e}")
        raise e  

データ検索部分

boltというSlack App用のフレームワークを使います。

slackアプリ

import os
from slack_bolt import App
from slack_bolt.adapter.flask import SlackRequestHandler
from flask import Flask, request
from gemini_rag import run_summary  # ここは既存関数でOK!

# 環境変数からSlackトークン読む
app = App(
    token=os.environ["SLACK_BOT_TOKEN"],
    signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
flask_app = Flask(__name__)
handler = SlackRequestHandler(app)

@app.event("app_mention")
def handle_mention(event, say, ack, client):
    ack()  

    client.chat_postMessage(
        channel=event["channel"],
        thread_ts=event["ts"],
        text="ご質問ありがとうございます。少々お待ちください、ただいま内容を確認しております。"
    )

    user_query = event["text"]
    results = run_summary(user_query)
    
    # スレッド内に再返信
    client.chat_postMessage(
        channel=event["channel"],
        thread_ts=event["ts"],
        text=results
    )

@flask_app.route("/slack/events", methods=["POST"])
def slack_events():
    return handler.handle(request)

if __name__ == "__main__":
    flask_app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

Gemini

ここは2点くらい注目ポイントがあって、一つはVertex AI側のGemini使ってるとこですね。
bigquery側のGenerativeモデルはレコード単位しか適用できないっぽくて一発で取りたいなってことでvertexの方使ってます(調査不足やったらすみません)。

もう一個はfunction callingでstructed outputしてるところです。
これ結構便利で、構造的な出力を使ってるので後続の処理がやりやすいんですね。

あとクエリの書き方はちょっとどうにかしたいですね。

from google.cloud import bigquery
from vertexai.preview.generative_models import GenerationConfig, Tool, FunctionDeclaration, GenerativeModel
import vertexai
import os

# 環境初期化
PROJECT = os.environ.get("GCP_PROJECT", "<環境>")
LOCATION = os.environ.get("GCP_REGION", "us-central1")

structured_output = Tool(
    function_declarations=[
        FunctionDeclaration(
            name="return_matched_use_cases",
            description="与えられたクエリに基づいて、関連する活用事例がある場合はそれを返します。",
            parameters={
                "type": "object",
                "properties": {
                    "matched_use_cases": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "page_id": {"type": "string"},
                                "summary": {"type": "string"},
                            },
                            "required": ["page_id", "summary"],
                        }
                    }
                },
                "required": ["matched_use_cases"],
            },
        )
    ]
)

vertexai.init(project=PROJECT, location=LOCATION)
bq_client = bigquery.Client(project=PROJECT, location=LOCATION)
model = GenerativeModel("gemini-2.0-flash")

def run_summary(user_query: str) -> str:
    query = get_sql_query(user_query, PROJECT)
    print(f"Query will be executed: {query}")
    results = bq_client.query(query).result()
    results = list(bq_client.query(query).result())
    print(f"Number of rows: {len(results)}")
    print(f"Results: {results}")
    docs = [{"page_id": row.page_id, "content": row.content} for row in results]

    if not docs:
        return ""

    prompt = f"""以下の文書の中から「{user_query}」に近い活用事例を選び、summaryとpage_idを返してください。
    
    summaryは日本語でなるべく簡潔に記載すること。
    
    """
    for doc in docs:
        prompt += f"[page_id: {doc['page_id']}]\n{doc['content']}\n---\n"

    full_prompt = f"""
    {prompt}
    出力形式:
    {{
    "matched_use_cases": [
        {{ "page_id": "abc123", "summary": "..." }},
        ...
    ]
    }}
    該当なしの場合は"matched_use_cases": [] を返してください
    """

    response = model.generate_content(
        full_prompt,
        tools=[structured_output],
        generation_config=GenerationConfig(temperature=0.7, max_output_tokens=1000)
    )

    content_part = response.candidates[0].content.parts[0]

    summaries = []
    if content_part.function_call:
        summaries = content_part.function_call.args["matched_use_cases"]

    # チャットっぽい整形
    messages = ""
    if summaries:
        messages += "以下の活用事例が見つかりました。\n\n"
        for i, item in enumerate(summaries, 1):
            # NotionのページIDを置き換え
            url_id = item["page_id"].replace("-", "")
            msg = f"{i}. {item['summary']}\n事例はこちらから👉 https://www.notion.so/{url_id}"
            messages += msg + "\n\n"
    else:
        messages += "申し訳ありませんが、関連する活用事例は見つかりませんでした。"
    
    return messages


def get_sql_query(user_query: str, project_id: str) -> str:
    return f"""
        CREATE TEMP TABLE query_embedding AS
        SELECT ml_generate_embedding_result AS embedding
        FROM ML.GENERATE_EMBEDDING(
        MODEL `{project_id}.<データセット名>.emb`,
        (SELECT '{user_query}' AS content),
        STRUCT(TRUE AS flatten_json_output, 16 AS output_dimensionality)
        );

        WITH result AS (
        SELECT
            p.page_id AS page_id,
            p.section AS section,
            p.event_date AS event_date,
            p.content AS content,
            result.distance AS distance
        FROM VECTOR_SEARCH(
                TABLE query_embedding,
                'embedding',
                TABLE `{project_id}.<データセット名>.pages_embedded`,
                'embedding',
                top_k => 5,
                distance_type => 'COSINE',
                options => '{{"use_brute_force":true}}'
            ) AS result
        JOIN `{project_id}.<データセット名>.pages_embedded` AS p
        ON p.page_id = result.query.page_id
        ),
        ranked AS (
        SELECT *, ROW_NUMBER() OVER (ORDER BY distance ASC) AS rn
        FROM result
        )
        SELECT *
        FROM ranked
        WHERE rn <= 5;
    """

if __name__ == "__main__":
    result = run_summary("言語モデルのアーキテクチャについて")

    print(result)

Cloud Runを作る

上記の内容をArtifact Registry上にコンテナとして展開し、コンテナを使って起動できるようにします。
あと、最低インスタンス数は1にしておくといいです。(これならGCEでもいいかも)
してないとslackの3秒以内にレスポンス返してね、に引っかかって立ち上がってる間に3回くらいリクエストが飛びます。

Slack Appの設定を変更

Features->Event SubscriptionsにてEnable Eventsをオンにして、Request URLにCloud Runのリンク+上記実装例であれば/slack/eventsをつけたものを設定します。

例: https://cloudrunちゃん.app/slack/event

ここでチャレンジが成功したらokです。

重要: アプリをインストールする

Settings->Install Appから組織にアプリをインストールします。

メンションしてみてください!きましたか??

まとめ

今やって嬉しいこととして、やっぱり環境が整ってることが大きいですね。
前だったらVectorDBの準備とかもしないといけなかったですし。
今ならほぼほぼBigqueryに入れちゃえばおっけーって感じなので、作りやすいです。

あとfunction callingやstructed outputのような出力や行動を制御するための仕組みが導入されているので尤度に負けない安定した挙動が目指せます。

仲間を募集中!!

株式会社ホープでは、福岡で働くエンジニアを募集中です。
ぜひ、求人を見てみてください!
▼ Wantedly求人
https://www.wantedly.com/projects/1684027
▼ コーポレートサイト
https://www.zaigenkakuho.com/recruit/
「自治体を通じて人々に新たな価値を提供し、会社及び従業員の成長を追求する」
この理念の実現に向けて、今後も自治体の課題解決に取り組んでいきます。
ご応募お待ちしております!

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?