Implement data pipeline fixes — Databricks Generative AI Cookbook [2024/6/26時点]の翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricks生成AIクックブックのコンテンツです。
データパイプライン修正の実装
以下を行うために、あなたのデータパイプラインを修正するための以下のステップに従います:
- 新規ベクトルインデックスの作成
- データパイプラインのメタデータを持つMLflowランの作成
結果として得られるMLflowランはB_quality_iteration/02_evaluate_fixes
ノートブックによって参照されます。
データパイプラインの修正には2つのアプローチが存在します:
- 一度に一つの修正を実装: このアプローチでは、一度に単一のデータパイプラインを設定して実行します。単一のエンべディングモデルを試し、単体のパーサーをテストするようなケースでは、このモードがベストとなります。これらのノートブックに慣れるために、ここからスタートすることをお勧めします。
- 一度に複数の修正を実装: スイープと呼ばれるこのアプローチでは、あなたはそれぞれが異なる設定を持つ複数のデータパイプラインを並列で実行します。3つのPDFパーサーを評価したり、さまざまなチャンクサイズを評価するように、数多くの戦略を"スイープ"したい場合にはベストなモードとなります。
アプローチ1: 一度に一つの修正を実装
-
B_quality_iteration/data_pipeline_fixes/single_fix/00_config
ノートブックを開きます。 - 以下のいずれかを行います:
- 以下のいずれかの方法でパイプラインを実行します:
- 00_Run_Entire_Pipelineノートブックを開いて実行します。
- パイプラインのそれぞれのステップを手動で実行するためにこれらのステップに従います 。
- 出力されるMLflowランの名前を、
B_quality_iteration/02_evaluate_fixes
ノートブックの変数DATA_PIPELINE_FIXES_RUN_NAMES
に追加します。
注意
このデータ準備パイプラインは、インクリメンタルにファイルをロード、処理するためにSparkの構造化ストリーミングを活用しています。これにより、すでにロードされ、準備されたファイルはチェックポイントで追跡され、再度処理されることはありません。新たに追加されたファイルのみがロード、準備され、対応するテーブルに追加されます。
このため、最初からパイプラインすべてを再実行し、全てのドキュメントを再処理したい場合には、チェックポイントとテーブルを削除する必要があります。reset_tables_and_checkpointsノートブックを用いることでこれを行うことができます。
アプローチ2: 一度に複数の修正を実装
-
B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines
ノートブックを開きます。 - 実行するデータパイプラインの2つ以上の構成を追加するために、ノートブックの指示に従います。
- これらのパイプラインを実行するためにノートブックを実行します。
- 出力されるMLflowランの名前を、
B_quality_iteration/02_evaluate_fixes
ノートブックの変数DATA_PIPELINE_FIXES_RUN_NAMES
に追加します。
付録
注意
一度に単一の修正を行うのか、複数の修正を行うのかに応じて、以下で参照されるノートブックはsingle_fix
あるいはmultiple_fixes
ディレクトリで確認することができます。
構成設定のディープダイブ
データパイプラインに関して、事前に実装されているさまざまな構成オプションを以下に一覧します。あるいは、カスタムのパーサー/チャンカーを実装することができます。
-
vectorsearch_config
: vector searchエンドポイント(稼働中である必要あり)と、作成するインデックス名を指定します。さらに、ソーステーブルとインデックスの同期タイプを定義します(デフォルトはTRIGGERED
)。-
embedding_config
: 使用するエンべディングモデルとトークナイザーを指定します。オプションの完全なリストはsupporting_configs/embedding_models
を参照してください。エンべディングモデルは稼働中のモデルサービングエンドポイントにデプロイされる必要があります。チャンキング戦略に応じて、分割の過程でトークナイザーもまた、チャンクがエンべディングモデルのトークン制限を超えないようにする必要があります。ここでは、選択したエンべディングモデルの最大コンテキスト長を超えないように、テキストチャンクのトークン数をカウントするために活用されます。以下のように、HuggingFaceにあるトークナイザーやTikTokenを選択することができます。
"embedding_tokenizer": { "tokenizer_model_name": "BAAI/bge-large-en-v1.5", "tokenizer_source": "hugging_face", } or "embedding_tokenizer": { "tokenizer_model_name": "text-embedding-small", "tokenizer_source": "tiktoken", }
-
pipeline_config
: ソースフィールドにファイルパーサーやチャンカーを定義します。パーサーとチャンカーは、parser_library
とchunker_library
ノートブックのそれぞれで定義されます。これらは、single_fix
あるいはmultiple_fixes
ディレクトリで確認することができます。オプションの完全なリストについては、同様にsingle、multipleディレクトリにあるsupporting_configs/parser_chunker_strategies
ノートブックをご覧ください。以下のように、異なるパーサー、チャンカーでは異なる設定パラメータを必要とします。"chunker": { "name": <chunker-name>, "config": { "<param 1>": "...", "<param 2>": "...", ... } }
ここでは、
<param x>
は特定のチャンカーで必要となるパラメーターを表現しています。パーサーでも同じフォーマットを用いて、設定値を引き渡すことができます。
-
カスタムパーサー/チャンカーの実装
このプロジェクトは、データ準備パイプラインへのカスタムパーサーとチャンカーの追加を促進するように構造化されています。
新規パーサーの追加
パースしたテキストをマークダウンフォーマットに変換するために、PyMuPDFライブラリを用いた新規のパーサーを取り込みたいものとします。以下のステップに従います:
-
single_fix
あるいはmultiple_fix
ディレクトリのparser_library
ノートブックに以下のコードを追加することで、必要な依存関係をインストールします:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
-
single_fix
あるいはmultiple_fix
ディレクトリのparser_library
ノートブックで、PyMuPdfMarkdown
パーサーのための新たなセクションを追加し、パーシングの関数を実装します:import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
この関数のアウトプットが、ノートブックの最初で定義されている
ParserReturnValue
クラスに準拠するようにしてください。これによって、Spark UDFとの互換性を保つことができます。try
/except
ブロックによって、single_fix
あるいはmultiple_fix
ディレクトリにある02_parse_docs
ノートブックで、UDFとしてパーサーを適用する際、個々のドキュメントのエラーでパースジョブ全体が失敗することを防いでいます。このノートブックでは、いずれかのドキュメントでパーシングが失敗した際には、対応する行を検疫して警告を出します。 -
00_config
ノートブックのpipeline_config
で設定できるように、あなたの新規のパーシング関数をsingle_fix
あるいはmultiple_fix
ディレクトリにあるparser_library
ノートブックのparser_factory
に追加します。 -
02_parse_docs
ノートブックでは、パーサー関数はSpark Python UDF(DBR >= 14.0ではarrow最適化)に変換され、新たなバイナリーPDFファイルを含むデータフレームに適用されます。テストや開発では、test-document.pdfファイルをロードするparser_library notebookにシンプルなテスト用関数を追加し、パーシングが成功することを検証します:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
新規チャンカーの追加
新規チャンカーを追加するプロセスは、上述した新規パーサーのステップと似ています。
- chunker_libraryノートブックに必要な依存関係を追加します。
- あなたのチャンカーの新規セクションを追加し、
chunk_parsed_content_newchunkername
のような関数を実装します。新規のチャンカー関数の出力は、chunker_libraryノートブックの最初で定義されているChunkerReturnValue
クラスに準拠しなくてはなりません。この関数は少なくとも、チャンクされるパース後のテキスト文字列を受け付ける必要があります。あなたのチャンカーで追加のパラメーターが必要な場合、関数のパラメーターとして追加することができます。 -
chunker_libraryノートブックで定義されている
chunker_factory
関数に新規のチャンカーを追加します。あなたの関数で追加のパラメーターを受け取る場合には、事前設定するためにfunctoolsのpartialを使います。UDFは1つの入力パラメーターのみを受け付け、この場合ではパースされたテキストとなるため、この手順が必要となります。chunker_factory
によって、pipeline_configで異なるチャンカーメソッドを設定し、Spark Python UDF(DBR >= 14.0に最適化)を返却することができます。 - あなたの新規チャンキング関数にシンプルなテストセクションを追加します。このセクションでは、文字列として渡される事前定義済みのテキストをチャンクすべきです。
パフォーマンスチューニング
Sparkは処理の並列化にパーティションを使用します。データは行のチャンクに分割され、それぞれのパーティションは、デフォルトでは単一のコアで処理されます。しかし、Apache Sparkによって初回にデータが読み込まれる際、特に我々のUDFがパーシングやチャンキングのタスクを実行する際に期待する処理に最適化されたパーティションが作成されない場合があります。効率的な並列のために十分に小さいパーティションを作成することと、管理のオーバーヘッドがメリットを上回ってしまうほど小さすぎるパーティションにならないバランスを見極めることが重要です。
df.repartitions(<パーティションの数>)
でパーティションの数を調整することができます。UDFを適用する際は、ワーカーノードで利用できるコアの倍数になるように狙いましょう。例えば、02_parse_docsノートブックでは、利用できるワーカーのコアの数の2倍のパーティションを作成するように、df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
を含めることができます。通常は、1から3倍の数で満足する性能を達成することができます。
手動でのパイプライン実行
あるいは、ステップバイステップで個々のノートブックを実行することができます:
-
01_load_files
ノートブックを用いた生のファイルのロード。これによって、destination_tables_config
で定義されているブロンズテーブル(raw_files_table_name
)にそれぞれのドキュメントのバイナリーが1レコードとして保存されます。ファイルはインクリメンタルにロードされ、最後の処理から追加されたドキュメントのみが処理されます。 -
02_parse_docs
ノートブックを用いたドキュメントのパース。このノートブックはparser_library
ノートブックを実行し、さまざまなパーサーや関連ユーティリティを利用できるようにします。それぞれのドキュメントをプレーンなテキストにパースするために、pipeline_config
で指定されたパーサーを使用します。 -
03_chunk_docs
ノートブックを用いたパースドキュメントのチャンク。パーシングと同様に、このノートブックはchunker_library
ノートブックを実行します(ここでも最初のセルとして実行します)。パースされたドキュメントのそれぞれを、pipeline_config
で指定されたチャンカーを用いて小さなチャンクに分割します。それぞれのチャンクには、ベクトル検索インデックスとの同期に必要なMD5ハッシュを用いたユニークなIDが割り当てられます。最終的なチャンクはゴールドテーブル(chunked_docs_table_name
)にロードされます。 -
04_vector_index
を用いたベクトル検索インデックスの作成/同期。このノートブックは、vectorsearch_config
で指定されたベクトルサーチエンドポイントの準備状況を検証します。設定されたインデックスがすでに存在する場合、ゴールドテーブルとの同期を起動します。それ以外の場合には、インデックスを作成し同期を起動します。Vector Searchのエンドポイントとインデックスが作成されていない場合、これにはある程度の時間を要します。
- 目次
- 前のセクション: ステップ6: 繰り返しの実装 & 品質改善の評価
- 次のセクション: ステップ6: デプロイと監視