0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データブリックスでRAG、生成AI、AIエージェントを駆使して作るチャットボット

Posted at

NVIDIA-Certified Associate: Multimodal Generative AIの資格習得に向けて勉強中なのですが、暑さのせいかモチベーションが上がらずにデータブリックスに手を出してしまいました。

本音を言うと自分のデータエンジニア経験の要素が乏しく、ずっと克服したいと思ってました。

データブリックスでのゴール

➀ 数百万件のリクエストを処理できるようにする。
(Kafakaを使ったMicro Batchingは考慮しない)
➁ RAGやAIエージェントを使いGeminiやChatGPTみたいなシステムの構築

上記の2つをゴールに頑張っていきます。

データブリックスでプロンプトを入れる方法
(スケーリングをしたいなら、Kafakaを使ったMicro Batchingをお勧め)

dbutils.widgets.text("prompt", "", "Enter your prompt here")
user_prompt = dbutils.widgets.get("prompt")

データブリックスの構造

Medallion Architecture:メダリオン構造
生のデータ(ユーザーのプロンプト)をブロンズ、シルバー、ゴールドレイヤーを通し、データの保存、クリーニング及び変換、結果を保存する構造の事です。
これにより大量のリクエストを処理しやすくなります。

ブロンズレイヤー

ブロンズレイヤーは生のデータの保存が目的です。

# pysparkはデータブリックスがディフォルトで提供している
from pyspark.sql.functions import current_timestamp, lit 

def set_prompt_in_bronze_table(user_prompt: str) -> None:
   bronze_table_path = '好きなPath'
   
   # データフレームをタイムスタンプとプロンプトで作成
   prompt_df = spark.createDataFrame(
       [
           (user_prompt,)
       ],
       ["prompt_text"]
   ).withColumn("ingestion_timestamp", current_timestamp())

   # ここでデータを保存してます。
   prompt_df.write \
       .format("delta") \
       .mode("append") \
       .save(bronze_table_path)

シルバーレイヤー

シルバーレイヤーはデータのクリーニングや加工を目的としてます。
ここのレイヤーでプロンプトを変換して、RAGを使いたいと思います。
ChromaはSqllightを使っている為、ディストリビューションシステムに対応してません。
大量のデータを処理する為にディストリビューションシステムをデータブリックスが採用。
Chromaは自身のPC内などの限られたエリアで使う事を目的に作られている。

RAG用ベクターサーチの作成

# 好きなように変えて大丈夫:こちらのデータがRAGとして使われます
financial_documents = [
       "The 2024 annual report for Company A shows a net profit of $50 million, a 15% increase from the previous year.",
       "Company B's stock price experienced a significant drop following the announcement of new government regulations on its industry.",
       "Economic forecasts predict a slight recession in the third quarter of 2025, which may affect consumer spending.",
       "The Q1 2024 earnings call for Company C highlighted a strategic shift towards renewable energy investments.",
       "A recent market analysis suggests that the technology sector will see moderate growth over the next five years."
   ]

# データタイプを作成
schema = StructType([
   StructField("id", StringType(), True),
   StructField("text", StringType(), True)
])

# 好きなように変えて大丈夫
root_location = "abfss://YOUR_PATH.dfs.core.windows.net/YOUR_NUM"
catalog_name = "rag"
schema_name = "finance"
index_name = "rag.finance.finance_rag"
endpoint_name = "finance_rag"

# doc_1、doc_2などを作成
ids = [f"doc_{i}" for i in range(len(financial_documents))]
data = list(zip(ids, financial_documents))

# データフレームの作成、ベクターサーチ用に保存:この後データブリックスで変更する必要がある
spark_df = spark.createDataFrame(data, schema)
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"
spark.sql(
 f"CREATE CATALOG IF NOT EXISTS {catalog_name} MANAGED LOCATION '{root_location}'"
)
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
spark_df.write.mode("overwrite").option("delta.enableChangeDataFeed", "true").saveAsTable(full_table_name)

RAGの作成

# データブリックスのベクターサーチを起動
vsc = VectorSearchClient(
   workspace_url="データブリックスのホームのURL",
   personal_access_token=get_databricks_toke()
)

try:
   # データブリックスのベクターサーチは 1024 dims embeddingを使用.
   query_embedding = model.encode([query_text]).tolist()[0]
except Exception as e:
   print(f"Error generating embedding: {e}")
   return ""
   
# RAG用のデータの検索
num_results = 2
try:
   results = vsc.get_index(
       endpoint_name=endpoint_name,
       index_name=index_name
   ).similarity_search(
       query_vector=query_embedding,
       columns=["text"],
       num_results=num_results,
   )
except Exception as e:
   print(f"Error performing search: {e}")
   return ""

similarity_threshold = 0.5
# 関係性が低いなら削除
retrieved_documents = [
   d[0] for d in results.get('result', {}).get('data_array', [])
   if d[1] >= similarity_threshold
]
context = "\n\n".join(retrieved_documents)
retunr context

Z-Ordering

Z-Orderingはデータをまとめることで大量にある小さなデータを検索せずに素早く検索できるようにする技術。
基本的にシルバーレイヤーで使う事が多い。ベクターサーチ用のデータフレームはすでに最適化されており、Z-Orderingを使う必要はない。

spark.sql(f"OPTIMIZE delta.`{silver_table_path}` ZORDER BY (text_embedding)")

ゴールドーレイヤー

ゴールドーレイヤーは結果を保存したり、分析したりするのが目的。
今回はAIエージェントとRAGを組み合わせた結果を保存するのを目的。

def search_web_news(query: str) -> str:
    """Performs a DuckDuckGo news search for the given query, daily results, max 2."""
    wrapper = DuckDuckGoSearchAPIWrapper(time="d", max_results=2)
    engine = DuckDuckGoSearchResults(api_wrapper=wrapper, backend="news")
    return engine.invoke(f"{query}")
    
tool_search_news = Tool(
    name="news_search",
    func=search_web_news,
    description="Tool to perform a DuckDuckGo news search. "
                "Useful for current events or recent information. "
                "Input should be a search query string. Returns up to 2 news results."
)
   
tools = [tool_search_news]

prompt_template = ChatPromptTemplate.from_messages([
        ("system", """
        You are a highly knowledgeable finance chatbot.
        Your purpose is to provide accurate, insightful,
        and actionable financial advice to users,
        tailored to their specific needs and contexts.

        Available tools: {tools}
        Available tool names: {tool_names}

        Additional context for your answer: {rag_context}
        
        Responses should always follow this format:
        Question: The question the user wants to answer
        Thought: Think about what to do to answer the question
        Action: The tool to use (must be one of the available tools)
        Action Input: Input to the tool
        Observation: Result of the tool
    
        (Thought/Action/Action Input/Observation can be repeated
        as many times as necessary to answer the question)
        Thought: Determine that it's time to provide the final answer to the user
        Final Answer: The final answer to the user
        """),

        ("user", "Analyze this company's finance status: user_prompt}\n{agent_scratchpad}")
    ])

agent = create_react_agent(llm_agent.llm, tools, prompt_template)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,  # Keep verbose=True to ensure intermediate steps are printed to stdout
    handle_parsing_errors=True
)

print("\n--- Agent's Reasoning Process ---")
result = agent_executor.invoke({"user_prompt": user_prompt, "rag_context": rag_context})
llm_response = result['output']
final_prompt = result['intermediate_steps'][-1]
print("\n--- Final Answer ---")
print(result['output'])
print("\n" + "=" * 80 + "\n")

gold_table_path = '好きなPath'
gold_df = spark.createDataFrame(
    [(user_prompt, rag_context, final_prompt, llm_response, current_timestamp())],
    ["user_query", "retrieved_context", "final_llm_prompt", "llm_response", "timestamp"]
)    
gold_df.write \
    .format("delta") \
    .mode("append") \
    .save(gold_table_path)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?