こちらのブログ記事のノートブックをウォークスルーしました。
こちらのリポジトリにソースがあります。
全体像はこちら。
README.md: SAP バイク販売エージェント
全体像と準備事項が記載されています。
このリポジトリには、Databricksブログ「ナレッジグラフとDatabricksエージェントの統合によるAI駆動のインサイト」のための補助コードベースが含まれています。
前提条件を満たし、コードを実行して、Databricksを使用したサプライチェーンデータコンテキストを持つエージェントチャットアプリケーションを作成します。その後、エージェントとチャットしてバイク販売データを探索し、サプライチェーンの最適化について質問してください!
推奨Databricksランタイム: サーバーレス
前提条件
- 次の機能/権限が有効になっているDatabricksワークスペース:
- Unity Catalog(少なくとも1つのカタログに対するUSE CATALOG権限)
- DBSQLサーバーレスウェアハウス(CAN USE権限付き)
- サーバーレスノートブックとMosaic AIモデルサービング
- Databricks Apps
- シークレットスコープ(少なくとも1つのスコープに対するMANAGE権限)
- Neo4j Auraインスタンス
- アカウントを作成し、こちらの手順に従って無料インスタンスをデプロイ
- インスタンス作成時にユーザー名と生成されたパスワードを保存すること
- ホストURLとパスワードをDatabricksシークレットスコープに追加
手順
前提条件が満たされたら、以下の手順に従ってサプライチェーンエージェントをデプロイします。各ノートブックには詳細な指示があります。
リソースの初期化
-
リポジトリをホームディレクトリの新しいGitフォルダにクローン
-
00_initノートブックを開き、サーバーレスコンピュートに接続し、最初の4つのセルを実行
-
ウィジェットの値を更新
-
ノートブックの残りを実行して、構成ファイルを更新し、必要なリソースを作成
Neo4jへのロード
- 01_load_to_neo4jノートブックを開き、サーバーレスコンピュートに接続
- ノートブックを実行して、前提条件として作成されたグラフデータベースをポピュレート
エージェントの作成
-
02_agentノートブックを開き、サーバーレスコンピュートに接続
-
03_driverノートブックを開き、サーバーレスコンピュートに接続
-
ノートブックを実行してエージェントをモデルサービングにデプロイし、レビューアプリを作成
アプリの作成
- 04_deploy_appノートブックを開き、サーバーレスコンピュートに接続
- ノートブックを実行してチャットインターフェースを作成
- アプリの詳細ページに移動し、URLをクリックしてエージェントにアクセス
データセットの詳細
SAPバイク販売データセットは、SAP Datasphereを探索するためのサンプルコンテンツとして提供されています。CSVファイルは https://github.com/SAP-samples/datasphere-content/tree/main/Sample_Bikes_Sales_content (Apache 2.0ライセンス)で見つけることができます。
データセットをロードするために使用されたCypherクエリは、オープンソースプロジェクトhttps://github.com/neo4j-partners/blog-load-SAP-data-cypher-queries (Apache 2.0ライセンス) から適応されました。このプロジェクトはこちらのブログの補助として提供されました。
依存関係
パッケージ | バージョン | ライセンス |
---|---|---|
PyYAML | 最新 | MITライセンス (MIT) |
py2neo | 最新 | Apache-2.0 |
mlflow | 最新 | Apache-2.0 |
pydantic | 最新 | MITライセンス (MIT) |
langchain | >0.2.16 | MITライセンス (MIT) |
langchain-community | >0.2.16 | MITライセンス (MIT) |
databricks_langchain | 最新 | MITライセンス (MIT) |
openai | 最新 | Apache-2.0 |
langchain_neo4j | 最新 | MITライセンス (MIT) |
langgraph | 0.2.74 | MITライセンス (MIT) |
langgraph-checkpoint | >1.0.12 | MITライセンス (MIT) |
00_init: リソースの初期化
使用するリソースの設定を行います。ノートブックを実行する前に以下の準備を行います。
- 前提条件にあるように、Neo4jにサインアップしてAuraインスタンスを作成します。
- シークレットにNeo4jの接続情報を格納します。
- Neo4jへのアクセスに使うSQLウェアハウスのIDをコピーしておきます。
Neo4jにサインアップします。無料枠で使えます。
余談ですが、情報入力してくとグラフが組み上がっていくのが面白いです。
インスタンス作成に数分要します。
インスタンスが作成されるとホストやパスワードの情報が格納されたファイルをダウンロードします。
# Wait 60 seconds before connecting using these details, or login to https://console.neo4j.io to validate the Aura Instance is available
NEO4J_URI=neo4j+s://<ホスト名>
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=<パスワード>
AURA_INSTANCEID=f7b5f429
AURA_INSTANCENAME=Free instance
ホスト名とパスワードをDatabricksシークレットに格納します。
databricks secrets put-secret demo-token-takaaki.yayoi neo4j-host
databricks secrets put-secret demo-token-takaaki.yayoi neo4j-key
ウィジェットパラメータを入力し、TODOを解決して、このノートブックを実行して、後続のノートブックを実行するために必要な前提リソースを作成します。
これには、Unity Catalogオブジェクト、Neo4j接続に使用されるシークレット、およびエージェントのSQLツールの作成が含まれます。
%pip install PyYAML
%restart_python
接続シークレットの作成
TODO: シークレットスコープを作成し、シークレットを追加する
- neo4j-host
- neo4j-key
dbutils.widgets.text("catalog", "users")
dbutils.widgets.text("schema", "takaaki_yayoi")
dbutils.widgets.text("secret_scope", "demo-token-takaaki.yayoi")
dbutils.widgets.text("warehouse_id", "75fd8278393d07eb")
catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
secret_scope = dbutils.widgets.get("secret_scope")
warehouse_id = dbutils.widgets.get("warehouse_id")
UCオブジェクトの作成
自分は作成済みのカタログとスキーマを使ったのでコメントアウトしています。
%sql
--CREATE CATALOG IF NOT EXISTS ${catalog};
--CREATE SCHEMA IF NOT EXISTS ${catalog}.${schema};
エージェントのツールを初期化する
これには、エージェントツールとして使用するためのUnity Catalog SQL関数の登録が含まれます。Pythonツール(つまり、Graph Chain)はエージェントノートブックで登録されます。
%sql
CREATE OR REPLACE FUNCTION ${catalog}.${schema}.dummy_function(input_str STRING DEFAULT 'dummy')
RETURNS STRING
COMMENT 'Do not use as part of the tools.'
RETURN input_str;
config.ymlファイルの設定
必要な値をconfig.ymlファイルに追加するためにコードを実行します。
- _uc_functions_のカタログとスキーマを設定する
- _warehouse_id_を設定する
- _secret_scope_を設定する
- _llm_endpoint_を更新する(必要に応じて手動で更新)
import yaml
config_path = './config.yml'
# 既存のconfig.ymlファイルを読み込む
def read_config(file_path):
try:
with open(file_path, 'r') as file:
config = yaml.safe_load(file)
return config
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
config = read_config(config_path)
# configファイルの値を更新する
config['catalog'] = catalog
config['schema'] = schema
config['secret_scope'] = secret_scope
config['warehouse_id'] = warehouse_id
config['uc_functions'] = [f"{catalog}.{schema}.dummy_function"]
# 文字列にダブルクォートを強制するカスタムDumper
class DoubleQuotedDumper(yaml.Dumper):
def represent_str(self, data):
return self.represent_scalar('tag:yaml.org,2002:str', data, style='"')
# カスタム文字列表現を登録する
DoubleQuotedDumper.add_representer(str, DoubleQuotedDumper.represent_str)
# 更新されたconfig.ymlファイルを保存する
with open(config_path, 'w') as file:
yaml.dump(config, file, Dumper=DoubleQuotedDumper, width=float("inf"))
"agent_prompt": "Task: You are an expert in supply chain optimization. Your main purpose is to generate Neo4j Cypher statements that will be used to query the Aura Graph instance, and use results of the query to answer the question. The Graph database contains data about Bike Sales from SAP including information for products, suppliers, employees, and sales orders. Instructions: Interpret the user query to create and execute a Cypher statement using the provided tools. If the query fails, adjust the Cypher statement and try running again. Explain your process and logic for any actions taken."
"catalog": "users"
"llm_endpoint": "databricks-meta-llama-3-3-70b-instruct"
"schema": "takaaki_yayoi"
"secret_scope": "demo-token-takaaki.yayoi"
"uc_functions":
- "users.takaaki_yayoi.dummy_function"
"warehouse_id": "75fd8278393d07eb"
次のステップ
このノートブックの実行が完了したら、次のノートブック 01_load_to_neo4j に進み、グラフデータベースをポピュレートします。
01_load_to_neo4j: Cypherクエリを使用してグラフデータベースを構築
ノードとリレーションシップを作成するために、SAP Bike Salesサンプルデータセットを使用します。
このコードはNeo4jに接続し、GithubからCSVファイルをロードしてグラフ表現に整理するCypherクエリを実行します。Cypherクエリは、Neo4jのこのブログのために作成された元のクエリを適応させたものです。
%pip install py2neo PyYAML
%restart_python
import yaml
def read_config(file_path):
try:
with open(file_path, 'r') as file:
config = yaml.safe_load(file)
return config
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
config = read_config('./config.yml')
secret_scope = config.get('secret_scope')
from py2neo import Graph
def read_cypher_query(file_path):
try:
with open(file_path, 'r') as file:
return file.read()
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
def execute_cypher_query(cypher_query):
# Neo4jに接続
graph_db = Graph(dbutils.secrets.get(scope=secret_scope, key="neo4j-host"), auth=("neo4j", dbutils.secrets.get(scope=secret_scope, key="neo4j-key")))
# クエリを解析
split_values = [value.strip() for value in cypher_query.split(';') if value.strip()]
for q in split_values:
try:
# クエリを実行
results = graph_db.run(q)
# 結果を処理
for record in results:
print(record)
except Exception as e:
print(f"Error executing query: {e}")
import os
file_path=os.getcwd() + "/_resources/load_sap_sample_data.cypher"
execute_cypher_query(read_cypher_query(file_path))
Neo4jのExploreでグラフ構造を探索できます。
次のステップ
このノートブックの実行が完了したら、次のノートブック 02_agent に進んでエージェントを作成してください。
02_agent: エージェントノートブック
エージェントフレームワークには、同じフォルダーに3つのノートブックが必要です:
- 02_agent: エージェントを構築するコードが含まれています。
- config.yml): 設定が含まれています。
- 03_driver: エージェントのログ、評価、登録、およびデプロイを行います。
このノートブックはMosaic AIエージェントフレームワークを使用しています (AWS | Azure)
このノートブックを使用してエージェントを反復し、変更します。例えば、ツールを追加したり、システムプロンプトを変更したりできます。
注意: このノートブックはLangChainを使用していますが、AIエージェントフレームワークはPyfuncやLlamaIndexなどの他のエージェントフレームワークとも互換性があります。
%pip install -U -qqqq mlflow-skinny langchain>0.2.16 langgraph-checkpoint>1.0.12 langchain_core langchain-community>0.2.16 langgraph==0.2.74 pydantic databricks_langchain openai langchain_neo4j
dbutils.library.restartPython()
インポートとセットアップ
mlflow.langchain.autolog()
を使用して MLflow トレース を設定します。
import mlflow
from mlflow.models import ModelConfig
mlflow.langchain.autolog()
config = ModelConfig(development_config="config.yml")
チャットモデルとツールの定義
LangGraphツール呼び出しをサポートするLangChainチャットモデルを作成します。
注意: このノートブックはLangChainを使用していますが、AIエージェントフレームワークはPyfuncやLlamaIndexなどの他のエージェントフレームワークとも互換性があります。
from databricks_langchain import ChatDatabricks
# Initialize the llm
llm = ChatDatabricks(
endpoint=config.get("llm_endpoint")
)
from langchain_community.tools.databricks import UCFunctionToolkit
uc_functions = config.get("uc_functions")
tools = (
UCFunctionToolkit(warehouse_id=config.get("warehouse_id"))
.include(*uc_functions)
.get_tools()
)
#tools = []
グラフQAツールの登録
注意
以下のコメントにあるように、02_agent
ノートブックを実行する際にはコメントを解除して、設定値を読み込む必要があります。一方、03_driver
でこのノートブックを記録する際にはコメントアウトしておかないと、MLflowのロギングの過程でエラーになります。
# # テストのためにコメントを解除します(このコードをモデルとしてデプロイする場合、dbutilsの使用によりサービングが失敗します)
import os
host = dbutils.secrets.get(scope=config.get("secret_scope"), key="neo4j-host")
secret = dbutils.secrets.get(scope=config.get("secret_scope"), key="neo4j-key")
os.environ["NEO4J_HOST"] = host
os.environ["NEO4J_KEY"] = secret
from langchain_neo4j import Neo4jGraph, GraphCypherQAChain
from langchain_core.tools import Tool
import os
graph = Neo4jGraph(
url=os.getenv("NEO4J_HOST"),
username="neo4j",
password=os.getenv("NEO4J_KEY")
)
chain = GraphCypherQAChain.from_llm(
graph=graph,
cypher_llm=llm,
qa_llm=llm,
validate_cypher=True,
allow_dangerous_requests=True, # LLMがNeo4jデータベースにアクセスして変更を加えることを認めるためにこれをTrueに設定します。このフラグがないと、コードは失敗します。
use_function_response=True, # 関数の出力をツールとして使用するためにこれを追加します
verbose=True
)
graph_tool = Tool(
name="graph_qa_tool",
func=chain.invoke,
description="質問に答えるためのNeo4j Cypherクエリを生成し、グラフインスタンスでクエリを実行します。"
)
tools.append(graph_tool)
出力パーサー
Databricksインターフェース(AI Playgroundなど)は、ツール呼び出しを整形表示することができます。
次のヘルパー関数を使用して、LLMの出力を期待される形式に解析します。
from typing import Iterator, Dict, Any
from langchain_core.messages import (
AIMessage,
HumanMessage,
ToolMessage,
MessageLikeRepresentation,
)
import json
def stringify_tool_call(tool_call: Dict[str, Any]) -> str:
"""
tool_callに十分な情報がある場合、raw tool callをplayground UIが期待するフォーマットされた文字列に変換します
"""
try:
request = json.dumps(
{
"id": tool_call.get("id"),
"name": tool_call.get("name"),
"arguments": json.dumps(tool_call.get("args", {})),
},
indent=2,
)
return f"<tool_call>{request}</tool_call>"
except:
return str(tool_call)
def stringify_tool_result(tool_msg: ToolMessage) -> str:
"""
tool_msgに十分な情報がある場合、ToolMessageをplayground UIが期待するフォーマットされた文字列に変換します
"""
try:
result = json.dumps(
{"id": tool_msg.tool_call_id, "content": tool_msg.content}, indent=2
)
return f"<tool_call_result>{result}</tool_call_result>"
except:
return str(tool_msg)
def parse_message(msg) -> str:
"""異なるメッセージタイプを文字列表現に解析します"""
# tool call result
if isinstance(msg, ToolMessage):
return stringify_tool_result(msg)
# tool call
elif isinstance(msg, AIMessage) and msg.tool_calls:
tool_call_results = [stringify_tool_call(call) for call in msg.tool_calls]
return "".join(tool_call_results)
# 通常のHumanMessageまたはAIMessage(推論または最終回答)
elif isinstance(msg, (AIMessage, HumanMessage)):
return msg.content
else:
print(f"予期しないメッセージタイプ: {type(msg)}")
return str(msg)
def wrap_output(stream: Iterator[MessageLikeRepresentation]) -> Iterator[str]:
"""
メッセージストリームからフォーマットされた出力を処理して生成します。
invokeおよびstream langchain関数は異なる出力フォーマットを生成します。
この関数は両方のケースを処理します。
"""
for event in stream:
# エージェントがinvoke()で呼び出された場合
if "messages" in event:
for msg in event["messages"]:
yield parse_message(msg) + "\n\n"
# エージェントがstream()で呼び出された場合
else:
for node in event:
for key, messages in event[node].items():
if isinstance(messages, list):
for msg in messages:
yield parse_message(msg) + "\n\n"
else:
print("キー{key}に対する予期しない値{messages}。`MessageLikeRepresentation`のリストを期待しました")
yield str(messages)
エージェントの作成
ここでは、config.ymlで定義されたモデルとツールを使用するシンプルなグラフを提供します。このグラフは この LangGraph ガイド から適応されています。
LangGraph エージェントをさらにカスタマイズするには、以下を参照してください:
- LangGraph - クイックスタート で、この LangGraph エージェントで使用される概念の説明
- LangGraph - ハウツーガイド で、エージェントの機能を拡張する方法
from typing import (
Annotated,
Optional,
Sequence,
TypedDict,
Union,
)
from langchain_core.language_models import LanguageModelLike
from langchain_core.messages import (
BaseMessage,
SystemMessage,
)
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt.tool_executor import ToolExecutor
from langgraph.prebuilt.tool_node import ToolNode
# エージェントの状態を保持するクラスを作成します
# これは単にメッセージのリストを含みます
class AgentState(TypedDict):
"""エージェントの状態。"""
messages: Annotated[Sequence[BaseMessage], add_messages]
def create_tool_calling_agent(
model: LanguageModelLike,
tools: Union[ToolExecutor, Sequence[BaseTool]],
tool_choice: Optional[str] = 'auto',
agent_prompt: Optional[str] = None,
) -> CompiledGraph:
model = model.bind_tools(tools)
# どのノードに進むかを決定する関数を定義します
def should_continue(state: AgentState):
messages = state["messages"]
last_message = messages[-1]
# 関数呼び出しがない場合、終了します
if not last_message.tool_calls:
return "end"
else:
return "continue"
if agent_prompt:
system_message = SystemMessage(content=agent_prompt)
preprocessor = RunnableLambda(
lambda state: [system_message] + state["messages"]
)
else:
preprocessor = RunnableLambda(lambda state: state["messages"])
model_runnable = preprocessor | model
# モデルを呼び出す関数を定義します
def call_model(
state: AgentState,
config: RunnableConfig,
):
response = model_runnable.invoke(state, config)
return {"messages": [response]}
workflow = StateGraph(AgentState)
workflow.add_node("agent", RunnableLambda(call_model))
workflow.add_node("tools", ToolNode(tools))
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
# 最初に開始ノードを定義します。ここではagentを使用します。
# これはエージェントノードが呼び出された後に取られるエッジです。
"agent",
# 次に、どのノードが次に呼び出されるかを決定する関数を渡します。
should_continue,
# 以下のマッピングは、どのノードに進むかを決定するために使用されます
{
# toolsの場合、ツールノードを呼び出します。
"continue": "tools",
# ENDはグラフが終了することを示す特別なノードです。
"end": END,
},
)
# toolsからagentへの無条件エッジを追加します。
workflow.add_edge("tools", "agent")
return workflow.compile()
from langchain_core.runnables import RunnableGenerator
from mlflow.langchain.output_parsers import ChatCompletionsOutputParser
required_tools = 'required'
# システムメッセージが存在する場合、エージェントを作成します
try:
agent_prompt = config.get("agent_prompt")
agent_with_raw_output = create_tool_calling_agent(
llm, tools, tool_choice=required_tools, agent_prompt=agent_prompt
)
except KeyError:
agent_with_raw_output = create_tool_calling_agent(llm, tools)
agent = agent_with_raw_output | RunnableGenerator(wrap_output) | ChatCompletionsOutputParser()
注意
以下のブロックも03_driverで実行する際にはコメントアウトしましょう。
from IPython.display import Image
display(Image(agent.get_graph().draw_mermaid_png()))
エージェントのテスト
エージェントと対話してその出力をテストします。このノートブックでは mlflow.langchain.autolog()
を呼び出しているため、エージェントが実行する各ステップのトレースを確認できます。
# エージェントのための適切なドメイン固有の例でこのプレースホルダー入力例を置き換えます
for event in agent.stream({
"messages": [
{
"role": "user",
"content": "高い売上高に貢献する販売注文、製品、および製品カテゴリの関係は何ですか?"
}
]
}):
print(event, "---" * 20 + "\n")
RagChatCompletionResponse(
{'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': '<tool_call>{\n "id": "call_61e7f39b-18d1-438b-a17d-86320ddb6b4b",\n "name": "graph_qa_tool",\n "arguments": "{\\"__arg1\\": \\"MATCH (n:SalesOrder)-[:CONTAINS]->(p:Product)-[:PART_OF]->(c:ProductCategory) WHERE n.TotalSales > 10000 RETURN n, p, c\\"}"\n}</tool_call>\n\n'}, 'finish_reason': 'stop'}], 'object': 'chat.completion'} ------------------------------------------------------------
> Entering new GraphCypherQAChain chain...
Generated Cypher:
cypher
MATCH (n:SalesOrder)-[:ITEM]->(p:Product)<-[:PRODUCT_CATEGORY]-(c:ProductCategory)
WHERE n.net_amount > 10000
RETURN n, p, c
Full Context:
[{'n': {'tax_amount': 1698, 'sales_order_id': '0500000000', 'currency': 'USD', 'sales_org': 'APJ', 'gross_amount': 13587, 'net_amount': 11888}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 2499, 'product_id': 'MB-1034', 'weight_measure': 12, 'name': 'Mt Discovery Ulti', 'currency': 'USD'}, 'c': {'name': 'Mountain Bike', 'product_category_id': 'MB'}}, {'n': {'tax_amount': 1698, 'sales_order_id': '0500000000', 'currency': 'USD', 'sales_org': 'APJ', 'gross_amount': 13587, 'net_amount': 11888}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 399, 'product_id': 'CB-1161', 'weight_measure': 15, 'name': 'La Plage', 'currency': 'USD'}, 'c': {'name': 'Cruiser', 'product_category_id': 'CB'}}, {'n': {'tax_amount': 1577, 'sales_order_id': '0500000001', 'currency': 'USD', 'sales_org': 'EMEA', 'gross_amount': 12622, 'net_amount': 11044}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 1144, 'product_id': 'CC-1021', 'weight_measure': 8, 'name': 'Cyclone Basic', 'currency': 'USD'}, 'c': {'name': 'Cyclo-cross Bike', 'product_category_id': 'CC'}}, {'n': {'tax_amount': 1577, 'sales_order_id': '0500000001', 'currency': 'USD', 'sales_org': 'EMEA', 'gross_amount': 12622, 'net_amount': 11044}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 2499, 'product_id': 'RC-1056', 'weight_measure': 7, 'name': 'Rennsemmel 2', 'currency': 'USD'}, 'c': {'name': 'Racing Bike', 'product_category_id': 'RC'}}, {'n': {'tax_amount': 1577, 'sales_order_id': '0500000001', 'currency': 'USD', 'sales_org': 'EMEA', 'gross_amount': 12622, 'net_amount': 11044}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 399, 'product_id': 'CB-1161', 'weight_measure': 15, 'name': 'La Plage', 'currency': 'USD'}, 'c': {'name': 'Cruiser', 'product_category_id': 'CB'}}, {'n': {'tax_amount': 1577, 'sales_order_id': '0500000001', 'currency': 'USD', 'sales_org': 'EMEA', 'gross_amount': 12622, 'net_amount': 11044}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 899, 'product_id': 'HB-1175', 'weight_measure': 12, 'name': 'Specifica', 'currency': 'USD'}, 'c': {'name': 'Hybrid Bike', 'product_category_id': 'HB'}}, {'n': {'tax_amount': 5706, 'sales_order_id': '0500000002', 'currency': 'USD', 'sales_org': 'APJ', 'gross_amount': 45655, 'net_amount': 39948}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 299, 'product_id': 'BX-1015', 'weight_measure': 12, 'name': 'BMX Optima', 'currency': 'USD'}, 'c': {'name': 'BMX', 'product_category_id': 'BX'}}, {'n': {'tax_amount': 5706, 'sales_order_id': '0500000002', 'currency': 'USD', 'sales_org': 'APJ', 'gross_amount': 45655, 'net_amount': 39948}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 1144, 'product_id': 'CC-1021', 'weight_measure': 8, 'name': 'Cyclone Basic', 'currency': 'USD'}, 'c': {'name': 'Cyclo-cross Bike', 'product_category_id': 'CC'}}, {'n': {'tax_amount': 5706, 'sales_order_id': '0500000002', 'currency': 'USD', 'sales_org': 'APJ', 'gross_amount': 45655, 'net_amount': 39948}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 1999, 'product_id': 'RC-1055', 'weight_measure': 7, 'name': 'Rennsemmel', 'currency': 'USD'}, 'c': {'name': 'Racing Bike', 'product_category_id': 'RC'}}, {'n': {'tax_amount': 5706, 'sales_order_id': '0500000002', 'currency': 'USD', 'sales_org': 'APJ', 'gross_amount': 45655, 'net_amount': 39948}, 'p': {'quantity_unit': 'EA', 'weight_unit': 'KG', 'price': 1499, 'product_id': 'DB-1081', 'weight_measure': 13, 'name': 'Rooty Basic', 'currency': 'USD'}, 'c': {'name': 'Downhill Bike', 'product_category_id': 'DB'}}]
> Finished chain.
{'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': '<tool_call_result>{\n "id": "call_61e7f39b-18d1-438b-a17d-86320ddb6b4b",\n "content": "{\\"query\\": \\"MATCH (n:SalesOrder)-[:CONTAINS]->(p:Product)-[:PART_OF]->(c:ProductCategory) WHERE n.TotalSales > 10000 RETURN n, p, c\\", \\"result\\": \\"The query returns sales orders with total sales over $10,000, along with the products and product categories associated with these orders. \\\\n\\\\nThere are 3 unique sales orders: \\\\n- \'0500000000\' with a gross amount of $13,587\\\\n- \'0500000001\' with a gross amount of $12,622\\\\n- \'0500000002\' with a gross amount of $45,655\\\\n\\\\nThese orders are associated with the following products and categories:\\\\n- \'Mt Discovery Ulti\' (Mountain Bike)\\\\n- \'La Plage\' (Cruiser)\\\\n- \'Cyclone Basic\' (Cyclo-cross Bike)\\\\n- \'Rennsemmel 2\' (Racing Bike)\\\\n- \'Specifica\' (Hybrid Bike)\\\\n- \'BMX Optima\' (BMX)\\\\n- \'Rennsemmel\' (Racing Bike)\\\\n- \'Rooty Basic\' (Downhill Bike)\\"}"\n}</tool_call_result>\n\n'}, 'finish_reason': 'stop'}], 'object': 'chat.completion'} ------------------------------------------------------------
RagChatCompletionResponse(
{'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': "高い売上高に貢献する販売注文、製品、および製品カテゴリの関係は、販売注文 '0500000000'、'0500000001'、'0500000002' がそれぞれ $13,587、$12,622、$45,655 の売上高を記録しており、これらの注文は 'Mt Discovery Ulti' (Mountain Bike)、'La Plage' (Cruiser)、'Cyclone Basic' (Cyclo-cross Bike)、'Rennsemmel 2' (Racing Bike)、'Specifica' (Hybrid Bike)、'BMX Optima' (BMX)、'Rennsemmel' (Racing Bike)、'Rooty Basic' (Downhill Bike) などの製品と関連していることを示しています。\n\n"}, 'finish_reason': 'stop'}], 'object': 'chat.completion'} ------------------------------------------------------------
トレースでも挙動を確認できます。
mlflow.models.set_model(agent)
次のステップ
上記のセルを再実行して、エージェントを反復してテストできます。
このフォルダーの03_driverノートブックに移動して、エージェントをログ、登録、およびデプロイします。
03_driver: ドライバーノートブック
エージェントフレームワークには、同じフォルダーに3つのノートブックが必要です:
- 02_agent: エージェントを構築するコードが含まれています。
- config.yml: 設定が含まれています。
- 03_driver: エージェントのログ、評価、登録、およびデプロイを行います。
このノートブックは、Mosaic AI Agent Framework (AWS | Azure) を使用して、agent ノートブックで定義されたエージェントをデプロイします。ノートブックは以下を行います:
- エージェントをMLflowにログします
- エージェントをUnity Catalogに登録します
- エージェントをモデルサービングエンドポイントにデプロイします
前提条件
- このフォルダーの02_agentノートブックを確認して実行し、エージェントのコードを確認し、コードを反復し、出力をテストします。
%pip install -U -qqqq databricks-agents mlflow langchain>0.2.16 langgraph-checkpoint>1.0.12 langchain_core langchain-community>0.2.16 langgraph==0.2.74 pydantic databricks_langchain openai langchain_neo4j PyYAML
dbutils.library.restartPython()
import yaml
def read_config(file_path):
try:
with open(file_path, 'r') as file:
config = yaml.safe_load(file)
return config
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
config = read_config('./config.yml')
secret_scope = config.get('secret_scope')
catalog = config.get("catalog")
schema = config.get("schema")
agent
をMLflowモデルとしてログする
02_agentノートブックからコードとしてエージェントをログします。詳細は MLflow - Models from Code を参照してください。
# モデルをMLflowにログする
import os
import mlflow
from mlflow.models import ModelConfig
from mlflow.models.resources import DatabricksVectorSearchIndex, DatabricksFunction
config = ModelConfig(development_config="config.yml")
input_example = {
"messages": [
{
"role": "user",
"content": "高い売上高に貢献する販売注文、製品、および製品カテゴリの関係は何ですか?"
}
]
}
host = dbutils.secrets.get(scope=secret_scope, key="neo4j-host")
secret = dbutils.secrets.get(scope=secret_scope, key="neo4j-key")
os.environ["NEO4J_HOST"] = host
os.environ["NEO4J_KEY"] = secret
with mlflow.start_run():
logged_agent_info = mlflow.langchain.log_model(
lc_model=os.path.join(
os.getcwd(),
'02_agent',
),
pip_requirements=[
"langchain>0.2.16",
"langchain-community>0.2.16",
"langgraph-checkpoint>1.0.12",
"langgraph==0.2.74",
"openai",
"pydantic",
"databricks_langchain", # retrieverツールに使用
"langchain_neo4j", # Graph QAツールに使用
],
model_config="config.yml",
artifact_path='agent',
input_example=input_example,
resources=[
DatabricksFunction(function_name=config.get("uc_functions")[0])
],
)
モデルをUnity Catalogに登録する
以下の catalog
、schema
、model_name
を更新して、MLflowモデルをUnity Catalogに登録します。
mlflow.set_registry_uri("databricks-uc")
# TODO: カタログ、スキーマ、およびUCモデルの名前を定義する
model_name = "sap_sales_graph"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"
# モデルをUCに登録する
uc_registered_model_info = mlflow.register_model(model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME)
Registered model 'users.takaaki_yayoi.sap_sales_graph' already exists. Creating a new version of this model...
エージェントをデプロイする
from databricks import agents
# モデルをレビューアプリとモデルサービングエンドポイントにデプロイする
deployment = agents.deploy(
UC_MODEL_NAME,
uc_registered_model_info.version,
scale_to_zero_enabled=True,
environment_vars={
"NEO4J_HOST": f"{{{{secrets/{secret_scope}/neo4j-host}}}}",
"NEO4J_KEY": f"{{{{secrets/{secret_scope}/neo4j-key}}}}"
}
)
しばらくするとエンドポイントが起動するので、AI Playgroundで動作確認します。サンプル質問も試せるようになってました。
次のDatabricks Appsの準備をしておきます。
import yaml
config_path = "./_resources/streamlit/app.yaml"
# YAML構造をPythonの辞書として定義する
data = {
"command": [
"streamlit",
"run",
"chainlink.py"
],
"env": [
{
"name": "DATABRICKS_SERVING_ENDPOINT", "value": deployment.endpoint_name
}
]
}
# 更新されたデータをYAMLファイルに書き込む
with open(config_path, "w") as yaml_file:
yaml.dump(data, yaml_file, default_flow_style=False, sort_keys=False)
次のステップ
このフォルダーの 04_deploy_appノートブックに移動して、Streamlitを使用してチャットインターフェイスをデプロイします。
04_deploy_app: アプリのデプロイ
ノートブック認証を使用してDatabricks SDKを使用します。
%pip install PyYAML
%restart_python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import apps
w = WorkspaceClient()
app_name = "taka-chainlink"
from databricks.sdk.service.apps import App
try:
existing_app = w.apps.get(name=app_name)
except:
existing_app = None
if not existing_app:
print(f'Creating app {app_name}...')
app = App(name=app_name)
w.apps.create_and_wait(app=app)
else:
print(f'App {app_name} already exists.')
Creating app taka-chainlink...
しばらくするとコンピュートがレディになります。
アプリをデプロイします。
import os
from databricks.sdk.service.apps import AppDeployment
app_deployment = AppDeployment(
source_code_path=os.getcwd() + "/_resources/streamlit"
)
print(f'Deploying app {app_name} from source code...')
deployment = w.apps.deploy(
app_name=app_name,
app_deployment=app_deployment
).result()
print('App deployed.')
Deploying app taka-chainlink from source code...
App deployed.
サービングエンドポイントへのアプリ権限の付与
エンドポイントに対してアプリサービスプリンシパルにCAN_QUERY権限を付与する必要があります
import yaml
def read_config(file_path):
try:
with open(file_path, 'r') as file:
config = yaml.safe_load(file)
return config
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
config = read_config('./_resources/streamlit/app.yaml')
from databricks.sdk.service.serving import ServingEndpointAccessControlRequest, ServingEndpointPermissionLevel
# サービングエンドポイントIDと権限を定義
# 注: SDKを使用すると権限が上書きされるため、作成者が引き続き権限を持っていることを確認する必要があります
# (このノートブックを実行している人にもアクセスを許可します)
serving_endpoint_id = w.serving_endpoints.get(name=config.get('env')[0]['value']).id
serving_endpoint_creator = w.serving_endpoints.get(name=config.get('env')[0]['value']).creator
existing_app = w.apps.get(name=app_name)
access_control_list = [
ServingEndpointAccessControlRequest(
user_name=serving_endpoint_creator,
permission_level=ServingEndpointPermissionLevel["CAN_MANAGE"]
),
ServingEndpointAccessControlRequest(
user_name=w.current_user.me().user_name,
permission_level=ServingEndpointPermissionLevel["CAN_MANAGE"]
),
ServingEndpointAccessControlRequest(
user_name=existing_app.service_principal_client_id,
permission_level=ServingEndpointPermissionLevel["CAN_QUERY"]
)
]
# サービングエンドポイントに権限を付与
w.serving_endpoints.set_permissions(
serving_endpoint_id=serving_endpoint_id,
access_control_list=access_control_list
)
print(f"Permissions granted successfully for serving endpoint {serving_endpoint_id}.")
Permissions granted successfully for serving endpoint 5952de4209c1430f8fddfe454c5c4bb1.
アプリにアクセスします。
動きました!