Batch Fuzzy Matching for Data Engineers with AI Functions | by AI on Databricks | Apr, 2025 | Mediumの翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
著者: Dan Pechi, Sr. GenAI Product Specialist @ Databricks
データパイプラインへのAIの適用
数十年の間、データエンジニアリングは変化しておらず、構造化、非構造化データを洞察や予測を意味出すことができるテーブル形式に変換しています。(テキスト文書、画像、音声ファイルなどの)非構造化データに関しては、データから価値を得るのに必要な構造を構築するためのデータエンジニアの選択肢は限られていました。多くの場合、サポートチケットのフリーテキストのフィールド、電子請求の画像、顧客打ち合わせの音声記録は、データパイプラインインテグレーションへの道筋を持たないblobストレージに鎮座するだけになります。
最近の人工知能の進歩によって、データエンジニアは進化する機会を得ました。AIを活用することで、自身のデータパイプラインにインテリジェンスを埋め込むことができ、データインテリジェンスエンジニアリングと呼ばれる新たな能力セットを開放することになります。
これは特に、非構造化データの重複排除、データマッピング、ファジーマッピングのユースケースで真となります。これまでは、少しだけフォーマットが崩れたテキストデータであっても、データエンジニアは他のデータとある非構造化データセットをマッチングするためには、ごちゃ混ぜの正規表現を組み合わせ、距離指標や補完ロジックを修正することを強いられていました。多くの場合、これらのシステムはデータエンジニアリングロジックのエッジケースによって不完全なものとなり、真のリソースにデータが投入されないことになってしまっていました。これらのシステムで効果を得るには、データエンジニアのさらなるオーバヘッドを必要とし、データフローを稼働させることにフォーカスできていませんでした。
このブログでは、データエンジニアによるオーダーメードの非構造化データパイプラインにおけるロジックのハードコードの必要性を回避し、データマッチングを実行するためにデータインテリジェンスを活用するスケーラブルなパターンを説明します。未知j会ドキュメントのファジーマッチングを構築する顧客からインスパイアを受けたユースケースをウォークスルーします。このプロセスでは、ドキュメントのフォーマット、構築、格納、マッピングの検証を行うために、Databricks Vector Search、Unity Catalog、サーバレスバッチ推論を活用します。
DBSQL + バッチ推論による文書要約
ファジーマッチングプロセスをベストなものにするには、われわれの非構造化文書をどのように表現するのがベストなのかを検討することが重要です。ある場合には、ファジーマッチングするエンティティには前処理が不要である場合があります。例えば、製品の説明文は、簡潔かつ他の製品から区別するために十分な説明をしているかもしれませんが、顧客情報においては、どの属性がマッチングを特定するために最も有用なのかに関して幾つかの検討が必要かもしれません。このケースでは、我々は非常に長いドキュメントを持っているので、ドキュメントの要約を作成することでそれぞれの表現を圧縮することができます。
これは、1行のSQLで大規模なドキュメントの要約を作成することができる、Databricksにおけるサーバレスバッチ推論を活用することで容易に達成することができます。2025/4月時点では、サーバレスバッチ推論は(以下に示す)Llama、Claude、GTEモデルをサポートしています。生成時間とコストを削減するために、それぞれの文書の要約を数文に限定します。サーバレスバッチ推論を用いることで、我々はこれらの要約を生成するのに必要な時間とコストをさらに削減することができ、入力における任意のトークン長や行数にスケールすることができます。
command = f"""
SELECT text,
ai_query(
‘databricks-meta-llama-3-3-70b-instruct', -- endpoint name
CONCAT('Provide a 2-3 sentence synopsis of the characters lines from the episode:', text)
) AS act_synopsis
FROM batch.dpechi.star_trek_episode_text
"""
result = spark.sql(command)
文書要約のエンベディングを用いたベクトルストアの構築
これらの要約を手に入れたら、新規文書とこのセットにおける類似文書をマッチングする手段が必要となります。これを行う最も簡単な方法は、文書のエンベディングを生成し、以降検索できるようにベクトルストアに格納することです。最初に要約をDeltaテーブルに格納し、それをベースにVector SearchのDelta Syncインデックスを作成することで、これを行うことができます。
from databricks.vector_search.client import VectorSearchClient
#building vector store
vsc = VectorSearchClient()
vsc.create_endpoint(
name=vector_search_endpoint_name,
endpoint_type="STANDARD"
)
endpoint = vsc.get_endpoint(
name=vector_search_endpoint_name)
index = vsc.create_delta_sync_index(
endpoint_name=vector_search_endpoint_name,
source_table_name=delta_table_name,
index_name=vs_index_fullname,
pipeline_type='TRIGGERED',
primary_key="id",
embedding_source_column="act_synopsis",
embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()
新規文書のエンベディングの生成およびベクトルストアからの類似検索
これで、「推論モード」で新規データのパイプラインを実行することができます。このケースでは、ベクトルインデックスにすでに格納されている文書とマッチングしたいと考える、新規文書のバッチを受け取ります。この文書の最新バッチは、要約の抽出およびそれぞれの要約のエンベディング生成の同様の処理が行われます。マッチする文書のを特定するための、要約抽出、エンベディング生成、既存の文書ストアからの収集においてバッチ推論を活用することができます。ここで使用する適切な3つのDatabricksのAI関数は、要約生成のためにLlamaモデルを用いたai_query、エンベディングのためのGTEとai_query、検索のためのvector_searchです。これら最後の2つのステップは、vector_search関数にquery_embeddings
ではなくquery_text
を引き渡すことで、1つのステップに統合することができます。以下のコードスニペットでは、最初の要約生成ステップは除外していますが、「文書要約」セクションのスニペットと同じものです。
エンベディングを作成します:
create table if not exists batch.dpechi.star_trek_episode_text_emb as
select ai_query('databricks-gte-large-en', act_synopsis) as search_input, act_synopsis from batch.dpechi.star_trek_episode_text_delta
これらエンベディングが生成された文書を検索します:
select batch.dpechi.star_trek_episode_text_emb.search_input as search_input, batch.dpechi.star_trek_episode_text_emb.act_synopsis as search_input_text, search.act_synopsis as search_result from batch.dpechi.star_trek_episode_text_emb,
lateral(
SELECT * FROM vector_search(index => 'batch.dpechi.star_trek_episode_text_index, query_vector => batch.dpechi.star_trek_episode_text_emb.search_input, num_results => 1
)) as search
大規模にベクトルインデックスから結果を取得するために、複数のエンベディングの入力をvector_search
DBSQL AI関数に渡すことができます。
AIクエリーによるファジーマッチの検証
この時点では、マッチする可能性のあるものを取得していますが、ベクトル空間で他の既存文書と近いという不明確な感覚意外に、新規文書が実際にマッチしているということを保証する現実的な手段を有していません。これらのマッチに任意の閾値を設けることを選択することができます。しかし、このハイパーパラメータをチューニングするのではなく、これらの2つのレコードが同じであることを検証する最終段階として、マッチする可能性のある文書のペアをai_queryに容易に引き渡すことができます。この最終検証チェックによって、我々のファジーマッチングプロセスは完成します。
matching_prompt = "Validate that these 2 summaries are matches. Provide only a yes or no."
final_matches = _sqldf.selectExpr("search_input_text", "search_result", f"ai_query('{endpoint_name}', CONCAT('{matching_prompt}', 'Summary 1:', search_input_text, 'Summary 2:', search_result)) as output")
final_matches.display()
まとめ
要約、Vector Search、LLMによる検証をDatabricks上の統合されたパイプラインに統合することで、企業規模での非構造化データのファジーマッチングをどのように行えるのかを説明しました。このアプローチによって、不安定でハードコードされ、経験則に基づく必要性を排除し、さまざまなデータタイプやドメインに適用可能な、モジュール化され、AIを活用したコンポーネントで置き換えることができます。
より広範には、これはデータエンジニアリングに関する考え方におけるシフトを示しています。構造化データの変換やETLジョブに限定されることはなくなり、モダンなデータエンジニアは自身の作業に直接インテリジェンスを埋め込むことができ、非構造化データから構造を抽出することができます。この新たなパラダイムはデータインテリジェンスエンジニアリング、従来のデータエンジニアリングと生成AI、機械学習の能力の融合です。企業が非構造化データソースからさらなる価値を開放すると、Databricksのデータインテリジェンスプラットフォームの完全なスイートを活用することが、より革新的で堅牢なデータパイプラインの提供において重要となります。
リポジトリ: