これまでの記事でも、DatabricksでMLflow Tracingが動いてることにさらっと触れてました。
そして、先人がすでに試されています。
マニュアルも整理されていました。
そういった中、自分自身コンセプトをよく理解できていなかったので、公式ドキュメントを翻訳しました。トレースの必要性、スパンの概念も勉強になりました。
これらを踏まえて、あらためて段階を踏んでMLflow TracingをDatabricksで動かします。マニュアルの流れに添いますが、一部こちらのコードもお借りしています。Databricks MLランタイム16.1で試しています。
MLflow Tracingをインストール
%pip install mlflow==2.18.0 langchain==0.3.0 langchain-openai==0.2.9
%pip install -U openai
%restart_python
LangChain / LangGraph 自動トレーシング
LangChain の自動ロギング統合の一環として、チェーンの呼び出し API を呼び出すと、トレースがアクティブな MLflow 実験に記録されます。mlflow.langchain.autolog()
関数を呼び出すことで、LangChain および LangGraph のトレーシングを有効にできます。
import mlflow
mlflow.langchain.autolog()
import os
os.environ["OPENAI_API_KEY"] = dbutils.secrets.get(scope="demo-token-takaaki.yayoi", key="openai_api_key")
import mlflow
import os
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# LangChainの自動ログを有効にすると、トレースログが有効になります。
mlflow.langchain.autolog()
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7, max_tokens=1000)
prompt_template = PromptTemplate.from_template(
"質問に{person}として答えてください。彼らのスタイル、ウィット、個性、話し方の癖を完全に体現してください。"
"彼らの癖や特徴を最大限に模倣し、建設的でない場合や無害でない場合でも、彼らの特性を受け入れてください。質問は: {question}"
)
chain = prompt_template | llm | StrOutputParser()
# 別の呼び出しをテストしましょう
chain.invoke(
{
"person": "リーナス・トーバルズ",
"question": "みんなのアクセスをsudoに設定して簡単にすることはできますか?",
}
)
'ああ、素晴らしい質問ですね!まず第一に、sudoはシステム管理において非常に強力なツールです。みんなが簡単にアクセスできるように設定するというのは、まさに「みんなにおもちゃを与える」ようなものです。たしかに、そうすることで便利になるかもしれませんが、それはまるで火を扱う子供と同じです。最初は楽しいかもしれませんが、やがて大火事になるリスクがあります。\n\nなので、もし「みんな」がsudoを使えるようにしたいのであれば、まずは慎重に考えましょう。具体的には、特定のグループに対してsudo権限を付与するのが良い方法です。たとえば、`/etc/sudoers`ファイルを編集して、特定のユーザーグループに権限を与えることができます。これにより、全員が無制限にsudoを使えるわけではなく、必要な人だけがアクセスできるようになります。\n\nもちろん、最終的にはリスクと利便性のバランスを取ることが重要です。簡単にすることはできますが、その結果としてシステムの安全性が損なわれることがあるので、その点はしっかり考えてください。Linuxは自由ですが、その自由には責任が伴います。さて、これであなたの質問に答えられたでしょうか?もし他に聞きたいことがあれば、遠慮なくどうぞ。'
チェーンのトレースが表示されます。
Fluent APIを使用して、エージェントにトレースを手動で追加する
MLflowのFluent APIでは、コードの実行フローに基づいて、自動的にトレースの階層が生成されます。
関数を修飾する
@mlflow.trace
デコレーターを使用して、デコレートされた関数のスコープからなるスパンを作成します。
@mlflow.trace(name="agent", span_type="TYPE", attributes={"key": "value"})
def my_function(x, y):
return x + y
関数を呼び出すことで、スパンに対するトレースが取得できます。
my_function(5, 3)
8
トレースコンテキストマネージャーの使用
関数だけではなく、任意のコードブロックのスパンを作成するには、コードブロックをラッピングするトレースコンテキストマネージャとしてmlflow.start_span()
を使うことができます。スパンの入出力はスパンのsetterメソッドで提供します。
x = 3
y = 4
with mlflow.start_span("my_span") as span:
span.set_inputs({"x": x, "y": y})
result = x + y
span.set_outputs(result)
span.set_attribute("key", "value")
外部関数をラップする
外部ライブラリの関数をトレースするには、関数をmlflow.trace
でラッピングします。
from sklearn.metrics import accuracy_score
y_pred = [0, 2, 1, 3]
y_true = [0, 1, 2, 3]
traced_accuracy_score = mlflow.trace(accuracy_score)
traced_accuracy_score(y_true, y_pred)
Fluent API の例
以下の例では、Fluent APIのmlflow.trace
とmlflow.start_span
を使用してquickstart-agent
をトレースする方法を示しています。
import mlflow
from mlflow.deployments import get_deploy_client
class QAChain(mlflow.pyfunc.PythonModel):
def __init__(self):
self.client = get_deploy_client("databricks")
@mlflow.trace(name="quickstart-agent")
def predict(self, model_input, system_prompt, params):
messages = [
{
"role": "system",
"content": system_prompt,
},
{
"role": "user",
"content": model_input[0]["query"]
}
]
traced_predict = mlflow.trace(self.client.predict)
output = traced_predict(
endpoint=params["model_name"],
inputs={
"temperature": params["temperature"],
"max_tokens": params["max_tokens"],
"messages": messages,
},
)
with mlflow.start_span(name="_final_answer") as span:
# 別のスパン生成を開始
span.set_inputs({"query": model_input[0]["query"]})
answer = output["choices"][0]["message"]["content"]
span.set_outputs({"generated_text": answer})
# 実行時に計算された属性はset_attributes()メソッドを使用して設定できます。
span.set_attributes({
"model_name": params["model_name"],
"prompt_tokens": output["usage"]["prompt_tokens"],
"completion_tokens": output["usage"]["completion_tokens"],
"total_tokens": output["usage"]["total_tokens"]
})
return answer
トレースは、呼び出しメソッドpredict()
を実行すると自動的に表示されます。
SYSTEM_PROMPT = """
あなたはDatabricksユーザーのためのアシスタントです。Python、コーディング、SQL、データエンジニアリング、Spark、データサイエンス、DWとプラットフォーム、API、またはインフラストラクチャ管理に関連するDatabricksの質問に答えます。これらのトピックに関連しない質問については、回答を控えてください。答えがわからない場合は、知らないと言ってください。答えを作り上げようとしないでください。答えはできるだけ簡潔にしてください。以下のコンテキストを使用して質問に答えてください:
"""
model = QAChain()
prediction = model.predict(
[
{"query": "MLflow 5.0には何が含まれていますか"},
],
SYSTEM_PROMPT,
{
# テストを容易にするためにDatabricks Foundation Modelを使用しています。必要に応じて置き換えてください。
"model_name": "databricks-dbrx-instruct",
"temperature": 0.1,
"max_tokens": 1000,
}
)
MLflowクライアントAPI
Fluent APIでは要件を満たせない、きめ細かいトレースが必要な際にはMLflowClient
のAPIを使用します。
from mlflow.client import MlflowClient
mlflow_client = MlflowClient()
root_span = mlflow_client.start_trace(
name="simple-rag-agent",
inputs={
"query": "デモ",
"model_name": "DBRX",
"temperature": 0,
"max_tokens": 200
}
)
request_id = root_span.request_id
# クエリに類似したドキュメントを取得
similarity_search_input = dict(query_text="デモ", num_results=3)
span_ss = mlflow_client.start_span(
"search",
# スパンをトレースの正しい位置に作成するためにrequest_idとparent_idを指定
request_id=request_id,
parent_id=root_span.span_id,
inputs=similarity_search_input
)
retrieved = ["テスト結果"]
# スパンを明示的に終了する必要があります
mlflow_client.end_span(request_id, span_id=span_ss.span_id, outputs=retrieved)
mlflow_client.end_trace(request_id, outputs={"output": retrieved})
トレースのレビュー
- トレースの結果はノートブックにインラインでレンダリングされます。
- トレースはMLflowエクスペリメントに記録されます。エクスペリメントページのトレースタブでトレースの履歴を確認、検索できます。アクティブなMLflowランの下でエージェントを実行すると、実行ページにトレースが表示されます。
以下はエクスペリメントページに表示されるトレースです。
本番運用でのトレースの活用
Databricks上のMLflow TracingはMosaic AIモデルサービングとも統合されているので、サービングエンドポイントでMLflowトレースが有効化されていると、トレースが推論テーブルに記録されるようになります。