みなさんこんにちは。
本当に今更ながらRAGを作っていきます。
今回はGCP上で完結する作りにします。
ただ、やってみて今更作るからこそ得られる利点というのもあるのかなと感じました。
そこをお伝えできれば嬉しいです。
ざっくり背景
社内でAI活用の流れ->実装楽で使いやすい->RAG
という感じです。
実際には使ってもらうところが難しいのですが、情報検索は業務中によくやることだと思うので、アプローチの一つとして社内で知ってもらえたらいいなと思ってます。
使用ツールはSlackとNotionです。
Notion上のデータをSlackから検索できるようにするが目的です。
APIさえあれば、任意のツールで置き換え可能だと思います。(SDKがあるとなお良し)
それでは参ります。
構成
今回は二段組です。
データ取得部分とデータ検索部分です。
データ取得
データ取得はCloud Schedulerによって定期的にJobを実行し、NotionのDBをベクトル化する流れです。
データ検索
データ検索はSlackのメンションを受けてCloud Run側で結果をとってくる形にしてます。
下準備
- Bigquery
- データセット作る
- NotionのDBと対応するテーブルを作っておく
データ取得部分の作成
外部モデルの追加
まずはBigQueryにVertexAIとの接続情報を設定する必要があります。
エクスプローラの横3点リーダーからデータを追加します。
次にリソースのところからvertex AIを検索します。
接続情報を入力して完了です。
リージョンを指定すると各リージョンでのデータを結合したりできないが、コストが計算しやすく、レイテンシも低めらしいですよ!
ちなみにリージョンはus-central1
がおすすめです。理由は生成系の言語モデルを使いやすいリージョンだからです。asia-northeast1
とかだと、一部言語モデルが使えなかったりします。
接続設定に存在するサービスアカウントに対してVertex AIの権限を付与
gcloud projects add-iam-policy-binding <プロジェクト> \
[--member="serviceAccount:<BIG QUERYのサービスアカウント>](mailto:--member=%22serviceAccount:<BIG QUERYのサービスアカウント>)" \
--role="roles/aiplatform.user"
新規で埋め込みモデルを追加 on BigQuery
今回は日本語使いたいのでmultilingualにしちゃいます。
CREATE OR REPLACE MODEL `<プロジェクト>.<データセット>.<保存したい名前>`
REMOTE WITH CONNECTION `projects/<プロジェクト>/locations/us-central1/connections/<作った接続名>`
OPTIONS (ENDPOINT = 'text-multilingual-embedding-002')
ベクトル作成部分
BigQuery ML で on-the-fly 埋め込み
BigQuery ML の組み込み関数 ML.GENERATE_EMBEDDING を使います。
軽く特徴や注意点をまとめます。
- content列がembeddingされる
- content列は一つのレコードでもNULLまたは空文字だと全処理が失敗
- ML.GENERATE_EMBEDDINGはテーブルをレスポンスする関数なのでFROM句で使う
- output_dimensionalityを低めに設定しておかないと文章量に依存して時間もお金もかかる(注意)
- いうてもそこまで課金されない(はず)
CREATE OR REPLACE TABLE `プロジェクトID.データセット.pages_embedded` AS
SELECT
page_id,
section,
event_date,
content,
ml_generate_embedding_result AS embedding
FROM
ML.GENERATE_EMBEDDING(
MODEL `プロジェクトID.データセット.REMOTE_MODEL_ID`,
(
-- サブクエリで content 列を指定
SELECT
page_id,
section,
event_date,
content_md AS content
FROM
`プロジェクトID.データセット.pages`
WHERE content_md IS NOT NULL
AND TRIM(content_md) != ''
),
STRUCT(
TRUE AS flatten_json_output,
16 AS output_dimensionality
)
);
(補足)ベクターインデックス作成
インデックス作るなら最低5000件のドキュメントがいるらしいです!
場合によっては作りましょう。私は今回スキップしてますので補足的に紹介。
- num listsはクラスターの数的な意味
- 多い方が精度は上がるが速度が下がるトレードオフを持つ
chatGPT曰く…
何か参考になるやも。
データ件数(embeddingの行数) | 推奨 num_lists |
---|---|
〜10,000件 | 10〜50 |
10,000〜100,000件 | 50〜200 |
100,000件〜 | 200〜1000 |
一応クエリも載せときます。
distanceはユークリッド距離と内積とコサイン距離があるみたいです。
CREATE OR REPLACE VECTOR INDEX pages_vec_idx
ON `プロジェクトID.データセット.pages_embedded`(embedding)
OPTIONS(
index_type = "IVF",
distance_type= "COSINE",
ivf_options = '{"num_lists":10}'
);
データを取得するスクリプト
こやつをCloud Run Jobで待機させます。
secret managerのデータ取得とか、クエリのところとかちょっとだせえですけど。
テーブル構成がこうなってます。
ちなみにregionはus-central1
をお勧めします。
言語モデル使えないことがあるので!!
import os, re, json
from notion2md.exporter.block import StringExporter
from notion_client import Client
from google.cloud import bigquery, secretmanager
PROJECT = os.environ["GCP_PROJECT"]
DATASET = os.environ.get("BQ_DATASET", "<データセット>")
TABLE = os.environ.get("BQ_TABLE", "pages")
EMB_MODEL = os.environ.get("EMB_MODEL", "emb")
DATABASE_ID = os.environ["NOTION_DATABASE_ID"]
TOKEN_SECRET= os.environ["NOTION_TOKEN_SECRET"]
LOCATION = os.environ.get("BQ_LOCATION", "us-central1")
bq = bigquery.Client(project=PROJECT, location=LOCATION)
sm = secretmanager.SecretManagerServiceClient()
def get_notion_client():
secret = sm.access_secret_version(
name=f"projects/{PROJECT}/secrets/{TOKEN_SECRET}/versions/latest"
).payload.data.decode()
return Client(auth=secret), secret
def truncate_table():
print("❗️ 既存テーブルのレコード削除中...")
bq.query(f"TRUNCATE TABLE `{PROJECT}.{DATASET}.{TABLE}`").result()
def fetch_and_insert_pages(notion: Client, secret: str):
print("📥 ページの取得・挿入を開始...")
cursor = None
while True:
resp = notion.databases.query(
database_id=DATABASE_ID,
start_cursor=cursor,
page_size=100,
sorts=[{"timestamp":"last_edited_time","direction":"ascending"}],
)
rows = []
for page in resp["results"]:
md_text = StringExporter(
token=secret,
block_id=page["id"],
output_path="./md_pages",
download=False
).export()
# 画像リンク除去
img_pat = re.compile(r'!\[.*?\]\(.*?\)')
cleaned = [img_pat.sub('', l) for l in md_text.splitlines() if img_pat.sub('', l).strip()]
md_text = "\n".join(cleaned)
p = page["properties"]
raw = p["日時"]["date"]["start"]
if raw and len(raw)==10: raw += "T00:00:00+09:00"
rows.append({
"page_id": page["id"],
"title": p["事例"]["title"][0]["plain_text"] if p["事例"]["title"] else None,
"section": [s["name"] for s in p["課"]["multi_select"]],
"author": [a["name"] for a in p["作成者"]["people"]],
"event_date": raw,
"content_md": md_text,
"updated_at": page["last_edited_time"],
})
if rows:
errors = bq.insert_rows_json(
f"{PROJECT}.{DATASET}.{TABLE}",
rows,
row_ids=[r["page_id"] for r in rows],
)
if errors:
raise RuntimeError(errors)
if not resp["has_more"]:
break
cursor = resp["next_cursor"]
def run_post_ingest_query():
query = f"""
CREATE OR REPLACE TABLE `{PROJECT}.{DATASET}.{TABLE}_embedded` AS
SELECT
page_id,
section,
event_date,
content,
ml_generate_embedding_result AS embedding
FROM
ML.GENERATE_EMBEDDING(
MODEL `{PROJECT}.{DATASET}.{EMB_MODEL}`,
(
SELECT
page_id,
section,
event_date,
content_md AS content
FROM
`{PROJECT}.{DATASET}.{TABLE}`
WHERE content_md IS NOT NULL
AND TRIM(content_md) != ''
),
STRUCT(
TRUE AS flatten_json_output,
16 AS output_dimensionality
)
);
"""
print("📊 クエリ実行中...")
result = bq.query(query).result()
for row in result:
print(dict(row))
def main():
notion, secret = get_notion_client()
truncate_table()
fetch_and_insert_pages(notion, secret)
run_post_ingest_query()
if __name__ == "__main__":
try:
main()
print("✅ sync 完了")
except Exception as e:
print(f"❌ エラー発生: {e}")
raise e
データ検索部分
boltというSlack App用のフレームワークを使います。
slackアプリ
import os
from slack_bolt import App
from slack_bolt.adapter.flask import SlackRequestHandler
from flask import Flask, request
from gemini_rag import run_summary # ここは既存関数でOK!
# 環境変数からSlackトークン読む
app = App(
token=os.environ["SLACK_BOT_TOKEN"],
signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)
flask_app = Flask(__name__)
handler = SlackRequestHandler(app)
@app.event("app_mention")
def handle_mention(event, say, ack, client):
ack()
client.chat_postMessage(
channel=event["channel"],
thread_ts=event["ts"],
text="ご質問ありがとうございます。少々お待ちください、ただいま内容を確認しております。"
)
user_query = event["text"]
results = run_summary(user_query)
# スレッド内に再返信
client.chat_postMessage(
channel=event["channel"],
thread_ts=event["ts"],
text=results
)
@flask_app.route("/slack/events", methods=["POST"])
def slack_events():
return handler.handle(request)
if __name__ == "__main__":
flask_app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
Gemini
ここは2点くらい注目ポイントがあって、一つはVertex AI側のGemini使ってるとこですね。
bigquery側のGenerativeモデルはレコード単位しか適用できないっぽくて一発で取りたいなってことでvertexの方使ってます(調査不足やったらすみません)。
もう一個はfunction callingでstructed outputしてるところです。
これ結構便利で、構造的な出力を使ってるので後続の処理がやりやすいんですね。
あとクエリの書き方はちょっとどうにかしたいですね。
from google.cloud import bigquery
from vertexai.preview.generative_models import GenerationConfig, Tool, FunctionDeclaration, GenerativeModel
import vertexai
import os
# 環境初期化
PROJECT = os.environ.get("GCP_PROJECT", "<環境>")
LOCATION = os.environ.get("GCP_REGION", "us-central1")
structured_output = Tool(
function_declarations=[
FunctionDeclaration(
name="return_matched_use_cases",
description="与えられたクエリに基づいて、関連する活用事例がある場合はそれを返します。",
parameters={
"type": "object",
"properties": {
"matched_use_cases": {
"type": "array",
"items": {
"type": "object",
"properties": {
"page_id": {"type": "string"},
"summary": {"type": "string"},
},
"required": ["page_id", "summary"],
}
}
},
"required": ["matched_use_cases"],
},
)
]
)
vertexai.init(project=PROJECT, location=LOCATION)
bq_client = bigquery.Client(project=PROJECT, location=LOCATION)
model = GenerativeModel("gemini-2.0-flash")
def run_summary(user_query: str) -> str:
query = get_sql_query(user_query, PROJECT)
print(f"Query will be executed: {query}")
results = bq_client.query(query).result()
results = list(bq_client.query(query).result())
print(f"Number of rows: {len(results)}")
print(f"Results: {results}")
docs = [{"page_id": row.page_id, "content": row.content} for row in results]
if not docs:
return ""
prompt = f"""以下の文書の中から「{user_query}」に近い活用事例を選び、summaryとpage_idを返してください。
summaryは日本語でなるべく簡潔に記載すること。
"""
for doc in docs:
prompt += f"[page_id: {doc['page_id']}]\n{doc['content']}\n---\n"
full_prompt = f"""
{prompt}
出力形式:
{{
"matched_use_cases": [
{{ "page_id": "abc123", "summary": "..." }},
...
]
}}
該当なしの場合は、"matched_use_cases": [] を返してください。
"""
response = model.generate_content(
full_prompt,
tools=[structured_output],
generation_config=GenerationConfig(temperature=0.7, max_output_tokens=1000)
)
content_part = response.candidates[0].content.parts[0]
summaries = []
if content_part.function_call:
summaries = content_part.function_call.args["matched_use_cases"]
# チャットっぽい整形
messages = ""
if summaries:
messages += "以下の活用事例が見つかりました。\n\n"
for i, item in enumerate(summaries, 1):
# NotionのページIDを置き換え
url_id = item["page_id"].replace("-", "")
msg = f"{i}. {item['summary']}\n事例はこちらから👉 https://www.notion.so/{url_id}"
messages += msg + "\n\n"
else:
messages += "申し訳ありませんが、関連する活用事例は見つかりませんでした。"
return messages
def get_sql_query(user_query: str, project_id: str) -> str:
return f"""
CREATE TEMP TABLE query_embedding AS
SELECT ml_generate_embedding_result AS embedding
FROM ML.GENERATE_EMBEDDING(
MODEL `{project_id}.<データセット名>.emb`,
(SELECT '{user_query}' AS content),
STRUCT(TRUE AS flatten_json_output, 16 AS output_dimensionality)
);
WITH result AS (
SELECT
p.page_id AS page_id,
p.section AS section,
p.event_date AS event_date,
p.content AS content,
result.distance AS distance
FROM VECTOR_SEARCH(
TABLE query_embedding,
'embedding',
TABLE `{project_id}.<データセット名>.pages_embedded`,
'embedding',
top_k => 5,
distance_type => 'COSINE',
options => '{{"use_brute_force":true}}'
) AS result
JOIN `{project_id}.<データセット名>.pages_embedded` AS p
ON p.page_id = result.query.page_id
),
ranked AS (
SELECT *, ROW_NUMBER() OVER (ORDER BY distance ASC) AS rn
FROM result
)
SELECT *
FROM ranked
WHERE rn <= 5;
"""
if __name__ == "__main__":
result = run_summary("言語モデルのアーキテクチャについて")
print(result)
Cloud Runを作る
上記の内容をArtifact Registry上にコンテナとして展開し、コンテナを使って起動できるようにします。
あと、最低インスタンス数は1にしておくといいです。(これならGCEでもいいかも)
してないとslackの3秒以内にレスポンス返してね、に引っかかって立ち上がってる間に3回くらいリクエストが飛びます。
Slack Appの設定を変更
Features->Event SubscriptionsにてEnable Eventsをオンにして、Request URLにCloud Runのリンク+上記実装例であれば/slack/eventsをつけたものを設定します。
例: https://cloudrunちゃん.app/slack/event
ここでチャレンジが成功したらokです。
重要: アプリをインストールする
Settings->Install Appから組織にアプリをインストールします。
メンションしてみてください!きましたか??
まとめ
今やって嬉しいこととして、やっぱり環境が整ってることが大きいですね。
前だったらVectorDBの準備とかもしないといけなかったですし。
今ならほぼほぼBigqueryに入れちゃえばおっけーって感じなので、作りやすいです。
あとfunction callingやstructed outputのような出力や行動を制御するための仕組みが導入されているので尤度に負けない安定した挙動が目指せます。
仲間を募集中!!
株式会社ホープでは、福岡で働くエンジニアを募集中です。
ぜひ、求人を見てみてください!
▼ Wantedly求人
https://www.wantedly.com/projects/1684027
▼ コーポレートサイト
https://www.zaigenkakuho.com/recruit/
「自治体を通じて人々に新たな価値を提供し、会社及び従業員の成長を追求する」
この理念の実現に向けて、今後も自治体の課題解決に取り組んでいきます。
ご応募お待ちしております!