概要
Databricks にて Python により並列でノートブックワークフロー(dbutils.notebook.run
によるノートブック実行)を実施する方法を共有します。公式ドキュメントには Scala での実装例が紹介されていますが、Python での具体的なコード例は示されていません。この記事では、concurrent.futures
ライブラリを活用して、複数のノートブックを同時に実行する手順を解説します。
引用元:Databricksノートブックを別のノートブックから実行する | Databricks on AWS
プログラムと処理結果
処理内容を保持した被実行対象のノートブックを作成
並列実行されるノートブックは、input_text
というパラメータを受け取り、指定された値に基づいて処理を行います。以下のコードは、ウィジェットを設定し、その値を変数に割り当てた後、10秒間の待機時間を設けてデータフレームを表示する例です。
import json
import time
# ウィジェットを設定してウィジェットの値を変数にセット
dbutils.widgets.text("input_text", "abc")
input_text = dbutils.widgets.get("input_text")
# 10 秒待機後にデータフレームを表示
time.sleep(10)
spark.sql(f"SELECT '{input_text}'").display()
# 終了処理
dbutils.notebook.exit(json.dumps({"input_text": input_text}))
並列で処理を行うノートブックにてクラスを定義
並列処理を管理するためのクラス ExecuteMultiNotebooks
を定義します。このクラスは、ノートブックの実行をサブミットする submit_notebook
メソッドと、複数のノートブックを並列で実行する run_notebooks_in_parallel
メソッドを含んでいます。
import json
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession
class ExecuteMultiNotebooks():
def submit_notebook(self, notebooks_conf: Dict[str, Any]) -> Any:
spark = SparkSession.getActiveSession()
dbutils = DBUtils(spark)
notebook_path = notebooks_conf.get("notebook_path")
parameters = notebooks_conf.get("parameters", {})
timeout = notebooks_conf.get("timeout", 3600)
retry = notebooks_conf.get("retry", 0)
try:
return dbutils.notebook.run(notebook_path, timeout, parameters)
except Exception:
if retry < 1:
raise
notebooks_conf["retry"] = retry - 1
self.submit_notebook(notebooks_conf)
def run_notebooks_in_parallel(
self,
notebooks_conf: List[Dict[str, Any]],
parallel_num: int,
timeout: int = 3600,
) -> List[Any]:
with ThreadPoolExecutor(max_workers=parallel_num) as ec:
submitted_notebooks = [ec.submit(self.submit_notebook, nb_info) for nb_info in notebooks_conf]
return [submitted_nb.result(timeout=timeout) for submitted_nb in submitted_notebooks]
並列で処理を行うノートブックの設定値を変数に定義
並列数と実行するノートブックに関する次の設定値を変数に定義します。
# | 項目 | 概要 |
---|---|---|
1 | notebook_path | ノートブックのパス |
2 | timeout | ノートブック実行のタイムアウト |
3 | parameters | ノートブックに渡すパラメータ |
4 | retry | エラー時のリトライ処理 |
# 並列数を指定
parallel_num = 2
# 実行するノートブックに関する情報をセット
notebooks_confs = [
{
"notebook_path": "./print_test",
"timeout": 60,
"parameters": {},
"retry": 3,
},
{
"notebook_path": "./print_test",
"timeout": 60,
"parameters": {"input_text": "1つ目"},
"retry": 3,
},
{
"notebook_path": "./print_test",
"timeout": 60,
"parameters": {"input_text": "2つ目"},
"retry": 3,
},
{
"notebook_path": "./print_test",
"timeout": 60,
"parameters": {"input_text": "3つ目"},
"retry": 3,
},
{
"notebook_path": "./print_test",
"timeout": 60,
"parameters": {"input_text": "4つ目"},
"retry": 3,
},
]
並列で処理を行うノートブックにて並列処理を実行
ExecuteMultiNotebooks
クラスのインスタンスを作成し、定義した設定値を使用して並列処理を実行します。
exe_nbs = ExecuteMultiNotebooks()
results = exe_nbs.run_notebooks_in_parallel(
notebooks_confs,
parallel_num=parallel_num,
)
並列で処理を行うノートブックにて並列処理の結果を確認
実行されたノートブックの結果としては、次のような結果となります。
並列処理の出力結果としては、次のような結果がリターンされます。
print(results)
まとめ
この記事では、Databricks上でPythonを使用してノートブックワークフローを並列で実行する方法について説明しました。Scalaでの実装例は公式ドキュメントで紹介されていますが、Pythonでの具体的なコード例は示されていないため、ここでconcurrent.futures
ライブラリを用いた実装方法を提供しました。
まず、並列実行されるノートブックを作成し、input_text
パラメータを受け取って指定された値に基づいて処理を行うように設定します。
次に、並列処理を行うノートブックにて並列処理を管理するExecuteMultiNotebooks
クラスを定義し、ノートブックの実行をサブミットするメソッドと複数のノートブックを並列で実行するメソッドを定義します。設定値を変数に定義した後、ExecuteMultiNotebooks
クラスのインスタンスを作成し、定義した設定値を使用して並列処理を実行します。最後に、並列処理の結果を出力して確認します。
このプロセスを通じて、Databricks上でPythonを使用して効率的に複数のノートブックを同時に実行する方法を理解することができます。これにより、データ処理や分析のワークフローを高速化し、生産性を向上させることが可能になります。