4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflake Data for BreakfastのSlack - Cortex Agents連携を試してみる

Last updated at Posted at 2025-03-30

はじめに

3/27にSnowflake Data for Breakfast Tokyoというイベントに参加してきました。

Keynoteのなかで、SlackでBotに対して質問を投げると、Cortex AgentsがSnowflake内の構造化/非構造化データから情報を抽出して返答するというデモがありました。

単純に面白そうに思ったのと、Getting Startedでコードも公開されているとのことだったので、Cortex系の機能を理解するのも兼ねて試してみました。

以下、実行例です。

image.png

image.png

実行環境

  • Slack
    • Appの作成や設定権限を持っているもの
  • Snowflake
    • Enterprise / AWS / Asia Pacific (Tokyo)
    • トライアルアカウントでOK
  • Google Colaboratory

処理の仕組み

全体の処理は以下のようになっています。

slack_cortex_agent_architecture.drawio.png

  • ユーザーはSlackに質問を投げる
  • Python環境がSlackに対してSocket Modeで待機しており、特定のイベントが補足されたときに事後の処理を実行
  • Cortex Agentが質問内容を踏まえて、構造化データベースではCortex Analystを、非構造化データベースではCortex Searchを利用
  • Semantic Modelや前処理されたテキストデータもとに返答を作成して回答
  • ここには記載されていないが、SQLが生成された場合は実行してデータを取得し、データに応じてmatplotlibをもとにしたグラフを返答することもある

Cortex Agent自体の仕組みは全然知りませんでしたが、以下の記事にまとまっていました。

各種設定を行って試してみる

Slackを設定する

基本的にやることはGetting started with Bolt for Pythonに記載されている通りです。細かい処理はGitHubで公開されているスクリプトを利用するので、権限設定だけ済ませてしまいましょう。

ポイントだけ記載しておくと、

  • Basic InformationApp-Level Tokensconnections:writeの権限を付与してトークンを生成する
  • OAuth & Permissionschat:writefiles:writeを追加してトークンを生成する
  • Socket Modeを有効化してEvent Subscriptionsを使えるようにする
  • Event Subscriptionsで利用したいイベントを指定する

です。イベントの種別については以下の通りになっているので、適宜試したいものを利用してください。

左側のサイドバーの「イベントサブスクリプション」に移動し、有効に切り替えます。 「ボットイベントへのサブスクライブ」で、ボットが応答するイベントを追加できます。メッセージに関連するイベントは 4 つあります。

  • message.channelsアプリが追加されたパブリックチャンネルのメッセージをリッスンします
  • message.groupsアプリが追加された🔒プライベートチャンネルのメッセージをリッスンします
  • message.imアプリのDMでユーザーとのメッセージをリッスンします
  • message.mpimアプリが追加された複数人DMのメッセージをリッスンします

ボットが追加されたすべての場所からのメッセージをリッスンしたい場合は、4 つのメッセージ イベントすべてを選択します。ボットがリッスンするイベントを選択したら、緑色の [変更を保存]ボタンをクリックします。

出典:イベントの設定

ここまで出来たら、Google Colaboratoryでシークレットを登録して、以下のコードを実行してください。

from google.colab import userdata
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler

slack_bot_token = userdata.get('SLACK_BOT_TOKEN_FOR_CORTEX_AGENT')
slack_app_level_token = userdata.get('SLACK_APP_LEVEL_TOKEN_FOR_CORTEX_AGENT')

app = App(token=slack_bot_token)

@app.message('hello')
def message_hello(message, say):
    say(f"Hey there <@{message['user']}>!")

if __name__ == '__main__':
    handler = SocketModeHandler(app, slack_app_level_token)
    handler.start()

⚡️ Bolt app is running!と表示されます。

image.png

これで正常に処理できているので、SlackのDMでAppに対してhelloと呼びかけてみましょう。

image.png

返事がきたら接続検証は完了です。

Snowflakeを設定する

まず、Snowflakeで接続するための鍵を生成します。Colaboratoryで以下のコードを実行して、表示される公開鍵をメモしてください。

!openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
!openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
!cat rsa_key.pub | tr -d '\n' | sed 's/-----BEGIN PUBLIC KEY-----//g' | sed 's/-----END PUBLIC KEY-----//g'

SnowflakeのSQLワークシートで、rsa_public_keyに先ほどメモした公開鍵を記載して、実行してください。

-- ref: https://quickstarts.snowflake.com/guide/integrate_snowflake_cortex_agents_with_slack/index.html
-- ref: https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/blob/main/setup.sql

use role useradmin;
create role if not exists cortex_agent_app;
grant role cortex_agent_app to role sysadmin;
create user if not exists cortex_agent_app;
grant role cortex_agent_app to user cortex_agent_app;
alter user cortex_agent_app
  set rsa_public_key = '<YOUR PUBLIC KEY>';
alter user cortex_agent_app
  set default_role = cortex_agent_app;

use role accountadmin;
grant create database on account to role cortex_agent_app;
grant create warehouse on account to role cortex_agent_app;

use role cortex_agent_app;
create database if not exists dash_db;
create schema if not exists dash_schema;
create warehouse if not exists dash_s
  warehouse_size=small;

use role useradmin;
alter user cortex_agent_app
  set default_role = cortex_agent_app;

use role cortex_agent_app;
use dash_db.dash_schema;
use warehouse dash_s;

create file format if not exists csv_format
  skip_header = 1
  field_optionally_enclosed_by = '"'
  type = 'csv';

create stage if not exists support_tickets_data_stage
  file_format = csv_format
  url = 's3://sfquickstarts/sfguide_integrate_snowflake_cortex_agents_with_slack/';

create or replace table support_tickets (
  ticket_id varchar(60),
  customer_name varchar(60),
  customer_email varchar(60),
  service_type varchar(60),
  request varchar,
  contact_preference varchar(60)
);

copy into support_tickets
  from @support_tickets_data_stage;

create stage if not exists dash_semantic_models
  encryption = (type='snowflake_sse')
  directory = (enable=true);

-- manual upload a file below
-- https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/blob/main/support_tickets_semantic_model.yaml
  
create stage if not exists dash_pdfs
  encryption = (type='snowflake_sse')
  directory = (enable=true);

-- manual upload files below
-- https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/tree/main/data

セマンティックモデルとして利用するyamlファイルと、非構造化データの参照元とするPDFファイルは手動でステージにアップロードする必要があるので、アップロード後に以下のSQLを実行してください。

-- ref: https://github.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/blob/main/cortex_search_service.sql

create or replace table parse_pdfs
as
select
  relative_path,
  snowflake.cortex.parse_document(
    @dash_db.dash_schema.DASH_DB.DASH_SCHEMA.DASH_SEMANTIC_MODELSdash_pdfs,
    relative_path,
    {'mode': 'layout'}
  ) as data
from
  directory(@dash_db.dash_schema.dash_pdfs);

create or replace table parsed_pdfs
as
with parsed_data as (
  select
    relative_path,
    snowflake.cortex.split_text_recursive_character(to_variant(data):content, 'markdown', 1800, 300) as chunks
  from
    parse_pdfs
  where
    to_variant(data):content is not null
)

select
  to_varchar(c.value) as page_content,
  regexp_replace(relative_path, '\\.pdf$', '') as title,
  'dash_db.dash_schema.dash_pdfs' as input_stage,
  relative_path as relative_path
from
  parsed_data p,
  lateral flatten(input => p.chunks) c;

create or replace cortex search service dash_db.dash_schema.vehicles_info on page_content
  warehouse = dash_s
  target_lag = '1 hour'
  as (
    select
      '' as page_url,
      page_content,
      title,
      relative_path
    from
      parsed_pdfs
  );


-- delete created objects
-- use role accountadmin;
-- drop role cortex_agent_app;
-- drop database if exists dash_db;
-- drop warehouse if exists dash_s;

これで設定は完了です。

Google Colaboratoryを設定する

GitHubで公開されているスクリプトを取得しつつ、元のものは英語のみで動作するものになっているので、日本語で利用できるようにapp.pyのスクリプトを調整します。雑に力技で処理しているのは目をつぶってください。

!curl -o requirements.txt https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/requirements.txt
!curl -o app.py https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/app.py
!curl -o cortex_chat.py https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/cortex_chat.py
!curl -o generate_jwt.py https://raw.githubusercontent.com/Snowflake-Labs/sfguide-integrate-snowflake-cortex-agents-with-slack/refs/heads/main/generate_jwt.py
!pip install -r requirements.txt
!pip install deep-translator langdetect
with open('./app.py') as f:
    code = f.read()

code_for_ja = code.replace(
        'import requests',
        'import requests\nfrom langdetect import detect\nfrom deep_translator import GoogleTranslator'
    ) \
    .replace(
        'app = App(token=SLACK_BOT_TOKEN)\nmessages = []',
        '''app = App(token=SLACK_BOT_TOKEN)\nmessages = []

def translate_ja_to_en(text):
    language = detect(text)
    if language == 'ja':
        text = GoogleTranslator(source='ja', target='en').translate(text)
    return text

def translate_en_to_ja(text):
    language = detect(text)
    if language == 'en':
        text = GoogleTranslator(source='en', target='ja').translate(text)
    return text
'''
    ) \
    .replace(
        "prompt = body['event']['text']",
        "prompt = translate_ja_to_en(body['event']['text'])"
    ) \
    .replace(
        '"text": ":snowflake: Snowflake Cortex AI is generating a response. Please wait..."',
        '"text": f":snowflake: {translate_en_to_ja(\'Snowflake Cortex AI is generating a response. Please wait...\')}"',
    ) \
    .replace(
        '"text": f"Answer: {content[\'text\']}"',
        '"text": translate_en_to_ja(f"Answer: {content[\'text\']}")'
    ) \
    .replace(
        '"text": f"* Citation: {content[\'citations\']}"',
        '"text": translate_en_to_ja(f"* Citation: {content[\'citations\']}")'
    ) \
    .replace(
        '"Answer',
        '"回答'
    )

with open('./app.py', mode='w', encoding='utf-8') as f:
    f.write(code_for_ja)

元のファイルが.envからクレデンシャルを読み込む形になっているので、Google Colaboratoryのシークレットにデータを登録して、.envファイルを作成するようにします。

シークレットに登録する値
DEMO_DATABASE='DASH_DB'
DEMO_SCHEMA='DASH_SCHEMA'
WAREHOUSE='DASH_S'
DEMO_USER='CORTEX_AGENT_APP'
DEMO_USER_ROLE='CORTEX_AGENT_APP'
SEMANTIC_MODEL='@DASH_DB.DASH_SCHEMA.DASH_SEMANTIC_MODELS/support_tickets_semantic_model.yaml'
SEARCH_SERVICE='DASH_DB.DASH_SCHEMA.vehicles_info'
ACCOUNT='<Account Identifier>'  # Account Details > Account Identifier
HOST='<Account/Server URL>'  # Account Details > Account/Server URL
AGENT_ENDPOINT='https://<Account Identifier>.snowflakecomputing.com/api/v2/cortex/agent:run'
SLACK_APP_TOKEN='<YOUR_APP_TOKEN>'
SLACK_BOT_TOKEN='<YOUR_BOT_TOKEN>'

# You may NOT edit below values  
RSA_PRIVATE_KEY_PATH='rsa_key.p8'
MODEL = 'claude-3-5-sonnet'
from google.colab import userdata
credentials_for_cortex_agent = userdata.get('CREDENTIALS_FOR_CORTEX_AGENT')
with open('./.env', 'w', encoding='utf-8') as f:
    f.write(credentials_for_cortex_agent)

この状態でapp.pyを実行して、⚡️ Bolt app is running!が表示されれば設定は完了です。

image.png

実行すると冒頭でお見せしたような回答が返ってきます。

詳しく見てみて考えたこと

非技術者による構造化データと非構造化データの統合利用がはじまる?

Data for Breakfastでは、

  • いかに構造化データだけでなく非構造化データも合わせて統合的に分析に活用できるか
  • そうした分析をいかに非技術者の立場からもできるようになるか

がこれからの大きなテーマとして挙げられていました。今回Cortex Agentを通してその一端に触れられたことは、非常に面白く感じました。

Semantic Modelの仕組み

yamlファイルをSemantic Modelとして指定することでCortex Analystが動いています。ファイルを見るにテーブルの情報を上手く使うにはメタデータが必須というのは改めてですがそれはそう、という印象でした。

今回は1テーブル分の記載になっていますが、複数テーブルを記載することで参照先のテーブルとしてどれかを選択するみたいな形はできそうで、複数の結果を組み合わせるとかはできるのですかね。

非構造化データの処理がすごい

Cortex Searchを実現するために、PDFをテキスト化して、テキスト化したデータをマークダウン形式で分割し、検索可能にするという前処理が入っています。

テキスト化した段階では構造的な情報が落ちるわけですが、マークダウン形式にするときに見出しの付与などが行われており、こういう処理をさっとできるところにすごさを感じました。細かい精度までは見ていないので、どこまで実運用に耐えられるレベルかは不明ですが・・・。

そして、テーブルリネージをよく見てみるとDynamic Tableが作成されていて、一度読み取り用のテーブルを作っておくと、元となるテーブルを更新すれば反映される仕組みになっているようです。

image.png

Socket Modeで待機し続けないといけない

シンプルな仕組みでできるのかと思っていたところ、いきなりPythonかよ!とびっくりしてしまいました。メッセージの投稿といったイベント起点でその後の処理を回すので、待機しておかないといけないのはもう少し何とかならないのだろうか・・・

日本語でどこまで使いやすいのか

公式ドキュメントを全然読んでいないので全くわかっていない前提ですが、質問や応答もそうですし、ベースとしているセマンティックモデルやテーブルとして格納されているデータについても、日本語であると精度面での課題や、運用上の課題が出てきそうに思いました。

細かい点では今回のチュートリアルではmatplotlibを使っているので、日本語だとエラーが出そうな気が・・・(未検証)

おわりに

色々と今後に向けての展望を感じられた非常に興味深いデモ/チュートリアルでした。実務に落とし込んでいくには、

  • 日本語という大きな壁をどう乗り越えるか
  • 準拠する元としての構造化データ/セマンティックモデル/非構造化データをいかに品質高く整備するか
  • どこまでの範囲が回答可能かを意識しないですむような、回答の準拠範囲をいかに広げるか
  • 非技術者が扱うとなったときに、質問の質/回答の質/回答後の行動変容までいかに実運用レベルまで持っていくか

などが課題になりそうですね。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?