1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Streamlit in Snowflakeを利用してcortex AIによるRAGを作成し、チャットボットで遊んでみた。

Posted at

はじめに

今回はsnowflakeの機能である「Streamlit in Snowflake」を利用して、Cortex AIによるRAG(Retrieval-Augmented Generation)を作成し、チャットボットとして利用してみました。

Streamlit in Snowflakeとは

Streamlit はオープンソースのPythonライブラリで、機械学習やデータサイエンスのためのカスタムウェブアプリを簡単に作成・共有できます。Streamlitを使用することで、強力なデータアプリケーションを迅速に構築し、展開することができます。オープンソースライブラリの詳細については、 Streamlit Libraryのドキュメンテーション をご参照ください。

Streamlit in Snowflake は、開発者がSnowflakeのデータクラウド上でStreamlitアプリを安全に構築、展開、共有するのに役立ちます。 Streamlit in Snowflake を使用すると、データやアプリケーションコードを外部システムに移動することなく、Snowflake のデータを処理して使用するアプリケーションを構築できます。

https://docs.snowflake.com/ja/developer-guide/streamlit/about-streamlit より引用

実際に作ってみた

事前準備

用意するもの
・RAG用のデータセット
・snowflakeで利用するデータベース、ウェアハウスなどのリソース
・streamlitを利用できるsnowflake環境

注意:snowflakeのリソース利用については料金がかかります。

RAG用のデータセット

今回はkaggleで公開されているオープンデータセットを利用します。

image.png
https://www.kaggle.com/datasets/elvinrustam/books-dataset/data より引用

snowflakeで利用するデータベース、ウェアハウスなどのリソース

1. snowsightで利用できるSQLワークシートにて以下のクエリでデータベースとウェアハウスを作成していきます。

create_db_wh.sql
-- 基本リソース設定
-- データベース作成
CREATE DATABASE IF NOT EXISTS test_db;
-- ウェアハウス作成
CREATE OR REPLACE WAREHOUSE test_wh WITH
    WAREHOUSE_SIZE='X-SMALL'
    AUTO_SUSPEND = 120
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED=TRUE;
-- 利用するウェアハウスを設定
USE WAREHOUSE test_wh;

2. 次にデータセットをsnowflake上にアップロードするためのステージを作成していきます。

create_stage.sql
-- ステージ作成
CREATE OR REPLACE STAGE test_stage
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

3. 用意したデータセットを先ほど作成したステージにアップロードし、テーブルにロードしていきます。
・ステージにファイルをアップロード
image.png
・テーブルに名前を付けてロード

1番上の行をヘッダーにする設定をすること

image.png

ロード確認画面にて、スペースや括弧の含まれるカラムの設定に注意

4. テキストをチャンクするpython UDFを作成していきます。

python_udf.sql
CREATE OR REPLACE FUNCTION test_db.public.books_chunk(
    description string, title string, authors string, category string, publisher string
)
    returns table (chunk string, title string, authors string, category string, publisher string)
    language python
    runtime_version = '3.9'
    handler = 'text_chunker'
    packages = ('snowflake-snowpark-python','langchain')
    as
$$
from langchain.text_splitter import RecursiveCharacterTextSplitter
import copy
from typing import Optional

class text_chunker:

    def process(self, description: Optional[str], title: str, authors: str, category: str, publisher: str):
        if description == None:
            description = "" # handle null values

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 2000,
            chunk_overlap  = 300,
            length_function = len
        )
        chunks = text_splitter.split_text(description)
        for chunk in chunks:
            yield (title + "\n" + authors + "\n" + chunk, title, authors, category, publisher) # タイトルとチャンクを一緒にする
$$;

5. トランスクリプトから抽出したテキストのチャンクを格納するテーブルを作成していきます。

chunk_table.sql
-- 作成したチャンクをもとにしたテーブルを作成
CREATE TABLE test_db.public.book_description_chunks AS (
    SELECT
        books.*,
        t.CHUNK as CHUNK
    FROM test_db.public.test_table books,
        TABLE(test_db.public.books_chunk(books.description, books.title, books.authors, books.category, books.publisher)) t
);

-- テーブルの内容を確認
SELECT chunk, * FROM book_description_chunks LIMIT 10;

6. テーブル上にCortex Search Serviceを作成し、チャンクを検索できるようにしていきます。

cortex_search_service.sql
-- cortex search serviceの作成
CREATE CORTEX SEARCH SERVICE test_db.public.books_dataset_service
    ON CHUNK
    WAREHOUSE = test_wh
    TARGET_LAG = '1 hour'
    AS (
        SELECT *
        FROM test_db.public.book_description_chunks
    );

streamlitを利用できるsnowflake環境

では実際にアプリケーションを構築していきます。

実行する際には、「snowflake 0.8.0」のパッケージを追加してください。

streamlit.py

import streamlit as st
from snowflake.core import Root # requires snowflake>=0.8.0
from snowflake.snowpark.context import get_active_session

MODELS = [
    'mixtral-8x7b',
    'snowflake-arctic',
    'mistral-large',
    'mistral-large2',
    'llama3-8b',
    'llama3-70b',
    'reka-flash',
    'mistral-7b',
    'llama2-70b-chat',
    'gemma-7b'
]

def init_messages():
    """
    チャットメッセージのセッション状態を初期化する。 
    """
    if st.session_state.clear_conversation or "messages" not in st.session_state:
        st.session_state.messages = []

def init_service_metadata():
    """
    cortex searchのメタデータのセッション状態を初期化する。 
    """
    if "service_metadata" not in st.session_state:
        services = session.sql("SHOW CORTEX SEARCH SERVICES;").collect()
        service_metadata = []
        if services:
            for s in services:
                svc_name = s["name"]
                svc_search_col = session.sql(
                    f"DESC CORTEX SEARCH SERVICE {svc_name};"
                ).collect()[0]["search_column"]
                service_metadata.append(
                    {"name": svc_name, "search_column": svc_search_col}
                )

        st.session_state.service_metadata = service_metadata

def init_config_options():
    """
    サイドバーの設定オプションを初期化する。 
    """
    st.sidebar.selectbox(
        "Select cortex search service:",
        [s["name"] for s in st.session_state.service_metadata],
        key="selected_cortex_search_service",
    )

    st.sidebar.button("Clear conversation", key="clear_conversation")
    st.sidebar.toggle("Debug", key="debug", value=False)
    st.sidebar.toggle("Use chat history", key="use_chat_history", value=True)

    with st.sidebar.expander("Advanced options"):
        st.selectbox("Select model:", MODELS, key="model_name")
        st.number_input(
            "Select number of context chunks",
            value=5,
            key="num_retrieved_chunks",
            min_value=1,
            max_value=10,
        )
        st.number_input(
            "Select number of messages to use in chat history",
            value=5,
            key="num_chat_messages",
            min_value=1,
            max_value=10,
        )

    st.sidebar.expander("Session State").write(st.session_state)

def query_cortex_search_service(query):
    """
    選択されたcortex searchに与えられたクエリーでコンテキストドキュメントを取得する。
    """
    db, schema = session.get_current_database(), session.get_current_schema()

    cortex_search_service = (
        root.databases[db]
        .schemas[schema]
        .cortex_search_services[st.session_state.selected_cortex_search_service]
    )

    context_documents = cortex_search_service.search(
        query, columns=[], limit=st.session_state.num_retrieved_chunks
    )
    results = context_documents.results

    service_metadata = st.session_state.service_metadata
    search_col = [s["search_column"] for s in service_metadata
                    if s["name"] == st.session_state.selected_cortex_search_service][0]

    context_str = ""
    for i, r in enumerate(results):
        context_str += f"Context document {i+1}: {r[search_col]} \n" + "\n"

    if st.session_state.debug:
        st.sidebar.text_area("Context documents", context_str, height=500)

    return context_str

def get_chat_history():
    """
    セッション状態からチャット履歴を取得する。
    """
    start_index = max(
        0, len(st.session_state.messages) - st.session_state.num_chat_messages
    )
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]

def complete(model, prompt):
    """
    指定されたモデルを使用して、指定されたプロンプトの補完関数を生成する。
    """
    return session.sql("SELECT snowflake.cortex.complete(?,?)", (model, prompt)).collect()[0][0]

def make_chat_history_summary(chat_history, question):
    """
   クエリを拡張するために現在の質問と組み合わせたチャット履歴の要約を生成する
    """
    prompt = f"""
        [INST]
        以下のチャット履歴と質問に基づいて、質問を拡張するためのクエリを生成してください。
        クエリは自然言語で表現してください。
        結果には説明を加えず、クエリのみを返してください。

        <chat_history>
        {chat_history}
        </chat_history>
        <question>
        {question}
        </question>
        [/INST]
    """

    summary = complete(st.session_state.model_name, prompt)

    if st.session_state.debug:
        st.sidebar.text_area(
            "Chat history summary", summary.replace("$", "\$"), height=150
        )

    return summary

def create_prompt(user_question):
    """
    ユーザーの質問と、cortex検索サービスおよびチャット履歴から取得したコンテキストを組み合わせて、言語モデルのプロンプトを作成する。
    """
    if st.session_state.use_chat_history:
        chat_history = get_chat_history()
        if chat_history != []:
            question_summary = make_chat_history_summary(chat_history, user_question)
            prompt_context = query_cortex_search_service(question_summary)
        else:
            prompt_context = query_cortex_search_service(user_question)
    else:
        prompt_context = query_cortex_search_service(user_question)
        chat_history = ""

    prompt = f"""
            [INST]
            あなたは役に立つAIチャットアシスタントであり、RAG(Retrieval Augmented Generation)機能を備えています。
            ユーザーが質問をすると、以下の<context>タグ内に提供されるコンテキストや
            <chat_history>タグ内のチャット履歴を使用して、質問に対する要約や回答を生成してください。
            回答は簡潔でわかりやすく、ユーザーの質問に直接関係するものである必要があります。

            <chat_history>
            {chat_history}
            </chat_history>
            <context>
            {prompt_context}
            </context>
            <question>
            {user_question}
            </question>
            [/INST]
            Answer:
        """
    return prompt

def main():
    st.title(f":speech_balloon: Snowflake Cortexを利用したチャットボット")

    init_service_metadata()
    init_config_options()
    init_messages()

    icons = {"assistant": "❄️", "user": "👤"}

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"], avatar=icons[message["role"]]):
            st.markdown(message["content"])

    disable_chat = (
        "service_metadata" not in st.session_state
        or len(st.session_state.service_metadata) == 0
    )
    if question := st.chat_input("質問を入力してください。", disabled=disable_chat):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": question})
        # Display user message in chat message container
        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question.replace("$", "\$"))

        # Display assistant response in chat message container
        with st.chat_message("assistant", avatar=icons["assistant"]):
            message_placeholder = st.empty()
            question = question.replace("'", "")
            with st.spinner("Thinking..."):
                generated_response = complete(
                    st.session_state.model_name, create_prompt(question)
                )
                message_placeholder.markdown(generated_response)

        st.session_state.messages.append(
            {"role": "assistant", "content": generated_response}
        )

if __name__ == "__main__":
    session = get_active_session()
    root = Root(session)
    main()

これを動かしてみます。
image.png
このような簡易的なUIを作成することができました。
チャット形式で利用することができるので、いろいろな質問をしてみようと思います。
image.png
このように、本に関するデータセットでこたえられる範囲の質問には、それに則って回答してくれました。
以下の内容が実際にロードしたcsvに含まれていました。

# 該当する行
Japanese in Plain English,"By De Mente, Boye","Discusses Japanese pronunciation and grammar, offers practice exercises, and lists the Japanese equivalents for more than twelve hundred English words and expressions"," Foreign Language Study , Japanese",McGraw-Hill,4.99,January,1987

ここで、データセットと関係ない質問をしたらどのような回答になるか試してみます。
image.png
全然関係ない質問でしたが、正確な回答が返ってきました。
ここで、モデルを mixtral-8x7b に変更して再実行してみます。

image.png

この質問は、提供された文脈のどれとも直接は関係ないが、それでも私はいくつかの一般的な情報を提供することができる。 2024年のオリンピック開催国はフランスのパリである。 この質問は、COVID-19パンデミックのために2021年に延期され、日本の東京で開催された2020年オリンピックとごっちゃになっているようだ。

こちらもデータセットとは関係ないという内容は出てきたものの、正確な回答がかえってきました。
質問の内容が簡単すぎたのかモデルが事前学習している内容だけでこたえられてしまったのでしょうか…
また、日本語のプロンプトを設定していますが、英語で返答されている点も気になるところです…

まとめ

読み込ませるデータセットの詳細度や量を変えてみたり、選択するモデルを変えてみたりすることで、多種多様な回答が生成されるため、いろんな遊び方ができるようです。

参考

今回は下記チュートリアルの内容を参考にしました。
日本語版は現在見つからなかったので、英語版になります。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?