2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangfuseのDatabricks連携を試す

Last updated at Posted at 2025-03-07

先日のイベントでLangfuseというサービスのことを伺いました。

LLMアプリケーションをデバッグ、改善するためのトレース、評価、プロンプト管理、メトリクスを提供しているオープンソースのLLMエンジニアリングプラットフォームとのこと。MLflowLangSmithと同じようなモチベーションを持つプラットフォームですね。

調べてみたら、Databricksとも連携しているので試してみます。こちらのノートブックを動かします。

翻訳版はこちらです。

Langfuseを使用したDatabricksモデルの可観測性

Databricksは、大規模な言語モデルをホスティングおよび提供するための強力なプラットフォームを提供します。Databricksの提供エンドポイントとLangfuseを組み合わせることで、開発および本番環境でAIワークロードをトレース、監視、および分析できます。

このノートブックでは、Langfuseを使用してDatabricksモデルを使用する3つの異なる方法を示します:

  1. OpenAI SDK: OpenAI SDKを介してDatabricksモデルエンドポイントを使用します。
  2. LangChain: LangChainパイプラインでDatabricks LLMインターフェースと統合します。
  3. LlamaIndex: LlamaIndex内でDatabricksエンドポイントを使用します。

Databricks Model Servingとは?
Databricks Model Servingは、大規模なモデルを本番環境で提供するための自動スケーリングと堅牢なインフラストラクチャを備えています。また、プライベートデータでLLMを微調整することも可能で、データプライバシーを維持しながら独自の情報を活用できます。

Langfuseとは?
Langfuseは、LLMの可観測性と監視のためのオープンソースプラットフォームです。メタデータ、プロンプトの詳細、トークン使用量、レイテンシーなどをキャプチャすることで、AIアプリケーションをトレースおよび監視するのに役立ちます。

1. 依存関係のインストール

始める前に、Python環境に必要なパッケージをインストールします:

  • openai: OpenAI SDKを介してDatabricksエンドポイントを呼び出すために必要です。
  • databricks-langchain: "OpenAIのような"インターフェースを介してDatabricksエンドポイントを呼び出すために必要です。
  • llama-index および llama-index-llms-databricks: LlamaIndex内でDatabricksエンドポイントを使用するために必要です。
  • langfuse: Langfuseプラットフォームにトレースデータを送信するために必要です。
%pip install openai langfuse databricks-langchain llama-index llama-index-llms-databricks
%restart_python

2. 環境変数の設定

LangfuseのクレデンシャルとDatabricksのクレデンシャルを環境変数として設定します。以下のダミーキーをそれぞれのアカウントから取得した実際のキーに置き換えてください。

  • LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY: Langfuseプロジェクト設定から取得します。
  • LANGFUSE_HOST: https://cloud.langfuse.com(EUリージョン)または https://us.cloud.langfuse.com(USリージョン)。
  • DATABRICKS_TOKEN: Databricksのパーソナルアクセストークン。
  • DATABRICKS_HOST: DatabricksワークスペースのURL(例:https://dbc-xxxxxxx.cloud.databricks.com)。

Langfuseにサインアップして、Organization、Projectを作成することでAPIキーを取得することができます。Secret KeyとPublic Keyをコピーしておきます。

Screenshot 2025-03-07 at 9.37.32.png

import os

# 環境変数の例(実際のキー/トークンに置き換えてください)
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."  # あなたのパブリックキー
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."  # あなたのシークレットキー
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"  # または https://us.cloud.langfuse.com

os.environ["DATABRICKS_TOKEN"] = "dapi..."  # Databricks パーソナルアクセストークン
os.environ["DATABRICKS_HOST"] = "https://.....cloud.databricks.com/"

アプローチ1: OpenAI SDKを介したDatabricksモデルの使用

Databricksエンドポイントは、OpenAI APIの代替として機能できます。これにより、openaiライブラリに依存する既存のコードと簡単に統合できます。内部では、langfuse.openai.OpenAIがリクエストを自動的にLangfuseにトレースします。

手順

  1. langfuse.openaiからOpenAIクライアントをインポートします。
  2. クライアントを作成し、api_keyにDatabricksトークンを、base_urlにDatabricksワークスペースエンドポイントを設定します。
  3. クライアントのchat.completions.create()メソッドを使用してプロンプトを送信します。
  4. Langfuseダッシュボードでトレースを確認します。

注: Langfuseを使用したOpenAIのトレースに関する詳細な例については、OpenAI統合ドキュメントを参照してください。

# Langfuse OpenAI クライアント
from langfuse.openai import OpenAI

# 環境変数を取得
databricks_token = os.environ.get("DATABRICKS_TOKEN")
databricks_host = os.environ.get("DATABRICKS_HOST")

# Databricks を指す OpenAI ライクなクライアントを作成
client = OpenAI(
    api_key=databricks_token,  # Databricks パーソナルアクセストークン
    base_url=f"{databricks_host}/serving-endpoints",  # あなたの Databricks ワークスペース
)
response = client.chat.completions.create(
    messages=[
        {"role": "system", "content": "あなたはAIアシスタントです。"},
        {"role": "user", "content": "Databricksとは何ですか?"}
    ],
    model="databricks-meta-llama-3-3-70b-instruct",  # Databricksモデルサービングエンドポイント名に基づいて調整
    max_tokens=256
)

# モデルからの応答を表示
display(response.choices[0].message.content)
'Databricks は、Apache Spark をベースにした、データエンジニアリング、データサイエンス、データ分析のためのクラウドベースのプラットフォームです。Databricks は、データの統合、処理、分析を容易にし、データ駆動型の意思決定を促進することを目的としています。\n\nDatabricks の主な機能には、以下のものがあります。\n\n1. Apache Spark: Databricks は Apache Spark を基盤としており、Spark のすべての機能を利用できます。\n2. クラウドネイティブ: Databricks はクラウドベースのプラットフォームであり、AWS、Azure、Google Cloud Platform などの主要なクラウドプロバイダーで利用できます。\n3. データレイク: Databricks では、データレイクを使用して構造化されたデータと非構造化されたデータを統合できます。\n4. ノートブック: Databricks では、Python、R、SQL などの言語を使用したノートブックを使用してコードを書き、データを分析できます。\n5. コラボレーション: Databricks'

Langfuseのダッシュボードを確認するとトレースを確認できます。プロンプト、応答、レイテンシ、トークン使用量などの詳細が表示されます。

Screenshot 2025-03-07 at 9.41.32.png

アプローチ2: LangChainの使用

DatabricksモデルはLangChainを介しても使用できます。ChatDatabricksクラスは、Databricksのモデルサービングエンドポイントをラップします。

手順

  1. 環境変数としてDATABRICKS_HOSTを設定します。
  2. トレースデータを自動的に収集するLangfuseのCallbackHandlerを初期化します。
  3. エンドポイント名、温度、その他のパラメータを使用してChatDatabricksを使用します。
  4. メッセージでモデルを呼び出し、Langfuseのコールバックハンドラーを渡します。
  5. Langfuseダッシュボードでトレースを確認します。

注: Langfuseを使用したLangChainのトレースに関する詳細な例については、LangChain統合ドキュメントを参照してください。

from langfuse.callback import CallbackHandler

# Langfuseコールバックハンドラーを初期化
langfuse_handler = CallbackHandler(
    secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
    public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
    host=os.environ.get("LANGFUSE_HOST")
)
from databricks_langchain import ChatDatabricks

chat_model = ChatDatabricks(
    endpoint="databricks-meta-llama-3-3-70b-instruct",   # Databricksモデルサービングエンドポイント名
    temperature=0.1,
    max_tokens=256,
    # 他のパラメータをここに追加できます
)

# プロンプトをシステム/ユーザーメッセージのリストとして構築
messages = [
    ("system", "あなたはDatabricksに関する質問に答えることができるチャットボットです。"),
    ("user", "Databricks Model Servingとは何ですか?")
]

# LangChainの.invoke()メソッドを使用してモデルを呼び出す
chat_model.invoke(messages, config={"callbacks": [langfuse_handler]})
AIMessage(content='Databricks Model Servingは、機械学習モデルを簡単にデプロイして管理できるようにする、Databricksのクラウドベースのサービスです。データ サイエンティストとエンジニアは、Databricks Model Serving を使用して、トレーニング済みのモデルを簡単にデプロイして、リアルタイムの予測とバッチ予測のために REST エンドポイントとして公開できます。モデルは、Databricks のマネージド サービスとして自動的にホストされ、スケーリングされ、セキュリティが保護されます。Databricks Model Serving を使用すると、モデルを迅速にデプロイして、ビジネス上の意思決定を支援する予測を生成できます。', additional_kwargs={}, response_metadata={'prompt_tokens': 44, 'completion_tokens': 170, 'total_tokens': 214}, id='run-f62a4015-0dd5-44ae-a627-be83abd34baf-0')

コードを実行した後、Langfuseダッシュボードを開いて記録された会話を確認します。

Screenshot 2025-03-07 at 9.43.01.png

アプローチ3: LlamaIndexの使用

データの取り込み、インデックス作成、またはRAGにLlamaIndexを使用する場合、デフォルトのLLMをDatabricksエンドポイントに置き換えることができます。

手順

  1. llama_index.llms.databricksからDatabricksをインポートします。
  2. エンドポイント名とDatabricksの認証情報を使用してDatabricks LLMを初期化します。
  3. langfuse.llama_indexからLlamaIndexInstrumentorを使用して自動トレースを有効にします。
  4. チャットリクエストでLLMを呼び出します。
  5. Langfuseダッシュボードでトレースを確認します。

注: Langfuseを使用したLlamaIndexのトレースに関する詳細な例については、LlamaIndex統合ドキュメントを参照してください。

from llama_index.llms.databricks import Databricks

# Databricks LLMインスタンスを作成
llm = Databricks(
    model="databricks-meta-llama-3-3-70b-instruct",  # Databricksモデルサービングエンドポイント名
    api_key=os.environ.get("DATABRICKS_TOKEN"),
    api_base=f"{os.environ.get('DATABRICKS_HOST')}/serving-endpoints/"
)
from langfuse.llama_index import LlamaIndexInstrumentor
from llama_index.core.llms import ChatMessage

# LlamaIndexの操作をトレースするためにLlamaIndexInstrumentorを初期化
instrumentor = LlamaIndexInstrumentor(
    secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
    public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
    host=os.environ.get("LANGFUSE_HOST")
)

# 自動トレースを開始
instrumentor.start()
messages = [
    ChatMessage(role="system", content="あなたは役に立つアシスタントです。"),
    ChatMessage(role="user", content="Databricksとは何ですか?")
]

response = llm.chat(messages)
display(response)

# Flush any pending events to Langfuse
instrumentor.flush()
ChatResponse(message=ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='Databricksは、Apache Sparkをベースにしたデータエンジニアリング、データサイエンス、データ分析のためのクラウドベースのプラットフォームです。Databricksは、データレイクハウス、データウェアハウス、リアルタイムデータ処理、機械学習などの機能を提供し、ビッグデータの処理と分析を効率化することを目的としています。')]), raw=ChatCompletion(id='chatcmpl_c5974af8-6da9-4787-83d8-31a2b0219a07', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='Databricksは、Apache Sparkをベースにしたデータエンジニアリング、データサイエンス、データ分析のためのクラウドベースのプラットフォームです。Databricksは、データレイクハウス、データウェアハウス、リアルタイムデータ処理、機械学習などの機能を提供し、ビッグデータの処理と分析を効率化することを目的としています。', refusal=None, role='assistant', audio=None, function_call=None, tool_calls=None))], created=1741304951, model='meta-llama-3.3-70b-instruct-121024', object='chat.completion', service_tier=None, system_fingerprint=None, usage=CompletionUsage(completion_tokens=93, prompt_tokens=35, total_tokens=128, completion_tokens_details=None, prompt_tokens_details=None)), delta=None, logprobs=None, additional_kwargs={'prompt_tokens': 35, 'completion_tokens': 93, 'total_tokens': 128})

LlamaIndexの呼び出しを確認するためにLangfuseにログインし、プロンプト、トークン使用量、完了データなどの詳細を確認できます。

Screenshot 2025-03-07 at 9.45.46.png

そして、Langfuseのダッシュボードでは様々なメトリクスを概観することができます。これは便利。

Screenshot 2025-03-07 at 10.11.43.png
Screenshot 2025-03-07 at 10.11.53.png

トレースの強化 (オプション)

Langfuseは、より豊富なトレースデータのための追加機能をサポートしています:

詳細については、Langfuseのドキュメントを参照してください。

次のステップ

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?