Notebook workflows | Databricks on AWS [2021/9/14]の翻訳です。
%runコマンドを用いることで、ノートブックで別のノートブックをインクルードすることができます。例えば、別のノートブックにヘルパー関数を記述するなど、コードをモジュール化するために%run
を使用することができます。%run
を使用する際、呼び出されたノートブックは即時に実行され、呼び出し元のノートブックで呼び出し先で定義された関数、変数をすぐに利用できるようになります。
ノートブックワークフローは、パラメーターの引き渡し、ノートブックから値を戻すことができ、%run
を補完するものとなります。これにより、依存関係を持った複雑なワークフロー、パイプラインを構築することができます。例えば、ディレクトリ内のファイルのリストを取得し、別のノートブックにファイル名を渡すことができます。これは、%run
では不可能です。また、戻り値に基づくif-then-elseのワークフローを作成したり、相対パスを用いて別のノートブックを呼び出すこともできます。
ノートブックワークフローを実装するには、dbutils.notebook.*
メソッドを使用します。%run
と異なり、dbutils.notebook.run()
メソッドは、ノートブックを実行するために新たなジョブを起動します。
これらのメソッドは、全てのdbutils
APIのようにPythonとScalaでのみ利用できます。しかし、Rノートブックを起動するために、dbutils.notebook.run()
を使用することができます。
注意
30日以内に完了するノートブックワークフローのジョブのみをサポートしています。
API
ノートブックワークフローを構築するためにdbutils.notebook
APIで利用できるメソッドは、run
とexit
です。パラメーター、戻り値は両方とも文字列である必要があります。
run(path: String, timeout_seconds: int, arguments: Map): String
ノートブックを実行し、終了時の値を戻します。このメソッドは、短期間のジョブを即時実行します。
timeout_seconds
パラメーターは、処理のタイムアウト(0はタイムアウトしないことを意味します)を制御します。run
は、指定した時間に処理を完了しない場合、例外をスローします。10分以上Databricksがダウンした場合、timeout_seconds
に関係なく、ノートブックの実行は失敗します。
arguments
パラメーターは、ターゲットノートブックのウィジェットの値を設定します。特に、実行しているノートブックにA
というウィジェットがある場合、run()
を呼び出す際、argumentsパラメーターの一部としてキーバリューのペア("A": "B")
を指定すると、ウィジェットA
の値を取得した際、"B"
が返却されます。ウィジェットの作り方、使い方に関しては、Widgetsを参照ください。
警告!
arguments
パラメーターは、ラテン文字(ASCII文字セット)のみを受け付けます。非ASCII文字はエラーを返します。非ASCII文字の例は、中国語、日本語の漢字、絵文字などが挙げられます。
run
の使い方
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
run
のサンプル
foo
という名前のウィジェットを持ち、ウィジェットの値を表示するworkflows
というノートブックがあるとします。
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print dbutils.widgets.get("foo")
dbutils.notebook.run("workflows", 60, {"foo": "bar"})
を実行することで、以下の結果が得られます。
ウィジェットは、デフォルト値ではなく、ワークフローを通じて渡した値"bar"
を持っていました。
exit(value: String): void
は、値ともにノートブックを修了します。run
を用いてノートブックを呼び出したのであれば、これが戻り値となります。
dbutils.notebook.exit("returnValue")
ジョブでdbutils.notebook.exit
を呼び出すと、ノートブックは処理に成功したとして完了します。ジョブを失敗させたい場合には、例外をスローしてください。
サンプル
以下のサンプルでは、DataImportNotebook
に引数を渡し、DataImportNotebook
の結果に基づいて異なるノートブック(DataCleaningNotebook
かErrorHandlingNotebook
)を実行しています。
ノートブックワークフローを実行すると、実行中のノートブックへのリンクが現れます。
ノートブックリンクNotebook job #xxxxをクリックすると、処理の詳細を参照することができます。
構造化データの引き渡し
ここでは、ノートブック間でどのように構造化データを引き渡すのかを説明します。
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.parquet(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.parquet("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.parquet(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
エラーのハンドリング
ここでは、ノートブックワークフローでどのようにエラーをハンドリングするのかを説明します。
# Errors in workflows thrown a WorkflowException.
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
num_retries = 0
while True:
try:
return dbutils.notebook.run(notebook, timeout, args)
except Exception as e:
if num_retries > max_retries:
raise e
else:
print("Retrying error", e)
num_retries += 1
run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
// Errors in workflows thrown a WorkflowException.
import com.databricks.WorkflowException
// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
var numTries = 0
while (true) {
try {
return dbutils.notebook.run(notebook, timeout, args)
} catch {
case e: WorkflowException if numTries < maxTries =>
println("Error, retrying: " + e)
}
numTries += 1
}
"" // not reached
}
runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
複数ノートブックの同時実行
Threads(Scala, Python)やFutures(Scala, Python)のような標準的なScala、Pythonのコンストラクタを用いて、複数のノートブックを同時に実行することができます。こちらのノートブックでは、これらのコンストラクタの使い方をデモンストレーションしています。こちらはScalaノートブックですが、簡単に同じものをPythonで記述することができます。使い方は以下の通りとなります。
- 上のリポジトリをReposでワークスペースに連携
- Concurrent Notebooksノートブックを実行