はじめに
- Snowflakeに保存したデータを使った分析がしたいけれど、SQLが苦手...。
- そこで「自然言語で質問したら、AIがSQLを生成して、そのまま実行までやってくれる」チャットボットを作りました。
- この記事では、AWS Bedrock + Streamlit + Snowflake を使ったチャットボットの紹介をします。
完成イメージ
できること
- 自然言語での質問 → AIがSQLを自動生成
- 生成されたSQLをワンクリックで実行
Snowflake に保存しているデータ
データ概要
- DB名: wave_db
- スキーマ名: base
- ビュー名: base_wave
- データ内容: 気象庁Webページからダウンロードした波浪推算値データ(2018年11月〜2025年06月)
テーブル構造
select * from wave_db.base.base_wave limit 3;
LOCATION_NAME LATITUDE LONGITUDE LOCATION_CODE OBSERVATION_DATETIME PERIOD_SEC WAVE_HEIGHT_M WAVE_DIRECTION
網走沖 44°15′ 144°30′ A 2018-11-01 00:00:00.000 8 1.9 1
網走沖 44°15′ 144°30′ A 2018-11-01 12:00:00.000 7 1.1 1
網走沖 44°15′ 144°30′ A 2018-11-02 00:00:00.000 8 1.2 1
カラム
カラム内容
- LOCATION_NAME
- 計測地点名
- VARCHAR(16777216)
- LATITUDE
- 緯度
- VARCHAR(16777216)
- LONGITUDE
- 経度
- VARCHAR(16777216)
- LOCATION_CODE
- 計測地点コード
- VARCHAR(100)
- OBSERVATION_DATETIME
- 計測日時
- TIMESTAMP_NTZ(9)
- PERIOD_SEC
- 周期(単位:秒)
- VARCHAR(100)
- WAVE_HEIGHT_M:
- 波高(単位:メートル)
- VARCHAR(100)
- WAVE_DIRECTION
- 波向
- VARCHAR(100)
実装内容
1. AIエージェントの作成
- Snowflakeのテーブル定義やSQL例をS3バケットに保存し、AWS Knowledge Baseで参照できるようにしました。
- これにより、AIエージェントが適切なSQLを生成できるようになります。
AIエージェントのコード(抜粋)
コードを表示
"""
WaveChatBot - 波浪データ分析AIエージェント
Knowledge Baseと連携して、波浪データに関するSQLを生成
"""
import os
import sys
from io import StringIO
from contextlib import contextmanager
from strands import Agent
from pathlib import Path
from strands_tools import retrieve
from strands.models.bedrock import BedrockModel
# 変数設定
BEDROCK_MODEL = "anthropic.claude-3-5-sonnet-20240620-v1:0"
KNOWLEDGE_BASE_ID = "xxxx"
AWS_REGION = "ap-northeast-1"
# Knowledge Base用の環境変数を設定(retrieveツールが参照)
os.environ["KNOWLEDGE_BASE_ID"] = KNOWLEDGE_BASE_ID
os.environ["AWS_REGION"] = AWS_REGION
# プロジェクトのルートディレクトリ
ROOT_DIR = Path(__file__).parent
# システムプロンプトを読み込み
SYSTEM_PROMPT_PATH = ROOT_DIR / "prompts" / "system_prompt.txt"
with open(SYSTEM_PROMPT_PATH, "r", encoding="utf-8") as f:
SYSTEM_PROMPT = f.read()
# 標準出力を一時的に抑制するコンテキストマネージャー
@contextmanager
def suppress_stdout():
"""標準出力を一時的に抑制"""
old_stdout = sys.stdout
sys.stdout = StringIO()
try:
yield
finally:
sys.stdout = old_stdout
# ChatBot用 AIエージェントの定義
def create_wavechat_agent():
bedrock_model = BedrockModel(model_id=BEDROCK_MODEL, region_name=AWS_REGION)
agent = Agent(model=bedrock_model, system_prompt=SYSTEM_PROMPT, tools=[retrieve])
return agent
# ユーザーのクエリに対してエージェントが応答
def wavechat_query(query: str, agent=None) -> str:
if agent is None:
agent = create_wavechat_agent()
# エージェント実行時の標準出力を抑制
with suppress_stdout():
response = agent(query)
# response.message から text 部分のみを抽出
message = response.message
if isinstance(message, dict) and "content" in message:
content_list = message["content"]
# content が list の場合、各要素の 'text' を結合
if isinstance(content_list, list):
text_parts = [
item.get("text", "") for item in content_list if isinstance(item, dict)
]
return "\n".join(text_parts)
# フォールバック(予期しない形式の場合)
return str(message)
2. システムプロンプト
システムプロンプトで、AIの振る舞いを定義しました。
-
主な内容:
- AIエージェントの役割
- SQL生成ルール
- 出力形式(3つのパターン)
あなたは波浪データ分析のアシスタントです。
【あなたの役割】
- ユーザーの質問を理解し、適切な形式で応答する
- SQLクエリを生成する場合は、必ず```sql```のコードブロックで囲んで返す
【重要な注意事項】
1. テーブル定義やカラム情報は、必要に応じてツールを使って検索してください
2. SQLは必ず実行可能な形式で出力すること
3. 曖昧な質問の場合は、ユーザーに確認を求めること
【出力形式】
ユーザーの質問の意図に応じて、以下のように応答してください:
## パターン1:SQLのみを求められている場合
ユーザーが「SQL」「クエリ」「コードを教えて」「書いて」などと明示的に指定した場合:
- **SQLのみを返す**(説明は最小限)
- ```sql```のコードブロックで囲む
## パターン2:データや情報を求められている場合
ユーザーが具体的なデータや分析結果を求めている場合:
- 簡潔な説明 + SQLクエリ を返す
- ```sql```のコードブロックで囲む
- SQLの意図を簡単に説明
## パターン3:メタ情報を求められている場合
ユーザーが「カラムを教えて」「テーブル構造」「データ構造」などを求めている場合:
- **説明を中心に返す**
- 必要に応じて、参考SQLも提示してOK
3. Knowledge Base の作成
Knowledge Baseには、以下の情報を登録しました:
- テーブル定義(カラム名、データ型)
- 主要な観測地点(26地点)
- SQL例(複数パターン)
これにより、AIが適切なSQLを生成できるようになります。
# 【利用可能なデータベース】
- データベース名:wave_db
- スキーマ名:base
- ビュー名:base_wave
# 【base_waveのカラム】
- LOCATION_NAME
- 計測地点名
- VARCHAR(16777216)
- LATITUDE
- 緯度
- VARCHAR(16777216)
- LONGITUDE
- 経度
- VARCHAR(16777216)
- LOCATION_CODE
- 計測地点コード
- VARCHAR(100)
- OBSERVATION_DATETIME
- 計測日時
- TIMESTAMP_NTZ(9)
- PERIOD_SEC
- 周期(単位:秒)
- VARCHAR(100)
- WAVE_HEIGHT_M:
- 波高(単位:メートル)
- VARCHAR(100)
- WAVE_DIRECTION
- 波向
- VARCHAR(100)
# 【主要な観測地点】
location_code:location_name
- A:網走沖
- B:釧路沖
- C:津軽海峡(太平洋側)
- D:金華山沖
- E:房総半島沖
- F:相模湾
- G:伊豆半島沖
- H:遠州灘
- I:紀伊水道
- J:土佐湾
- K:豊後水道
- L:種子島東方沖
- M:奄美大島沖
- N:沖縄島沖(太平洋側)
- O:石垣島沖
- P:沖縄島沖(東シナ海側)
- Q:薩摩半島沖
- R:天草灘
- S:玄界灘
- T:島根半島沖
- U:若狭湾
- V:富山湾
- W:酒田沖
- X:津軽海峡(日本海側)
- Y:石狩湾
- Z:宗谷海峡
# 【SQL例】
## Q1. 最大波高のみを取得
- 質問:2024年1月の豊後水道の最大波高は?
```sql
SELECT
MAX(wave_height_m) AS max_wave_height
FROM wave_db.base.base_wave
WHERE
location_name = '豊後水道'
AND YEAR(observation_datetime) = 2024
AND MONTH(observation_datetime) = 1;
```
実行例
AIに聞いてみる
- 質問:2022年5月の網走沖の最大波高を調べて!
ステップ1: AIがSQLを生成
ステップ2: SQLを実行
実行ボタンをクリックすると、結果が表示された!!
結果: 最大波高は 2.8m でした。

Snowflakeで検証
AIの結果が正しいか、Snowflakeで直接SQLを実行して確認してみる。


