3
5

DatabricksでPythonを使ってノートブックを並列実行する方法

Posted at

概要

Databricks にて Python により並列でノートブックワークフロー(dbutils.notebook.run によるノートブック実行)を実施する方法を共有します。公式ドキュメントには Scala での実装例が紹介されていますが、Python での具体的なコード例は示されていません。この記事では、concurrent.futures ライブラリを活用して、複数のノートブックを同時に実行する手順を解説します。

image.png

引用元:Databricksノートブックを別のノートブックから実行する | Databricks on AWS

プログラムと処理結果

処理内容を保持した被実行対象のノートブックを作成

並列実行されるノートブックは、input_text というパラメータを受け取り、指定された値に基づいて処理を行います。以下のコードは、ウィジェットを設定し、その値を変数に割り当てた後、10秒間の待機時間を設けてデータフレームを表示する例です。

import json
import time

# ウィジェットを設定してウィジェットの値を変数にセット
dbutils.widgets.text("input_text", "abc")
input_text = dbutils.widgets.get("input_text")

# 10 秒待機後にデータフレームを表示
time.sleep(10)
spark.sql(f"SELECT '{input_text}'").display()
# 終了処理
dbutils.notebook.exit(json.dumps({"input_text": input_text}))

image.png

並列で処理を行うノートブックにてクラスを定義

並列処理を管理するためのクラス ExecuteMultiNotebooks を定義します。このクラスは、ノートブックの実行をサブミットする submit_notebook メソッドと、複数のノートブックを並列で実行する run_notebooks_in_parallel メソッドを含んでいます。

import json
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession


class ExecuteMultiNotebooks():
    def submit_notebook(self, notebooks_conf: Dict[str, Any]) -> Any:
        spark = SparkSession.getActiveSession()
        dbutils = DBUtils(spark)
        notebook_path = notebooks_conf.get("notebook_path")
        parameters = notebooks_conf.get("parameters", {})
        timeout = notebooks_conf.get("timeout", 3600)
        retry = notebooks_conf.get("retry", 0)
        try:
            return dbutils.notebook.run(notebook_path, timeout, parameters)
        except Exception:
            if retry < 1:
                raise
            notebooks_conf["retry"] = retry - 1
            self.submit_notebook(notebooks_conf)

    def run_notebooks_in_parallel(
        self,
        notebooks_conf: List[Dict[str, Any]],
        parallel_num: int,
        timeout: int = 3600,
    ) -> List[Any]:
        with ThreadPoolExecutor(max_workers=parallel_num) as ec:
            submitted_notebooks = [ec.submit(self.submit_notebook, nb_info) for nb_info in notebooks_conf]
        return [submitted_nb.result(timeout=timeout) for submitted_nb in submitted_notebooks]

image.png

並列で処理を行うノートブックの設定値を変数に定義

並列数と実行するノートブックに関する次の設定値を変数に定義します。

# 項目 概要
1 notebook_path ノートブックのパス
2 timeout ノートブック実行のタイムアウト
3 parameters ノートブックに渡すパラメータ
4 retry エラー時のリトライ処理
# 並列数を指定
parallel_num = 2

# 実行するノートブックに関する情報をセット
notebooks_confs = [
    {
        "notebook_path": "./print_test",
        "timeout": 60,
        "parameters": {},
        "retry": 3,
    },
    {
        "notebook_path": "./print_test",
        "timeout": 60,
        "parameters": {"input_text": "1つ目"},
        "retry": 3,
    },
    {
        "notebook_path": "./print_test",
        "timeout": 60,
        "parameters": {"input_text": "2つ目"},
        "retry": 3,
    },
    {
        "notebook_path": "./print_test",
        "timeout": 60,
        "parameters": {"input_text": "3つ目"},
        "retry": 3,
    },
    {
        "notebook_path": "./print_test",
        "timeout": 60,
        "parameters": {"input_text": "4つ目"},
        "retry": 3,
    },
]

image.png

並列で処理を行うノートブックにて並列処理を実行

ExecuteMultiNotebooks クラスのインスタンスを作成し、定義した設定値を使用して並列処理を実行します。

exe_nbs = ExecuteMultiNotebooks()

results = exe_nbs.run_notebooks_in_parallel(
    notebooks_confs,
    parallel_num=parallel_num,
)

image.png

並列で処理を行うノートブックにて並列処理の結果を確認

実行されたノートブックの結果としては、次のような結果となります。

image.png

image.png

並列処理の出力結果としては、次のような結果がリターンされます。

print(results)

image.png

まとめ

この記事では、Databricks上でPythonを使用してノートブックワークフローを並列で実行する方法について説明しました。Scalaでの実装例は公式ドキュメントで紹介されていますが、Pythonでの具体的なコード例は示されていないため、ここでconcurrent.futuresライブラリを用いた実装方法を提供しました。

まず、並列実行されるノートブックを作成し、input_textパラメータを受け取って指定された値に基づいて処理を行うように設定します。

次に、並列処理を行うノートブックにて並列処理を管理するExecuteMultiNotebooksクラスを定義し、ノートブックの実行をサブミットするメソッドと複数のノートブックを並列で実行するメソッドを定義します。設定値を変数に定義した後、ExecuteMultiNotebooksクラスのインスタンスを作成し、定義した設定値を使用して並列処理を実行します。最後に、並列処理の結果を出力して確認します。

このプロセスを通じて、Databricks上でPythonを使用して効率的に複数のノートブックを同時に実行する方法を理解することができます。これにより、データ処理や分析のワークフローを高速化し、生産性を向上させることが可能になります。

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5