注意
こちらは、タイトルにあるシナリオにおいてDatabricksの機能をどのように活用できるのかに関する思考実験とプロトタイピングの内容となっています。
発端は最近のこちらの新機能のリリースです。
特にストリーミングテーブルが気になりまして。以下の点がモヤモヤしていました。
- そもそも、ストリーミングテーブルとは何か
- どう使ったらいいのか
色々触っていてどうにか理解できたとなったので、何かで実践したくなりました。
以下で説明はされています。ただし、結構難解。
ストリーミングテーブルは、ストリーミングまたは増分データ処理の追加サポートを備えたDeltaテーブルです。ストリーミングテーブルを使用すると、各行を1回だけ処理して、増大するデータセットを処理できます。ほとんどのデータセットは時間の経過とともに継続的に増加するため、ストリーミングテーブルはほとんどの取り込みワークロードに適しています。ストリーミングテーブルは、データの鮮度と低遅延を必要とするパイプラインに最適です。ストリーミングテーブルは、新しいデータが到着するたびに結果を増分計算して、更新のたびにすべてのソースデータを完全に再計算する必要がなく、結果を最新の状態に保つことができるため、大規模な変換にも役立ちます。ストリーミングテーブルは、追加専用のデータソース用に設計されています。
自分なりに読み解くと、
- ストリーミングテーブルのデータソースは追記型(ストレージにファイルが追加される、メッセージングサービスからメッセージが届く)
- このようなケースで、ソースにあるファイルやメッセージをすべて処理するのは非効率的
- ストリーミングテーブルを用いることで、追記されたデータのみを取り込めるようになる。これが上の各行を1回だけ処理しての意味。
と理解しました。ストリーミングテーブルはソースデータをどこまで処理したのかを記憶してくれます。Spark構造化ストリーミングにおけるチェックポイントの機能が組み込まれているということですね。
ストリーミングテーブルと生成AIによるカスタマーサポートの効率化
前振りが長くなりましたが、そういったデータソースがあるシナリオで簡単な仕組みを作ってみたいと思った訳です。それで思いついたのが、タイトルにあるカスタマーサポートです。オンライン窓口からやってくる問い合わせはまさに追記型です。そして、今のDatabricksの提供機能なら、生成AIの活用を含めエンドツーエンドで効率化のための仕組みが作れることにも気づきました。
今回使うDatabricksの機能は以下の通りです:
- Databricks Apps: 問い合わせ画面を作ります。
- Delta Live Tables: 新規に登録された問い合わせを処理するデータパイプラインを実装します。
- AI関数: 生成AIを用いて問い合わせのテキストを処理します。
- Databricksジョブ: 問い合わせが登録されたらデータパイプラインを起動するようにオーケストレートします。
- AI/BIダッシュボード: 問い合わせの処理結果を可視化します。
Databricks Appsによるフロントエンド画面の構築
構築したアプリの実装は以下の通りです。
import streamlit as st
import pandas as pd
from databricks.sdk import WorkspaceClient
from io import BytesIO, StringIO
from datetime import datetime
w = WorkspaceClient()
now = datetime.now()
filename = now.strftime('%Y%m%d_%H%M%S') + '.csv'
volume_file_path = f'/Volumes/taka_yayoi_catalog/customer_service/inquiries/{filename}.csv'
st.set_page_config(layout="wide")
st.header("カスタマーサポート問い合わせ窓口")
def form_callback(inquiry_input):
strIO = StringIO(f"inquiry\n{inquiry_input}\n")
binary_data = BytesIO(bytes(strIO.getvalue(), encoding='utf-8'))
# Upload a file to a volume.
w.files.upload(volume_file_path, binary_data, overwrite = True)
with st.form(key="my_form",clear_on_submit=True):
st.write("お問い合わせ内容を入力してください。")
inquiry_input = st.text_input('問い合わせ内容', key='inquiry')
submitted = st.form_submit_button("送信")
if submitted:
st.write("お問い合わせ内容", inquiry_input)
form_callback(inquiry_input)
画面はこんな感じで。
Databricks SDKを使って、問い合わせ内容をUnity Catalogのボリュームに書き出すようにしています。アプリのサービスプリンシパルに書き込み権限を与えておきます。
うまく動作すれば、問い合わせ登録の都度ファイルが生成されます。今回、個別のファイルに書き出しているのはジョブのファイル到着トリガーを使うためです。
Delta Live TablesとAI関数による生成AIを用いたインクリメンタルなテキスト処理
ここが個人的には面白かった部分。元々、Delta Live Tables(DLT)はデータパイプラインを効率的に開発、運用できるようにするためのフレームワークでしたが、そこにSQLからLLMを呼び出せるAI関数が入ってきたことで、テキストデータも柔軟に処理できるようになりました。
ここでは、以下のような処理を実装します。
- 新規に登録された問い合わせのテキストをインクリメンタル(新規のファイルのみを読み込む)に読み込むストリーミングテーブルを宣言
- ストリーミングテーブルのデータに対して、緊急度の判別、感情分析、製品名の抽出、解答案の作成を行い、結果をマテリアライズドビューとして格納
ストリーミングテーブル
CREATE OR REPLACE STREAMING TABLE inquiries AS
SELECT
inquiry
FROM
STREAM (
read_files("/Volumes/taka_yayoi_catalog/customer_service/inquiries", format => "csv", header => "true")
)
マテリアライズドビュー
CREATE OR REPLACE MATERIALIZED VIEW analyzed_inquiries AS
SELECT
inquiry,
ai_classify(inquiry, ARRAY("緊急", "通常")) AS urgency, -- 緊急度
ai_analyze_sentiment(inquiry) AS sentiment, -- 感情分析
ai_extract(
inquiry,
array('prodcut', 'service')
) AS product, -- 製品、サービス名
ai_query(
'databricks-meta-llama-3-1-70b-instruct',
CONCAT('問い合わせに対する解答案を作成してください。問い合わせ: ', inquiry)
) AS response_draft
FROM
LIVE.inquiries
これだけで上の処理が実装できました。ai_query
関数では基盤モデルAPIのエンドポイントを指定して、より柔軟な指示を与えています。
パイプラインを実行して動作確認します。
期待した結果が生成されています。
ファイル到着トリガーによる処理の自動化
ここまでで大体出来上がっていますが、パイプライン処理を手動で起動しなくてはならないという難点があります。そこで、Databricksジョブのファイル到着トリガーの出番です。名前の通り、特定の場所にファイルが到着したらジョブを起動する機能です。
ここでは一つのタスクのみですが、ジョブは複数のタスクから構成することができます。上で定義したパイプラインを設定します。
トリガー設定画面でファイル到着トリガーを指定します。/Volumes/taka_yayoi_catalog/customer_service/inquiries/
へのファイル到着を監視して、新規ファイルが作成されたらパイプラインを起動するように設定します。
アプリの画面から問い合わせを登録します。
少しするとジョブが起動します。
問い合わせと処理結果が更新されました。
ダッシュボードによる問い合わせの可視化
あとは、概要を把握できるようにダッシュボードを作成します。テーブルを表示している画面から簡単にダッシュボードを作成することができます。
ここ数年でテキストの活用の幅が本当に広がりましたね。