はじめに
3/27にSnowflake Data for Breakfast Tokyoというイベントに参加してきました。
Keynoteのなかで、SlackでBotに対して質問を投げると、Cortex AgentsがSnowflake内の構造化/非構造化データから情報を抽出して返答するというデモがありました。
単純に面白そうに思ったのと、Getting Startedでコードも公開されているとのことだったので、Cortex系の機能を理解するのも兼ねて試してみました。
以下、実行例です。
実行環境
- Slack
- Appの作成や設定権限を持っているもの
- Snowflake
- Enterprise / AWS / Asia Pacific (Tokyo)
- トライアルアカウントでOK
- Google Colaboratory
処理の仕組み
全体の処理は以下のようになっています。
- ユーザーはSlackに質問を投げる
- Python環境がSlackに対してSocket Modeで待機しており、特定のイベントが補足されたときに事後の処理を実行
- Cortex Agentが質問内容を踏まえて、構造化データベースではCortex Analystを、非構造化データベースではCortex Searchを利用
- Semantic Modelや前処理されたテキストデータもとに返答を作成して回答
- ここには記載されていないが、SQLが生成された場合は実行してデータを取得し、データに応じてmatplotlibをもとにしたグラフを返答することもある
Cortex Agent自体の仕組みは全然知りませんでしたが、以下の記事にまとまっていました。
各種設定を行って試してみる
Slackを設定する
基本的にやることはGetting started with Bolt for Pythonに記載されている通りです。細かい処理はGitHubで公開されているスクリプトを利用するので、権限設定だけ済ませてしまいましょう。
ポイントだけ記載しておくと、
-
Basic Information
でApp-Level Tokens
にconnections:write
の権限を付与してトークンを生成する -
OAuth & Permissions
でchat:write
、files:write
を追加してトークンを生成する -
Socket Mode
を有効化してEvent Subscriptions
を使えるようにする -
Event Subscriptions
で利用したいイベントを指定する
です。イベントの種別については以下の通りになっているので、適宜試したいものを利用してください。
左側のサイドバーの「イベントサブスクリプション」に移動し、有効に切り替えます。 「ボットイベントへのサブスクライブ」で、ボットが応答するイベントを追加できます。メッセージに関連するイベントは 4 つあります。
message.channels
アプリが追加されたパブリックチャンネルのメッセージをリッスンしますmessage.groups
アプリが追加された🔒プライベートチャンネルのメッセージをリッスンしますmessage.im
アプリのDMでユーザーとのメッセージをリッスンしますmessage.mpim
アプリが追加された複数人DMのメッセージをリッスンしますボットが追加されたすべての場所からのメッセージをリッスンしたい場合は、4 つのメッセージ イベントすべてを選択します。ボットがリッスンするイベントを選択したら、緑色の [変更を保存]ボタンをクリックします。
出典:イベントの設定
ここまで出来たら、Google Colaboratoryでシークレットを登録して、以下のコードを実行してください。
from google.colab import userdata
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
slack_bot_token = userdata.get('SLACK_BOT_TOKEN_FOR_CORTEX_AGENT')
slack_app_level_token = userdata.get('SLACK_APP_LEVEL_TOKEN_FOR_CORTEX_AGENT')
app = App(token=slack_bot_token)
@app.message('hello')
def message_hello(message, say):
say(f"Hey there <@{message['user']}>!")
if __name__ == '__main__':
handler = SocketModeHandler(app, slack_app_level_token)
handler.start()
⚡️ Bolt app is running!
と表示されます。
これで正常に処理できているので、SlackのDMでAppに対してhelloと呼びかけてみましょう。
返事がきたら接続検証は完了です。
Snowflakeを設定する
まず、Snowflakeで接続するための鍵を生成します。Colaboratoryで以下のコードを実行して、表示される公開鍵をメモしてください。
!openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
!openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
!cat rsa_key.pub | tr -d '\n' | sed 's/-----BEGIN PUBLIC KEY-----//g' | sed 's/-----END PUBLIC KEY-----//g'
SnowflakeのSQLワークシートで、rsa_public_keyに先ほどメモした公開鍵を記載して、実行してください。
-- ref: https://quickstarts.snowflake.com/guide/integrate_snowflake_cortex_agents_with_slack/index.html
-- ref: https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/blob/main/setup.sql
use role useradmin;
create role if not exists cortex_agent_app;
grant role cortex_agent_app to role sysadmin;
create user if not exists cortex_agent_app;
grant role cortex_agent_app to user cortex_agent_app;
alter user cortex_agent_app
set rsa_public_key = '<YOUR PUBLIC KEY>';
alter user cortex_agent_app
set default_role = cortex_agent_app;
use role accountadmin;
grant create database on account to role cortex_agent_app;
grant create warehouse on account to role cortex_agent_app;
use role cortex_agent_app;
create database if not exists dash_db;
create schema if not exists dash_schema;
create warehouse if not exists dash_s
warehouse_size=small;
use role useradmin;
alter user cortex_agent_app
set default_role = cortex_agent_app;
use role cortex_agent_app;
use dash_db.dash_schema;
use warehouse dash_s;
create file format if not exists csv_format
skip_header = 1
field_optionally_enclosed_by = '"'
type = 'csv';
create stage if not exists support_tickets_data_stage
file_format = csv_format
url = 's3://sfquickstarts/sfguide_integrate_snowflake_cortex_agents_with_slack/';
create or replace table support_tickets (
ticket_id varchar(60),
customer_name varchar(60),
customer_email varchar(60),
service_type varchar(60),
request varchar,
contact_preference varchar(60)
);
copy into support_tickets
from @support_tickets_data_stage;
create stage if not exists dash_semantic_models
encryption = (type='snowflake_sse')
directory = (enable=true);
-- manual upload a file below
-- https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/blob/main/support_tickets_semantic_model.yaml
create stage if not exists dash_pdfs
encryption = (type='snowflake_sse')
directory = (enable=true);
-- manual upload files below
-- https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/tree/main/data
セマンティックモデルとして利用するyamlファイルと、非構造化データの参照元とするPDFファイルは手動でステージにアップロードする必要があるので、アップロード後に以下のSQLを実行してください。
-- ref: https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/blob/main/cortex_search_service.sql
create or replace table parse_pdfs
as
select
relative_path,
snowflake.cortex.parse_document(
@dash_db.dash_schema.DASH_DB.DASH_SCHEMA.DASH_SEMANTIC_MODELSdash_pdfs,
relative_path,
{'mode': 'layout'}
) as data
from
directory(@dash_db.dash_schema.dash_pdfs);
create or replace table parsed_pdfs
as
with parsed_data as (
select
relative_path,
snowflake.cortex.split_text_recursive_character(to_variant(data):content, 'markdown', 1800, 300) as chunks
from
parse_pdfs
where
to_variant(data):content is not null
)
select
to_varchar(c.value) as page_content,
regexp_replace(relative_path, '\\.pdf$', '') as title,
'dash_db.dash_schema.dash_pdfs' as input_stage,
relative_path as relative_path
from
parsed_data p,
lateral flatten(input => p.chunks) c;
create or replace cortex search service dash_db.dash_schema.vehicles_info on page_content
warehouse = dash_s
target_lag = '1 hour'
as (
select
'' as page_url,
page_content,
title,
relative_path
from
parsed_pdfs
);
-- delete created objects
-- use role accountadmin;
-- drop role cortex_agent_app;
-- drop database if exists dash_db;
-- drop warehouse if exists dash_s;
これで設定は完了です。
Google Colaboratoryを設定する
GitHubで公開されているスクリプトを取得しつつ、元のものは英語のみで動作するものになっているので、日本語で利用できるようにapp.pyのスクリプトを調整します。雑に力技で処理しているのは目をつぶってください。
!curl -o requirements.txt https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/requirements.txt
!curl -o app.py https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/app.py
!curl -o cortex_chat.py https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/cortex_chat.py
!curl -o generate_jwt.py https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/generate_jwt.py
!pip install -r requirements.txt
!pip install deep-translator langdetect
with open('./app.py') as f:
code = f.read()
code_for_ja = code.replace(
'import requests',
'import requests\nfrom langdetect import detect\nfrom deep_translator import GoogleTranslator'
) \
.replace(
'app = App(token=SLACK_BOT_TOKEN)\nmessages = []',
'''app = App(token=SLACK_BOT_TOKEN)\nmessages = []
def translate_ja_to_en(text):
language = detect(text)
if language == 'ja':
text = GoogleTranslator(source='ja', target='en').translate(text)
return text
def translate_en_to_ja(text):
language = detect(text)
if language == 'en':
text = GoogleTranslator(source='en', target='ja').translate(text)
return text
'''
) \
.replace(
"prompt = body['event']['text']",
"prompt = translate_ja_to_en(body['event']['text'])"
) \
.replace(
'"text": ":snowflake: Snowflake Cortex AI is generating a response. Please wait..."',
'"text": f":snowflake: {translate_en_to_ja(\'Snowflake Cortex AI is generating a response. Please wait...\')}"',
) \
.replace(
'"text": f"Answer: {content[\'text\']}"',
'"text": translate_en_to_ja(f"Answer: {content[\'text\']}")'
) \
.replace(
'"text": f"* Citation: {content[\'citations\']}"',
'"text": translate_en_to_ja(f"* Citation: {content[\'citations\']}")'
) \
.replace(
'"Answer',
'"回答'
)
with open('./app.py', mode='w', encoding='utf-8') as f:
f.write(code_for_ja)
元のファイルが.envからクレデンシャルを読み込む形になっているので、Google Colaboratoryのシークレットにデータを登録して、.env
ファイルを作成するようにします。
DEMO_DATABASE='DASH_DB'
DEMO_SCHEMA='DASH_SCHEMA'
WAREHOUSE='DASH_S'
DEMO_USER='CORTEX_AGENT_APP'
DEMO_USER_ROLE='CORTEX_AGENT_APP'
SEMANTIC_MODEL='@DASH_DB.DASH_SCHEMA.DASH_SEMANTIC_MODELS/support_tickets_semantic_model.yaml'
SEARCH_SERVICE='DASH_DB.DASH_SCHEMA.vehicles_info'
ACCOUNT='<Account Identifier>' # Account Details > Account Identifier
HOST='<Account/Server URL>' # Account Details > Account/Server URL
AGENT_ENDPOINT='https://<Account Identifier>.snowflakecomputing.com/api/v2/cortex/agent:run'
SLACK_APP_TOKEN='<YOUR_APP_TOKEN>'
SLACK_BOT_TOKEN='<YOUR_BOT_TOKEN>'
# You may NOT edit below values
RSA_PRIVATE_KEY_PATH='rsa_key.p8'
MODEL = 'claude-3-5-sonnet'
from google.colab import userdata
credentials_for_cortex_agent = userdata.get('CREDENTIALS_FOR_CORTEX_AGENT')
with open('./.env', 'w', encoding='utf-8') as f:
f.write(credentials_for_cortex_agent)
この状態でapp.pyを実行して、⚡️ Bolt app is running!
が表示されれば設定は完了です。
実行すると冒頭でお見せしたような回答が返ってきます。
詳しく見てみて考えたこと
非技術者による構造化データと非構造化データの統合利用がはじまる?
Data for Breakfastでは、
- いかに構造化データだけでなく非構造化データも合わせて統合的に分析に活用できるか
- そうした分析をいかに非技術者の立場からもできるようになるか
がこれからの大きなテーマとして挙げられていました。今回Cortex Agentを通してその一端に触れられたことは、非常に面白く感じました。
Semantic Modelの仕組み
yamlファイルをSemantic Modelとして指定することでCortex Analystが動いています。ファイルを見るにテーブルの情報を上手く使うにはメタデータが必須というのは改めてですがそれはそう、という印象でした。
今回は1テーブル分の記載になっていますが、複数テーブルを記載することで参照先のテーブルとしてどれかを選択するみたいな形はできそうで、複数の結果を組み合わせるとかはできるのですかね。
非構造化データの処理がすごい
Cortex Searchを実現するために、PDFをテキスト化して、テキスト化したデータをマークダウン形式で分割し、検索可能にするという前処理が入っています。
テキスト化した段階では構造的な情報が落ちるわけですが、マークダウン形式にするときに見出しの付与などが行われており、こういう処理をさっとできるところにすごさを感じました。細かい精度までは見ていないので、どこまで実運用に耐えられるレベルかは不明ですが・・・。
そして、テーブルリネージをよく見てみるとDynamic Tableが作成されていて、一度読み取り用のテーブルを作っておくと、元となるテーブルを更新すれば反映される仕組みになっているようです。
Socket Modeで待機し続けないといけない
シンプルな仕組みでできるのかと思っていたところ、いきなりPythonかよ!とびっくりしてしまいました。メッセージの投稿といったイベント起点でその後の処理を回すので、待機しておかないといけないのはもう少し何とかならないのだろうか・・・
日本語でどこまで使いやすいのか
公式ドキュメントを全然読んでいないので全くわかっていない前提ですが、質問や応答もそうですし、ベースとしているセマンティックモデルやテーブルとして格納されているデータについても、日本語であると精度面での課題や、運用上の課題が出てきそうに思いました。
細かい点では今回のチュートリアルではmatplotlibを使っているので、日本語だとエラーが出そうな気が・・・(未検証)
おわりに
色々と今後に向けての展望を感じられた非常に興味深いデモ/チュートリアルでした。実務に落とし込んでいくには、
- 日本語という大きな壁をどう乗り越えるか
- 準拠する元としての構造化データ/セマンティックモデル/非構造化データをいかに品質高く整備するか
- どこまでの範囲が回答可能かを意識しないですむような、回答の準拠範囲をいかに広げるか
- 非技術者が扱うとなったときに、質問の質/回答の質/回答後の行動変容までいかに実運用レベルまで持っていくか
などが課題になりそうですね。