NVIDIA-Certified Associate: Multimodal Generative AIの資格習得に向けて勉強中なのですが、暑さのせいかモチベーションが上がらずにデータブリックスに手を出してしまいました。
本音を言うと自分のデータエンジニア経験の要素が乏しく、ずっと克服したいと思ってました。
データブリックスでのゴール
➀ 数百万件のリクエストを処理できるようにする。
(Kafakaを使ったMicro Batchingは考慮しない)
➁ RAGやAIエージェントを使いGeminiやChatGPTみたいなシステムの構築
上記の2つをゴールに頑張っていきます。
データブリックスでプロンプトを入れる方法
(スケーリングをしたいなら、Kafakaを使ったMicro Batchingをお勧め)
dbutils.widgets.text("prompt", "", "Enter your prompt here")
user_prompt = dbutils.widgets.get("prompt")
データブリックスの構造
Medallion Architecture:メダリオン構造
生のデータ(ユーザーのプロンプト)をブロンズ、シルバー、ゴールドレイヤーを通し、データの保存、クリーニング及び変換、結果を保存する構造の事です。
これにより大量のリクエストを処理しやすくなります。
ブロンズレイヤー
ブロンズレイヤーは生のデータの保存が目的です。
# pysparkはデータブリックスがディフォルトで提供している
from pyspark.sql.functions import current_timestamp, lit
def set_prompt_in_bronze_table(user_prompt: str) -> None:
bronze_table_path = '好きなPath'
# データフレームをタイムスタンプとプロンプトで作成
prompt_df = spark.createDataFrame(
[
(user_prompt,)
],
["prompt_text"]
).withColumn("ingestion_timestamp", current_timestamp())
# ここでデータを保存してます。
prompt_df.write \
.format("delta") \
.mode("append") \
.save(bronze_table_path)
シルバーレイヤー
シルバーレイヤーはデータのクリーニングや加工を目的としてます。
ここのレイヤーでプロンプトを変換して、RAGを使いたいと思います。
ChromaはSqllightを使っている為、ディストリビューションシステムに対応してません。
大量のデータを処理する為にディストリビューションシステムをデータブリックスが採用。
Chromaは自身のPC内などの限られたエリアで使う事を目的に作られている。
RAG用ベクターサーチの作成
# 好きなように変えて大丈夫:こちらのデータがRAGとして使われます
financial_documents = [
"The 2024 annual report for Company A shows a net profit of $50 million, a 15% increase from the previous year.",
"Company B's stock price experienced a significant drop following the announcement of new government regulations on its industry.",
"Economic forecasts predict a slight recession in the third quarter of 2025, which may affect consumer spending.",
"The Q1 2024 earnings call for Company C highlighted a strategic shift towards renewable energy investments.",
"A recent market analysis suggests that the technology sector will see moderate growth over the next five years."
]
# データタイプを作成
schema = StructType([
StructField("id", StringType(), True),
StructField("text", StringType(), True)
])
# 好きなように変えて大丈夫
root_location = "abfss://YOUR_PATH.dfs.core.windows.net/YOUR_NUM"
catalog_name = "rag"
schema_name = "finance"
index_name = "rag.finance.finance_rag"
endpoint_name = "finance_rag"
# doc_1、doc_2などを作成
ids = [f"doc_{i}" for i in range(len(financial_documents))]
data = list(zip(ids, financial_documents))
# データフレームの作成、ベクターサーチ用に保存:この後データブリックスで変更する必要がある
spark_df = spark.createDataFrame(data, schema)
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"
spark.sql(
f"CREATE CATALOG IF NOT EXISTS {catalog_name} MANAGED LOCATION '{root_location}'"
)
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
spark_df.write.mode("overwrite").option("delta.enableChangeDataFeed", "true").saveAsTable(full_table_name)
RAGの作成
# データブリックスのベクターサーチを起動
vsc = VectorSearchClient(
workspace_url="データブリックスのホームのURL",
personal_access_token=get_databricks_toke()
)
try:
# データブリックスのベクターサーチは 1024 dims embeddingを使用.
query_embedding = model.encode([query_text]).tolist()[0]
except Exception as e:
print(f"Error generating embedding: {e}")
return ""
# RAG用のデータの検索
num_results = 2
try:
results = vsc.get_index(
endpoint_name=endpoint_name,
index_name=index_name
).similarity_search(
query_vector=query_embedding,
columns=["text"],
num_results=num_results,
)
except Exception as e:
print(f"Error performing search: {e}")
return ""
similarity_threshold = 0.5
# 関係性が低いなら削除
retrieved_documents = [
d[0] for d in results.get('result', {}).get('data_array', [])
if d[1] >= similarity_threshold
]
context = "\n\n".join(retrieved_documents)
retunr context
Z-Ordering
Z-Orderingはデータをまとめることで大量にある小さなデータを検索せずに素早く検索できるようにする技術。
基本的にシルバーレイヤーで使う事が多い。ベクターサーチ用のデータフレームはすでに最適化されており、Z-Orderingを使う必要はない。
spark.sql(f"OPTIMIZE delta.`{silver_table_path}` ZORDER BY (text_embedding)")
ゴールドーレイヤー
ゴールドーレイヤーは結果を保存したり、分析したりするのが目的。
今回はAIエージェントとRAGを組み合わせた結果を保存するのを目的。
def search_web_news(query: str) -> str:
"""Performs a DuckDuckGo news search for the given query, daily results, max 2."""
wrapper = DuckDuckGoSearchAPIWrapper(time="d", max_results=2)
engine = DuckDuckGoSearchResults(api_wrapper=wrapper, backend="news")
return engine.invoke(f"{query}")
tool_search_news = Tool(
name="news_search",
func=search_web_news,
description="Tool to perform a DuckDuckGo news search. "
"Useful for current events or recent information. "
"Input should be a search query string. Returns up to 2 news results."
)
tools = [tool_search_news]
prompt_template = ChatPromptTemplate.from_messages([
("system", """
You are a highly knowledgeable finance chatbot.
Your purpose is to provide accurate, insightful,
and actionable financial advice to users,
tailored to their specific needs and contexts.
Available tools: {tools}
Available tool names: {tool_names}
Additional context for your answer: {rag_context}
Responses should always follow this format:
Question: The question the user wants to answer
Thought: Think about what to do to answer the question
Action: The tool to use (must be one of the available tools)
Action Input: Input to the tool
Observation: Result of the tool
(Thought/Action/Action Input/Observation can be repeated
as many times as necessary to answer the question)
Thought: Determine that it's time to provide the final answer to the user
Final Answer: The final answer to the user
"""),
("user", "Analyze this company's finance status: user_prompt}\n{agent_scratchpad}")
])
agent = create_react_agent(llm_agent.llm, tools, prompt_template)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # Keep verbose=True to ensure intermediate steps are printed to stdout
handle_parsing_errors=True
)
print("\n--- Agent's Reasoning Process ---")
result = agent_executor.invoke({"user_prompt": user_prompt, "rag_context": rag_context})
llm_response = result['output']
final_prompt = result['intermediate_steps'][-1]
print("\n--- Final Answer ---")
print(result['output'])
print("\n" + "=" * 80 + "\n")
gold_table_path = '好きなPath'
gold_df = spark.createDataFrame(
[(user_prompt, rag_context, final_prompt, llm_response, current_timestamp())],
["user_query", "retrieved_context", "final_llm_prompt", "llm_response", "timestamp"]
)
gold_df.write \
.format("delta") \
.mode("append") \
.save(gold_table_path)