LoginSignup
4
2

More than 1 year has passed since last update.

Databricksにおけるノートブックワークフロー

Last updated at Posted at 2021-10-23

Notebook workflows | Databricks on AWS [2021/9/14]の翻訳です。

%runコマンドを用いることで、ノートブックで別のノートブックをインクルードすることができます。例えば、別のノートブックにヘルパー関数を記述するなど、コードをモジュール化するために%runを使用することができます。%runを使用する際、呼び出されたノートブックは即時に実行され、呼び出し元のノートブックで呼び出し先で定義された関数、変数をすぐに利用できるようになります。

ノートブックワークフローは、パラメーターの引き渡し、ノートブックから値を戻すことができ、%runを補完するものとなります。これにより、依存関係を持った複雑なワークフロー、パイプラインを構築することができます。例えば、ディレクトリ内のファイルのリストを取得し、別のノートブックにファイル名を渡すことができます。これは、%runでは不可能です。また、戻り値に基づくif-then-elseのワークフローを作成したり、相対パスを用いて別のノートブックを呼び出すこともできます。

ノートブックワークフローを実装するには、dbutils.notebook.*メソッドを使用します。%runと異なり、dbutils.notebook.run()メソッドは、ノートブックを実行するために新たなジョブを起動します。

これらのメソッドは、全てのdbutils APIのようにPythonとScalaでのみ利用できます。しかし、Rノートブックを起動するために、dbutils.notebook.run()を使用することができます。

注意
30日以内に完了するノートブックワークフローのジョブのみをサポートしています。

API

ノートブックワークフローを構築するためにdbutils.notebook APIで利用できるメソッドは、runexitです。パラメーター、戻り値は両方とも文字列である必要があります。

run(path: String, timeout_seconds: int, arguments: Map): String

ノートブックを実行し、終了時の値を戻します。このメソッドは、短期間のジョブを即時実行します。

timeout_secondsパラメーターは、処理のタイムアウト(0はタイムアウトしないことを意味します)を制御します。runは、指定した時間に処理を完了しない場合、例外をスローします。10分以上Databricksがダウンした場合、timeout_secondsに関係なく、ノートブックの実行は失敗します。

argumentsパラメーターは、ターゲットノートブックのウィジェットの値を設定します。特に、実行しているノートブックにAというウィジェットがある場合、run()を呼び出す際、argumentsパラメーターの一部としてキーバリューのペア("A": "B")を指定すると、ウィジェットAの値を取得した際、"B"が返却されます。ウィジェットの作り方、使い方に関しては、Widgetsを参照ください。

警告!
argumentsパラメーターは、ラテン文字(ASCII文字セット)のみを受け付けます。非ASCII文字はエラーを返します。非ASCII文字の例は、中国語、日本語の漢字、絵文字などが挙げられます。

runの使い方

Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

runのサンプル

fooという名前のウィジェットを持ち、ウィジェットの値を表示するworkflowsというノートブックがあるとします。

Scala
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print dbutils.widgets.get("foo")

dbutils.notebook.run("workflows", 60, {"foo": "bar"})を実行することで、以下の結果が得られます。

ウィジェットは、デフォルト値ではなく、ワークフローを通じて渡した値"bar"を持っていました。

exit(value: String): voidは、値ともにノートブックを修了します。runを用いてノートブックを呼び出したのであれば、これが戻り値となります。

Scala
dbutils.notebook.exit("returnValue")

ジョブでdbutils.notebook.exitを呼び出すと、ノートブックは処理に成功したとして完了します。ジョブを失敗させたい場合には、例外をスローしてください。

サンプル

以下のサンプルでは、DataImportNotebookに引数を渡し、DataImportNotebookの結果に基づいて異なるノートブック(DataCleaningNotebookErrorHandlingNotebook)を実行しています。

ノートブックワークフローを実行すると、実行中のノートブックへのリンクが現れます。

ノートブックリンクNotebook job #xxxxをクリックすると、処理の詳細を参照することができます。

構造化データの引き渡し

ここでは、ノートブック間でどのように構造化データを引き渡すのかを説明します。

Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.parquet(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

エラーのハンドリング

ここでは、ノートブックワークフローでどのようにエラーをハンドリングするのかを説明します。

Python
# Errors in workflows thrown a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
Scala
// Errors in workflows thrown a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

複数ノートブックの同時実行

Threads(Scala, Python)やFutures(Scala, Python)のような標準的なScala、Pythonのコンストラクタを用いて、複数のノートブックを同時に実行することができます。こちらのノートブックでは、これらのコンストラクタの使い方をデモンストレーションしています。こちらはScalaノートブックですが、簡単に同じものをPythonで記述することができます。使い方は以下の通りとなります。

  1. 上のリポジトリをReposでワークスペースに連携
  2. Concurrent Notebooksノートブックを実行

Databricks 無料トライアル

Databricks 無料トライアル

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2