ここ数日こちらを準備していました。
私は一応、生成AI担当なのですが、これまでDatabricksを用いた包括的な生成AIワークショップがなかったので準備していた次第です(ほとんどは英語からの翻訳ですが)。
こちらは超概要ですが、ユースケースの紹介やハンズオン、技術概要の説明やアーキテクチャ評価などをカバーしています。ここで説明するのはハンズオンの内容です。詳細を知りたい方は私や周辺のDatabricksの人に聞いてみてください。前編ではデータの準備とツールの準備までをカバーします。
ハンズオンラボ: Databricksでエージェントシステムを構築しよう
このハンズオンは準備段階含めると3つのノートブックから構成されています。
- データ準備
- ツールの準備
- エージェントの構築および評価
上のリポジトリには使用するデータとノートブックすべてが格納されています。
このラボは2つのパートに分かれています。パート1では、カスタマーサービスのシナリオ向けに様々なツールコールを持つDatabricksエージェントを構築・テストします。パート2では、製品に関する質問に答えるよりシンプルなエージェントを作成し、その性能評価に注力します。
パート1: 最初のエージェントを設計しよう
ノートブック: 01_create_tools\01_create_tools
1.1 ツールの構築
-
SQL関数
- カスタマーサービスの返品ワークフローを処理するために重要なデータへアクセスするクエリを作成します。
- これらのSQL関数はノートブックやエージェントから簡単に呼び出せます。
-
シンプルなPython関数
- 言語モデルの一般的な制限に対処するPython関数を作成します。
- これを「ツール」として登録し、エージェントが必要に応じて呼び出せるようにします。
1.2 LLMとの統合 [AI Playground]
-
ツールとLLMの組み合わせ
- Databricks AI PlaygroundでSQL/Pythonツールと大規模言語モデル(LLM)を組み合わせます。
- モデル: Claude 3.7 Sonnet
1.3 エージェントのテスト [AI Playground]
- システムプロンプト: すべての会社方針が満たされるまでツールを呼び出してください。
-
質問例: 当社の方針に基づき、キュー内の最新の返品を受け付けるべきですか?
- エージェントの段階的な推論と最終回答を観察しましょう。
-
MLflowトレースの確認
- MLflowでエージェントの実行を確認し、各ツールの呼び出し方法を理解します。
パート2: エージェント評価
ノートブック: 02_agent_eval\agent
2.1 新しいエージェントとリトリーバーツールの定義
-
ベクター検索
- 関連する製品ドキュメントを取得するためのベクター検索インデックスを事前に用意しています。
- このVSインデックスは agents_lab.product.product_docs_index にあります。
-
リトリーバー関数の作成
- このベクター検索インデックスを関数でラップし、LLMが製品情報を検索できるようにします。
- 最終回答にも同じLLMを使用します。
ノートブック: 02_agent_eval/driver
2.2 評価用データセットの定義
-
提供データセットの利用
- サンプル評価データセットを活用し、エージェントが製品の質問に答える能力をテストします。
- (オプション)合成データ生成も試してみましょう。
2.3 エージェントの評価
-
MLflow.evaluate()
の実行- MLflowがエージェントの回答と正解データセットを比較します。
- LLMベースの判定者が各回答をスコア付けし、フィードバックを収集します。
2.4 改善と再評価
-
リトリーバーの改善
- 評価フィードバックに基づき、リトリーバー設定(k=5→k=1)を調整します。
-
再評価の実施
- 新しいMLflowランを開始
- 再度
MLflow.evaluate()
を実行し、結果を比較します。 - MLflow評価UIでパフォーマンス向上を確認しましょう
パート0 - データの準備: 00_Data preparation
Sparkでワークスペースファイルを読み込むので、サーバレスコンピュートではなく汎用コンピュートを使用します。
ワークスペースファイルとは? | Databricks Documentation
データの準備
%run ./config
ハンズオンで使用するカタログ: takaakiyayoi_catalog
ハンズオンで使用する共有スキーマ(データを格納): agents_lab
# カタログの作成
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"GRANT USE CATALOG ON CATALOG {catalog_name} TO `account users`")
# スキーマの作成
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{system_schema_name}")
spark.sql(f"GRANT USE SCHEMA, SELECT ON SCHEMA {catalog_name}.{system_schema_name} TO `account users`")
import os
# ワークスペースファイルとして保存されているCSVを読み込み
cust_service_data_df = spark.read.format("csv").load(f"file:{os.getcwd()}/data/cust_service_data.csv", header=True, inferSchema=True)
policies_df = spark.read.format("csv").load(f"file:{os.getcwd()}/data/policies.csv", header=True, inferSchema=True)
product_docs_df = spark.read.format("csv").load(f"file:{os.getcwd()}/data/product_docs.csv", header=True, inferSchema=True, multiline='true')
display(cust_service_data_df)
display(policies_df)
display(product_docs_df)
# Unity Catalogのテーブルに保存
cust_service_data_df.write.mode("overwrite").saveAsTable(f"{catalog_name}.{system_schema_name}.cust_service_data")
policies_df.write.mode("overwrite").saveAsTable(f"{catalog_name}.{system_schema_name}.policies")
product_docs_df.write.mode("overwrite").saveAsTable(f"{catalog_name}.{system_schema_name}.product_docs")
Vector Search Indexの作成
02_agent_eval
で使用するVector Search Indexを作成します。各ユーザーはこのVector Search Indexを使用します。
# ソーステーブル名
source_table = f"{catalog_name}.{system_schema_name}.product_docs"
# Change Data Feedを有効にする
spark.sql(f"""
ALTER TABLE {source_table}
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
print(f"✅ {source_table} のチェンジデータフィードの有効化")
# 設定を確認
spark.sql(f"SHOW TBLPROPERTIES {source_table}").filter("key = 'delta.enableChangeDataFeed'").show()
✅ takaakiyayoi_catalog.agents_lab.product_docs のチェンジデータフィードの有効化
+--------------------+-----+
| key|value|
+--------------------+-----+
|delta.enableChang...| true|
+--------------------+-----+
import requests
import json
import time
context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
host = context.apiUrl().get()
token = context.apiToken().get()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# パラメータ
endpoint_name = "one-env-shared-endpoint-1" # 必要に応じて変更ください
index_name = f"{catalog_name}.{system_schema_name}.product_docs_index"
source_table = f"{catalog_name}.{system_schema_name}.product_docs"
# インデックスを作成
url = f"{host}/api/2.0/vector-search/indexes"
payload = {
"name": index_name,
"endpoint_name": endpoint_name,
"primary_key": "product_id",
"index_type": "DELTA_SYNC",
"delta_sync_index_spec": {
"source_table": source_table,
"pipeline_type": "TRIGGERED",
"embedding_source_columns": [{ # columnsは配列形式
"name": "product_doc",
"embedding_model_endpoint_name": "databricks-gte-large-en"
}]
}
}
print("Vector Search Indexを作成中...")
response = requests.post(url, headers=headers, json=payload)
if response.status_code in [200, 201]:
print("✅ インデックス作成リクエストが正常に送信されました!")
print("📊 初回同期は自動的に開始されます...\n")
else:
print(f"❌ インデックス作成エラー: {response.status_code}")
print(response.text)
Vector Search Indexを作成中...
✅ インデックス作成リクエストが正常に送信されました!
📊 初回同期は自動的に開始されます...
if response.status_code in [200, 201]:
# インデックスの状態を監視
def monitor_index_status(index_name, timeout_minutes=60):
status_url = f"{host}/api/2.0/vector-search/indexes/{index_name}"
start_time = time.time()
timeout_seconds = timeout_minutes * 60
print(f"⏳ インデックスの状態を監視中...")
print(f"タイムアウト: {timeout_minutes} 分")
print("-" * 70)
previous_count = 0
first_check = True
while time.time() - start_time < timeout_seconds:
try:
response = requests.get(status_url, headers=headers)
if response.status_code == 200:
data = response.json()
status = data.get("status", {})
spec = data.get("delta_sync_index_spec", {})
# ステータス情報
state = status.get("detailed_state", "不明")
ready = status.get("ready", False)
indexed_count = status.get("indexed_row_count", 0)
message = status.get("message", "")
# 最初のチェックで総行数を取得
if first_check:
total_rows = spec.get("num_rows", 0)
if total_rows > 0:
print(f"📊 インデックス対象の総行数: {total_rows:,}")
first_check = False
# 進捗表示
elapsed = int(time.time() - start_time)
rows_diff = indexed_count - previous_count
speed = f"{rows_diff:,} 行" if rows_diff > 0 else "初期化中"
print(f"\r⏱️ {elapsed//60}分 {elapsed%60}秒 | "
f"状態: {state} | "
f"行数: {indexed_count:,} | "
f"速度: {speed}/30秒 | "
f"準備完了: {ready}", end="")
previous_count = indexed_count
# 成功判定:readyがTrueかつstateがONLINE系
if ready == True and ("ONLINE" in state or state == "READY"):
print(f"\n\n✅ インデックスの準備ができました!")
print(f"📊 インデックス済み総行数: {indexed_count:,}")
print(f"⏱️ 合計時間: {elapsed//60}分 {elapsed%60}秒")
print(f"📋 最終状態: {state}")
return True
# エラー判定
if state in ["FAILED", "ERROR"]:
print(f"\n\n❌ インデックス作成に失敗しました!")
print(f"エラー状態: {state}")
return False
# タイムアウト判定
if elapsed > timeout_seconds:
print(f"\n\n⏰ {timeout_minutes}分後にタイムアウトしました")
print(f"最終状態: {state}, 準備完了: {ready}")
return False
else:
print(f"\n⚠️ ステータス確認エラー: {response.status_code}")
print(response.text)
time.sleep(30) # 30秒ごとにチェック
except Exception as e:
print(f"\n⚠️ 例外発生: {e}")
time.sleep(30)
print(f"\n\n⏰ {timeout_minutes}分でタイムアウトしました")
return False
# インデックスの監視を開始
success = monitor_index_status(index_name, timeout_minutes=60)
if success:
print("\n🎉 ベクターサーチインデックスの準備ができました!")
# 最終的なインデックス情報を表示
final_url = f"{host}/api/2.0/vector-search/indexes/{index_name}"
final_response = requests.get(final_url, headers=headers)
if final_response.status_code == 200:
final_data = final_response.json()
print(f"\n📋 インデックス詳細:")
print(f" - 名前: {final_data.get('name')}")
print(f" - エンドポイント: {final_data.get('endpoint_name')}")
print(f" - ステータス: {final_data.get('status', {}).get('detailed_state')}")
print(f" - 行数: {final_data.get('status', {}).get('indexed_row_count', 0):,}")
else:
print("\n😞 インデックス作成が正常に完了しませんでした")
elif response.status_code == 409:
print("⚠️ インデックスはすでに存在します")
print("必要に応じて削除して再作成できます")
else:
print(f"❌ インデックス作成エラー: {response.status_code}")
print(response.text)
⏳ インデックスの状態を監視中...
タイムアウト: 60 分
----------------------------------------------------------------------
⏱️ 4分 4秒 | 状態: ONLINE_TRIGGERED_UPDATE | 行数: 552 | 速度: 411 行/30秒 | 準備完了: True了: False
✅ インデックスの準備ができました!
📊 インデックス済み総行数: 552
⏱️ 合計時間: 4分 4秒
📋 最終状態: ONLINE_TRIGGERED_UPDATE
🎉 ベクターサーチインデックスの準備ができました!
📋 インデックス詳細:
- 名前: takaakiyayoi_catalog.agents_lab.product_docs_index
- エンドポイント: one-env-shared-endpoint-1
- ステータス: ONLINE_TRIGGERED_UPDATE
- 行数: 552
これでハンズオンで使用するデータの準備ができあmした。
パート1 - 最初のエージェントを設計する: 01_create_tools\01_create_tools
この最初のエージェントは、カスタマーサービス担当者のワークフローに従い、さまざまなエージェント機能を説明します。商品返品の処理に焦点を当て、具体的な手順を追っていきます。
このノートブックでは、SQLとPythonセルで共通のパラメーターを参照するようにウィジェットを設定します。
# ウィジェットをクリア
dbutils.widgets.removeAll()
%pip install -qqqq -U -r requirements.txt
# パッケージをPython環境にロードするために再起動します
dbutils.library.restartPython()
ここからはユーザーごとのスキーマを作成して、ツールを格納していきます。
%run ../config
ハンズオンで使用するカタログ: takaakiyayoi_catalog
ハンズオンで使用する共有スキーマ(データを格納): agents_lab
from databricks.sdk import WorkspaceClient
import yaml
import os
import re
# ワークスペースクライアントを使用して現在のユーザーに関する情報を取得
w = WorkspaceClient()
user_email = w.current_user.me().emails[0].value
username = user_email.split('@')[0]
username = re.sub(r'[^a-zA-Z0-9_]', '_', username) # 特殊文字をアンダースコアに置換
# スキーマを指定します
user_schema_name = f"agents_lab_{username}" # ユーザーごとのスキーマ
print("あなたのカタログは:", catalog_name)
print("あなたのスキーマは:", user_schema_name)
# SQL/Python関数を作成する際にこれらの値を参照できるようにします
dbutils.widgets.text("catalog_name", defaultValue=catalog_name, label="Catalog Name")
dbutils.widgets.text("system_schema_name", defaultValue=system_schema_name, label="System Schema Name")
dbutils.widgets.text("user_schema_name", defaultValue=user_schema_name, label="User Schema Name")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{user_schema_name}")
あなたのカタログは: takaakiyayoi_catalog
あなたのスキーマは: agents_lab_takaaki_yayoi
DataFrame[]
1.1 シンプルなツールを作成する: カスタマーサービス返品処理ワークフロー
パート1では、以下のシナリオで必要となるエージェントのツールを準備します。
- SQL関数:返品処理ワークフローの各ステップで重要となるデータへアクセスするクエリを作成します。
- シンプルなPython関数:言語モデルの一般的な制限を克服するためのPython関数を作成し、登録します。
以下は、カスタマーサービス担当者が返品を処理する際に通常従う主要なステップの構造化された概要です。このワークフローにより、サポートチーム全体で一貫性と明確さが確保されます。
1. 処理キュー内の最新の返品を取得する
- アクション:チケッティングまたは返品システムから最新の返品リクエストを特定し、取得します。
- 理由:最も緊急または次に対応すべき顧客の問題に取り組んでいることを保証します。
処理キューにおける最新の返品を取得
%sql
-- インタラクションの日付、問題のカテゴリ、問題の説明、および顧客の名前を選択
SELECT
cast(date_time as date) as case_time,
issue_category,
issue_description,
name
FROM IDENTIFIER(:catalog_name || '.' || :system_schema_name || '.cust_service_data')
-- インタラクションの日付と時刻で結果を降順に並べ替え
ORDER BY date_time DESC
-- 結果を最新のインタラクションに制限
LIMIT 1
case_time | issue_category | issue_description | name |
---|---|---|---|
2025-07-11 | Returns | こんにちは、私は新しいSoundWave X5 Proヘッドフォンを楽しんでいましたが、突然電話に接続できなくなりました。Bluetoothデバイスのリストに全く表示されません。昨日は完璧に動作していたので、苛立っています。修正する方法はありますか、返品処理に進むべきですか? | Nicolas Pelaez |
%sql
USE CATALOG IDENTIFIER(:catalog_name);
USE SCHEMA IDENTIFIER(:system_schema_name)
Unity Catalogに登録される関数を作成
%sql
CREATE OR REPLACE FUNCTION
IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_latest_return')()
RETURNS TABLE(purchase_date DATE, issue_category STRING, issue_description STRING, name STRING)
COMMENT '最新のカスタマーサービス対応(返品など)を返します。'
RETURN (
SELECT
CAST(date_time AS DATE) AS purchase_date,
issue_category,
issue_description,
name
FROM cust_service_data
ORDER BY date_time DESC
LIMIT 1
);
最新の返品を取得するために関数呼び出しをテスト
%sql
select * from IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_latest_return')()
purchase_date | issue_category | issue_description | name |
---|---|---|---|
2025-07-11 | Returns | こんにちは、私は新しいSoundWave X5 Proヘッドフォンを楽しんでいましたが、突然電話に接続できなくなりました。Bluetoothデバイスのリストに全く表示されません。昨日は完璧に動作していたので、苛立っています。修正する方法はありますか、返品処理に進むべきですか? | Nicolas Pelaez |
2. 会社のポリシーを取得する
- アクション:返品、返金、および交換に関する内部ナレッジベースまたはポリシー文書にアクセスします。
- 理由:会社のガイドラインに準拠していることを確認することで、潜在的なエラーや対立を防ぎます。
返品ポリシーを取得する関数の呼び出し
%sql
CREATE OR REPLACE FUNCTION
IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_return_policy')()
RETURNS TABLE (
policy STRING,
policy_details STRING,
last_updated DATE
)
COMMENT '返品ポリシーの詳細を返します'
LANGUAGE SQL
RETURN (
SELECT
policy,
policy_details,
last_updated
FROM policies
WHERE policy = 'Return Policy'
LIMIT 1
);
返品ポリシーを取得する関数のテスト
%sql
select * from IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_return_policy')()
policy | policy_details | last_updated |
---|---|---|
Return Policy | カレンダー年ごとに12点まで返品することができ、それぞれが購入日から30日以内でなければなりません。 | 2023-01-15 |
3. 最新の返品のUserIDを取得する
- アクション:返品リクエストの詳細からユーザーの一意の識別子を記録します。
- 理由:正しいユーザーデータを正確に参照することで、処理が効率化され、顧客記録の混同を防ぎます。
名前に基づいてuserIDを取得する関数の作成
%sql
CREATE OR REPLACE FUNCTION
IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_user_id')(user_name STRING)
RETURNS STRING
COMMENT 'これは顧客の名前を入力として受け取り、対応するユーザーIDを返します'
LANGUAGE SQL
RETURN
SELECT customer_id
FROM cust_service_data
WHERE name = user_name
LIMIT 1
;
名前に基づいてuserIDを取得する関数のテスト
%sql
--新たなパラメータ構文 (MLR > 15.1)
select IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_user_id')('Nicolas Pelaez');
4. UserIDを使って注文履歴を照会する
- アクション:UserIDを使って注文管理システムや顧客データベースを検索します。
- 理由:過去の購入履歴や返品傾向、特記事項を確認することで、次に取るべき適切な対応(例:返品の適格性の確認)を判断できます。
userIDに基づいて注文履歴を取得する関数の作成
%sql
CREATE OR REPLACE FUNCTION
IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_order_history')(user_id STRING)
RETURNS TABLE (returns_last_12_months INT, issue_category STRING)
COMMENT 'これは顧客のuser_idを入力として受け取り、過去12か月間の返品数と問題カテゴリを返します'
LANGUAGE SQL
RETURN
SELECT count(*) as returns_last_12_months, issue_category
FROM cust_service_data
WHERE customer_id = user_id
GROUP BY issue_category;
userIDに基づいて注文履歴を取得する関数のテスト
%sql
select * from IDENTIFIER(:catalog_name || '.' || :user_schema_name || '.get_order_history')('453e50e0-232e-44ea-9fe3-28d550be6294')
returns_last_12_months | issue_category |
---|---|
23 | Returns |
5. LLMに今日の日付を取得するPython関数を提供する
- アクション:LLM(大規模言語モデル)に現在の日付を提供できるPython関数を用意します。
- 理由:日付の自動取得により、集荷のスケジューリング、返金のタイムライン、連絡期限などの管理が容易になります。
注:System.ai.python_execに登録された関数を使うことで、LLMが生成したコードをサンドボックス環境で実行できます
非常にシンプルなPython関数
def get_todays_date() -> str:
"""
今日の日付を 'YYYY-MM-DD' 形式で返します。
Returns:
str: 今日の日付を 'YYYY-MM-DD' 形式で返します。
"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d")
Python関数のテスト
today = get_todays_date()
today
'2025-08-05'
Python関数をUnity Catalogに登録
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
client = DatabricksFunctionClient()
# このツールをUCにデプロイし、ツールのdocstringと型ヒントに基づいてUCのメタデータを自動的に設定します
python_tool_uc_info = client.create_python_function(func=get_todays_date, catalog=catalog_name, schema=user_schema_name, replace=True)
# ツールはUC内の `{catalog}.{schema}.{func}` という名前の関数にデプロイされます。ここで {func} は関数の名前です
# デプロイされたUnity Catalog関数名を表示します
display(f"デプロイされたUnity Catalog関数名: {python_tool_uc_info.full_name}")
'デプロイされたUnity Catalog関数名: takaakiyayoi_catalog.agents_lab_takaaki_yayoi.get_todays_dat
作成した関数を見てみましょう
from IPython.display import display, HTML
# DatabricksのホストURLを取得
workspace_url = spark.conf.get('spark.databricks.workspaceUrl')
# 作成した関数へのHTMLリンクを作成
html_link = f'<a href="https://{workspace_url}/explore/data/functions/{catalog_name}/{user_schema_name}/get_todays_date" target="_blank">登録済み関数をUnity Catalogで確認</a>'
display(HTML(html_link))
1.2 LLMとの統合【AI Playground】
- 作成したツールを言語モデル(LLM)と組み合わせてAI Playgroundで利用します。
1.3 エージェントのテスト【AI Playground】
- エージェントに質問し、応答を観察します。
- MLflowトレースを活用してエージェントのパフォーマンスをさらに深く探ります。
では、これらの関数を使用して最初のエージェントを組み立てる方法をAIプレイグラウンドで見てみましょう!
-
システムプロンプト:
すべての社内ポリシーが満たされていると確信できるまで、ツールを呼び出すこと
-
質問例:
当社のポリシーに基づいて、最新の返品を受け付けるべきでしょうか?
AIプレイグラウンドは、左のナビゲーションバーの「AI/ML」から見つけることができます。または、以下に作成されたリンクを使用することもできます。
問い合わせに回答するために、LLM(Claude 3.7 Sonnet)が上で定義したツールを適切に呼び出していることがわかります。
パート1では、AI Playgroundを活用することで簡単にエージェントのプロトタイピングが行えることを体験いただきました。
後編のパート2ではエージェントの構築と評価を行います。