LoginSignup
0
0

Implement data pipeline fixes — Databricks Generative AI Cookbook [2024/6/26時点]の翻訳です。

本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Databricks生成AIクックブックのコンテンツです。

データパイプライン修正の実装

以下を行うために、あなたのデータパイプラインを修正するための以下のステップに従います:

  1. 新規ベクトルインデックスの作成
  2. データパイプラインのメタデータを持つMLflowランの作成

結果として得られるMLflowランはB_quality_iteration/02_evaluate_fixesノートブックによって参照されます。

データパイプラインの修正には2つのアプローチが存在します:

  1. 一度に一つの修正を実装: このアプローチでは、一度に単一のデータパイプラインを設定して実行します。単一のエンべディングモデルを試し、単体のパーサーをテストするようなケースでは、このモードがベストとなります。これらのノートブックに慣れるために、ここからスタートすることをお勧めします。
  2. 一度に複数の修正を実装: スイープと呼ばれるこのアプローチでは、あなたはそれぞれが異なる設定を持つ複数のデータパイプラインを並列で実行します。3つのPDFパーサーを評価したり、さまざまなチャンクサイズを評価するように、数多くの戦略を"スイープ"したい場合にはベストなモードとなります。

コードレポジトリ
こちらからこのセクションで参照しているすべてのサンプルコードにアクセスすることができます。

アプローチ1: 一度に一つの修正を実装

  1. B_quality_iteration/data_pipeline_fixes/single_fix/00_configノートブックを開きます。
  2. 以下のいずれかを行います:
    • このクックブックで提供される新たな構成を実装するために、ここでの手順に従います。
    • パーシングやチャンキングのカスタムコードを実装するためにこれらのステップに従います。
  3. 以下のいずれかの方法でパイプラインを実行します:
    • 00_Run_Entire_Pipelineノートブックを開いて実行します。
    • パイプラインのそれぞれのステップを手動で実行するためにこれらのステップに従います 。
  4. 出力されるMLflowランの名前を、B_quality_iteration/02_evaluate_fixesノートブックの変数DATA_PIPELINE_FIXES_RUN_NAMESに追加します。

注意
このデータ準備パイプラインは、インクリメンタルにファイルをロード、処理するためにSparkの構造化ストリーミングを活用しています。これにより、すでにロードされ、準備されたファイルはチェックポイントで追跡され、再度処理されることはありません。新たに追加されたファイルのみがロード、準備され、対応するテーブルに追加されます。

このため、最初からパイプラインすべてを再実行し、全てのドキュメントを再処理したい場合には、チェックポイントとテーブルを削除する必要があります。reset_tables_and_checkpointsノートブックを用いることでこれを行うことができます。

アプローチ2: 一度に複数の修正を実装

  1. B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelinesノートブックを開きます。
  2. 実行するデータパイプラインの2つ以上の構成を追加するために、ノートブックの指示に従います。
  3. これらのパイプラインを実行するためにノートブックを実行します。
  4. 出力されるMLflowランの名前を、B_quality_iteration/02_evaluate_fixesノートブックの変数DATA_PIPELINE_FIXES_RUN_NAMESに追加します。

付録

注意
一度に単一の修正を行うのか、複数の修正を行うのかに応じて、以下で参照されるノートブックはsingle_fixあるいはmultiple_fixesディレクトリで確認することができます。

構成設定のディープダイブ

データパイプラインに関して、事前に実装されているさまざまな構成オプションを以下に一覧します。あるいは、カスタムのパーサー/チャンカーを実装することができます。

  • vectorsearch_config: vector searchエンドポイント(稼働中である必要あり)と、作成するインデックス名を指定します。さらに、ソーステーブルとインデックスの同期タイプを定義します(デフォルトはTRIGGERED)。

    • embedding_config: 使用するエンべディングモデルとトークナイザーを指定します。オプションの完全なリストはsupporting_configs/embedding_modelsを参照してください。エンべディングモデルは稼働中のモデルサービングエンドポイントにデプロイされる必要があります。チャンキング戦略に応じて、分割の過程でトークナイザーもまた、チャンクがエンべディングモデルのトークン制限を超えないようにする必要があります。ここでは、選択したエンべディングモデルの最大コンテキスト長を超えないように、テキストチャンクのトークン数をカウントするために活用されます。以下のように、HuggingFaceにあるトークナイザーやTikTokenを選択することができます。
    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }
    
    or
    
    "embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
    
    • pipeline_config: ソースフィールドにファイルパーサーやチャンカーを定義します。パーサーとチャンカーは、parser_librarychunker_libraryノートブックのそれぞれで定義されます。これらは、single_fixあるいはmultiple_fixesディレクトリで確認することができます。オプションの完全なリストについては、同様にsingle、multipleディレクトリにあるsupporting_configs/parser_chunker_strategiesノートブックをご覧ください。以下のように、異なるパーサー、チャンカーでは異なる設定パラメータを必要とします。

          "chunker": {
          "name": <chunker-name>,
          "config": {
              "<param 1>": "...",
              "<param 2>": "...",
              ...
          }
      }
      

      ここでは、<param x>は特定のチャンカーで必要となるパラメーターを表現しています。パーサーでも同じフォーマットを用いて、設定値を引き渡すことができます。

カスタムパーサー/チャンカーの実装

このプロジェクトは、データ準備パイプラインへのカスタムパーサーとチャンカーの追加を促進するように構造化されています。

新規パーサーの追加

パースしたテキストをマークダウンフォーマットに変換するために、PyMuPDFライブラリを用いた新規のパーサーを取り込みたいものとします。以下のステップに従います:

  1. single_fixあるいはmultiple_fixディレクトリのparser_libraryノートブックに以下のコードを追加することで、必要な依存関係をインストールします:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. single_fixあるいはmultiple_fixディレクトリのparser_libraryノートブックで、PyMuPdfMarkdownパーサーのための新たなセクションを追加し、パーシングの関数を実装します:

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    

    この関数のアウトプットが、ノートブックの最初で定義されているParserReturnValueクラスに準拠するようにしてください。これによって、Spark UDFとの互換性を保つことができます。try/exceptブロックによって、single_fixあるいはmultiple_fixディレクトリにある02_parse_docsノートブックで、UDFとしてパーサーを適用する際、個々のドキュメントのエラーでパースジョブ全体が失敗することを防いでいます。このノートブックでは、いずれかのドキュメントでパーシングが失敗した際には、対応する行を検疫して警告を出します。

  3. 00_configノートブックのpipeline_configで設定できるように、あなたの新規のパーシング関数をsingle_fixあるいはmultiple_fixディレクトリにあるparser_libraryノートブックのparser_factoryに追加します。

  4. 02_parse_docsノートブックでは、パーサー関数はSpark Python UDF(DBR >= 14.0ではarrow最適化)に変換され、新たなバイナリーPDFファイルを含むデータフレームに適用されます。テストや開発では、test-document.pdfファイルをロードするparser_library notebookにシンプルなテスト用関数を追加し、パーシングが成功することを検証します:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

新規チャンカーの追加

新規チャンカーを追加するプロセスは、上述した新規パーサーのステップと似ています。

  1. chunker_libraryノートブックに必要な依存関係を追加します。
  2. あなたのチャンカーの新規セクションを追加し、chunk_parsed_content_newchunkernameのような関数を実装します。新規のチャンカー関数の出力は、chunker_libraryノートブックの最初で定義されているChunkerReturnValueクラスに準拠しなくてはなりません。この関数は少なくとも、チャンクされるパース後のテキスト文字列を受け付ける必要があります。あなたのチャンカーで追加のパラメーターが必要な場合、関数のパラメーターとして追加することができます。
  3. chunker_libraryノートブックで定義されているchunker_factory関数に新規のチャンカーを追加します。あなたの関数で追加のパラメーターを受け取る場合には、事前設定するためにfunctoolsのpartialを使います。UDFは1つの入力パラメーターのみを受け付け、この場合ではパースされたテキストとなるため、この手順が必要となります。chunker_factoryによって、pipeline_configで異なるチャンカーメソッドを設定し、Spark Python UDF(DBR >= 14.0に最適化)を返却することができます。
  4. あなたの新規チャンキング関数にシンプルなテストセクションを追加します。このセクションでは、文字列として渡される事前定義済みのテキストをチャンクすべきです。

パフォーマンスチューニング

Sparkは処理の並列化にパーティションを使用します。データは行のチャンクに分割され、それぞれのパーティションは、デフォルトでは単一のコアで処理されます。しかし、Apache Sparkによって初回にデータが読み込まれる際、特に我々のUDFがパーシングやチャンキングのタスクを実行する際に期待する処理に最適化されたパーティションが作成されない場合があります。効率的な並列のために十分に小さいパーティションを作成することと、管理のオーバーヘッドがメリットを上回ってしまうほど小さすぎるパーティションにならないバランスを見極めることが重要です。

df.repartitions(<パーティションの数>)でパーティションの数を調整することができます。UDFを適用する際は、ワーカーノードで利用できるコアの倍数になるように狙いましょう。例えば、02_parse_docsノートブックでは、利用できるワーカーのコアの数の2倍のパーティションを作成するように、df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)を含めることができます。通常は、1から3倍の数で満足する性能を達成することができます。

手動でのパイプライン実行

あるいは、ステップバイステップで個々のノートブックを実行することができます:

  1. 01_load_filesノートブックを用いた生のファイルのロード。これによって、destination_tables_configで定義されているブロンズテーブル(raw_files_table_name)にそれぞれのドキュメントのバイナリーが1レコードとして保存されます。ファイルはインクリメンタルにロードされ、最後の処理から追加されたドキュメントのみが処理されます。
  2. 02_parse_docsノートブックを用いたドキュメントのパース。このノートブックはparser_libraryノートブックを実行し、さまざまなパーサーや関連ユーティリティを利用できるようにします。それぞれのドキュメントをプレーンなテキストにパースするために、pipeline_configで指定されたパーサーを使用します。
  3. 03_chunk_docsノートブックを用いたパースドキュメントのチャンク。パーシングと同様に、このノートブックはchunker_libraryノートブックを実行します(ここでも最初のセルとして実行します)。パースされたドキュメントのそれぞれを、pipeline_configで指定されたチャンカーを用いて小さなチャンクに分割します。それぞれのチャンクには、ベクトル検索インデックスとの同期に必要なMD5ハッシュを用いたユニークなIDが割り当てられます。最終的なチャンクはゴールドテーブル(chunked_docs_table_name)にロードされます。
  4. 04_vector_indexを用いたベクトル検索インデックスの作成/同期。このノートブックは、vectorsearch_configで指定されたベクトルサーチエンドポイントの準備状況を検証します。設定されたインデックスがすでに存在する場合、ゴールドテーブルとの同期を起動します。それ以外の場合には、インデックスを作成し同期を起動します。Vector Searchのエンドポイントとインデックスが作成されていない場合、これにはある程度の時間を要します。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0