Jobs | Databricks on AWS [2022/4/19時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricksクイックスタートガイドのコンテンツです。
ジョブは非インタラクティブにDatabricksクラスターを実行するための手段です。例えば、抽出、変換、ロード(ETL)ワークロードをインタラクティブあるいはスケジュール処理として実行することができます。また、ノートブックのUI上でジョブをインタラクティブに実行することもできます。
UIあるいはCLI、Jobs APIを用いてジョブの作成実行が可能です。UIやAPIを用いて失敗あるいはキャンセルしたジョブを復旧し再実行することができます。UI、CLI、API、メールのアラートを用いてジョブの実行結果をモニタリングすることができます。本書では、UIを用いたジョブタスクの実行にフォーカスします。他の方法に関しては、Jobs CLIとJobs CLIを参照ください。
ジョブは単一のタスク、あるいは複雑な依存関係を持つ大規模なマルチタスクワークフローから構成することができます。お使いのジョブ全てのタスクオーケストレーション、クラスター管理、モニタリング、エラーレポートはDatabrickによって管理されます。ジョブを即時実行したり、使いやすいスケジューリングシステムを用いて定期的にジョブを実行することができます。
タスクは、JAR、Databricksノートブック、Delta Live Tablesのパイプライン、そして、Scala、Java、Pythonで記述されたアプリケーションで実装することができます。レガシーなSpark Submitアプリケーションもサポートされています。タスク間の依存関係を指定することでタスクの実行順序を制御することができます。タスクを直列あるいは並列に実行するように設定することができます。以下の図では下のワークフローを示しています。
- 生のクリックストリームデータを取り込み、レコードをセッション化するために変換
- 注文データを取り込み、セッション化されたクリックストリームデータと結合し、分析データを準備
- 準備されたデータから特徴量を抽出
- 特徴量を永続化し、機械学習モデルをトレーニングするために並列でタスクを実行
Databricksジョブを用いて最初のワークフローを作成するには、クイックスタートを参照してください。
重要!
- ジョブはData Science & Engineeringワークスペースでのみ作成できます。
- ワークスペースあたり同時実行ジョブ数は1000に制限されています。要求したリクエストが即時実行できない場合には、
429 Too Many Requests
レスポンスが返されます。 - ワークスペースで1時間あたりで作成できるジョブの数は5000("run now"と"runs submit"を含みます)に制限されています。この制限はREST APIで作成されるジョブ、ノートブックワークフローで作成されるジョブにも適用されます。
ジョブの作成
-
以下のいずれかを実行します。
-
Add a name for your job… にジョブの名前を指定します。
-
Task nameフィールドにタスク名を指定します。
-
実行するタスクのタイプを指定します。Typeドロップダウンで、Notebook、JAR、Spark Submit、Python、Pipelineのいずれかを選択します。
-
Notebook: ファイルブラウザからノートブックを検索し、ノートブックを選択してConfirmをクリックします。
-
JAR: Main classを指定します。メインメソッドを含むクラスの完全なクラス名を使用します。例えば
org.apache.spark.examples.SparkPi
。そして、Dependent Librariesの下のAddをクリックしてタスク実行に必要なライブラリを追加します。これらのライブラリの一つにはメインクラスが含まれる必要があります。JARタスクの詳細に関してはJARジョブを参照ください。
-
Spark Submit: Parametersテキストボックスでは、メインクラス、ライブラリJARへのパス、すべての引数をJSON配列型式で指定します。以下の例では、Apache Sparkの
DFSReadWriteTest
をspark-submitを実行するように設定しています。JSON["--class","org.apache.spark.examples.DFSReadWriteTest","dbfs:/FileStore/libraries/spark_examples_2_12_3_1_1.jar","/dbfs/databricks-datasets/README.md","/FileStore/examples/output/"]
重要!
spark-submitタスクにはいくつかの制限があります。- 新規のクラスターでのみspark-submitを実行できます。
- Spark-submitではクラスターのオートスケーリングはサポートしていません。オートスケーリングに関しては、クラスターのオートスケーリングを参照ください。
- Spark-submitではDatabricks Utilitiesをサポートしていません。Databricksユーティリティを使用する際には、JARタスクを利用してください。
-
Python: Pathテキストボックスには、DBFSあるいはクラウドストレージ上のPythonスクリプトのURIを入力します。例えば、
dbfs:/FileStore/myscript.py
となります。 -
Pipeline: Pipelineドロップダウンで、既存のDelta Live Tablesパイプラインを選択します。
-
Python Wheel: Package nameテキストボックスで、インポートするパッケージを入力します。例えば、
myWheel-1.0-py2.py3-none-any.whl
のようなものです。Entry Pointテキストボックスには、wheelを開始する際に呼び出す関数を指定します。タスクの実行に必要なライブラリを追加するにはDependent Librariesの下のAddをクリックします。
-
-
タスクを実行するクラスターを設定します。ClusterドロップダウンでNew Job ClusterかExisting All-Purpose Clusterを選択します。タスクを実行するクラスターの設定、選択に関してはクラスター設定を参照ください。
- New Job Cluster: ClusterドロップダウンのEditをクリックして、クラスター設定を完了します。
- Existing All-Purpose Cluster: Clusterドロップダウンにある既存のクラスターを選択します。新たなページでクラスターを表示したい場合には、クラスター名、説明の右にあるアイコンをクリックします。
タスクを実行するためのクラスターの選択と設定に関しては、クラスター設定のティップスを参照ください。
-
タスクにパラメーターを指定することができます。どのようなフォーマットでパラメーターを指定するのかはタスクのタイプに依存します。
- ノートブック: Addをクリックし、タスクに渡すパラメーターをキーバリュー形式で指定します。手動でタスクを実行する際、パラメータを指定してジョブを実行でパラメーターを上書き、追加することができます。パラメーターはパラメーターのキーで指定されるノートブックウィジェットの値を設定します。パラメーターの値の一部として、限定的な動的な値のセットを渡すためにはタスクパラメーター変数を使用します。
- JAR: パラメーターを指定するためにJSON配列の文字列を使用します。これらの文字列は、メインクラスのmainメソッドの引数として渡されます。JARジョブのパラメーター設定を参照ください。
- Spark Submit: パラメーターはJSON配列の文字列として指定します。Apache Spark spark-submitの書式に則り、JARパスの後のパラメーターはメインクラスのmainメソッドに引き渡されます。
- Python: パラメーターを指定するためにJSON配列の文字列を使用します。これらの文字列はPythonのargparseモジュールを用いてパースされます。
- Python Wheel: Parametersドロップダウンで、JSONフォーマットの配列としてパラメーターを指定するには *args (string array) を選択し、パラメーターごとのキーバリューを指定するには **kwargs (key-value pairs) を選択します。
-
Dependent Libraries、Retry Policy、Timeoutsなどの追加のオプションにアクセスするには、Advanced Optionsをクリックしてください。タスクの編集を参照ください。
-
Createをクリックします。
-
ジョブのスケジュールを設定する場合には、Edit Scheduleをクリックします。ジョブのスケジュールを参照ください。
-
同じジョブに対して複数の同時実行を行う際には、Settingタブをクリックし、Maximum Concurrent Runsに新たな値を入力します。最大同時実行数を参照ください。
-
ジョブのイベントに対してメールのアラートを設定することができます。Settingタブをクリックし、AlertsセクションのAddをクリックします。アラートを参照ください。
-
ジョブに対するアクセス権を設定する場合にはSettingをクリックします。ジョブのアクセスコントロールを参照ください。
他のタスクを追加するには、作成したタスクの下にあるをクリックします。事前のタスクでNew Job Clusterを設定した場合には、クラスター共有のオプションが提供されます。タスクを作成、編集する際に個別のクラスターを設定することもできます。タスクを実行するためのクラスターの選択と設定に関しては、クラスター設定のティップスをご覧ください。
ジョブの実行
ジョブの即時実行
ティップス
Run Nowをクリックすることでノートブックタスクのテストランを実行することができます。ノートブックに変更を加える必要がある場合には、ノートブックを編集後に再度Run Nowをクリックすることで、最新バージョンのノートブックが実行されます。
異なるパラメータを指定してジョブを実行
異なるパラメーター、既存のパラメーターの異なる値でジョブを再実行するために、Run Now with Different Parametersを利用できます。
-
Run Nowの隣のをクリックし、Run Now with Different Parametersを選択するか、アクティブなランの一覧でRun Now with Different Parametersをクリックします。タスクの種類に応じて新たなパラメーターを入力します。
- ノートブック: キーバリューのペア、あるいはJSONオブジェクトとしてパラメーターを入力します。widgetsの値を設定するためにこのダイアログを利用できます。
- JARあるいはspark-submit: パラメーターのリストあるいはJSONドキュメントを指定します。実行の際にはデフォルトパラメーターと指定されたパラメーターがマージされます。キーを削除した場合には、デフォルトのパラメーターが使用されます。実行に際してタスクパラメーター変数を追加することができます。
-
Runをクリックします。
成功しなかったジョブ実行のリペア
成功しなかったタスクと、全ての依存するタスクのサブセットのみを実行することで、失敗したあるいはキャンセルされたマルチタスクジョブをリペアすることができます。成功したタスクとそれらに依存するタスクは再実行されないので、この機能は成功しなかったジョブ実行からのリカバリーに必要な時間とリソースを削減します。
ジョブ実行をリペアする前にジョブやタスクの設定を変更することができます。例えば、ノートブックのパスやクラスター設定を変更した場合、更新されたノートブックやクラスター設定を用いてタスクが再実行されます。
Task run detailsページで全てのタスク実行の履歴を参照することができます。
注意
- 1つ以上のタスクがジョブクラスターを共有している場合、リペアの実行では新規のジョブクラスターが作成されます。例えば、オリジナルの実行ではジョブクラスター
my_job_cluster
が使用された場合、リペアの実行では新規のジョブクラスターmy_job_cluster_v1
を使用するので、最初の実行といかなるリペアの実行によって使用されたクラスタとクラスター設定を容易に比較することができます。my_job_cluster_v1
の設定は、現状のmy_job_cluster
と同じものになります。 - 2つ以上のタスクをオーケストレートするジョブでのみリペアはサポートされます。
- Runsタブに表示されるDurationの値には、最初の実行の開始から最後のリペアが完了するまでの時間が含まれます。例えば、実行が2回失敗し、3回目の実行で成功した際には、durationには3つ全ての実行の時間が含まれます。
失敗したジョブをリペアするには、以下を実施します。
- サイドバーのJobsをクリックします。
- Nameカラムのジョブ名をクリックします。Runsタブにはアクティブなランと失敗を含む完了したランが表示されます。
- Completed Runs (past 60 days)テーブルのStart timeカラムにある失敗したランのリンクをクリックします。Job run detailsページが表示されます。
- Repair runをクリックします。Repair job runダイアログが表示され、再実行される失敗した全てのタスクと依存する全てのタスクが一覧表示されます。
- リペアするタスクのパラメータの追加、修正を行うには、Repair job runダイアログにパラメータを入力します。Repair job runダイアログに入力された値は既存の値を上書きします。後続のリペアの実行の際には、Repair job runダイアログのキーバリューをクリアすることで、パラメーターをオリジナルの値に戻すことができます。
- Repair job runダイアログRepair runをクリックします。
タスク実行履歴の参照
成功、失敗を含むタスクの実行履歴を参照するには、
- Job run detailsページのタスクをクリックします。Task run detailsページが表示されます。
- run historyドロップダウンからタスクを選択します。
ジョブのスケジュール
ジョブのスケジュールを定義するには以下を実行します:
-
Job detailsパネルのEdit Scheduleをクリックして、Schedule TypeをScheduledを選択します。
-
期間、開始時刻、タイムゾーンを指定しますQuartz Cron Syntaxに従ってスケジュールを表示、編集する場合にはShow Cron Syntaxのチェックボックスを選択します。
注意
- Databricksは、cron表現の秒の設定に関係なく、ジョブのスケジュールによって実行される後続の処理に最低10秒のインターバルを設けます。
- セービングタイムのあるタイムゾーンあるいはUTC時間を選択できます。セービングタイムのあるタイムゾーンを選択した場合には、セービングタイムの開始および終了時に時間単位のジョブがスキップされたり、実行されないように見える現象が起きます。
- ジョブスケジューラーは、Sparkのバッチインタフェース同様、低レーテンシーのジョブ向きではありません。ネットワークやクラウドの問題によって、ジョブ実行が最大数分遅れる可能性があります。このような場合、スケジュールされたジョブはサービスが利用可能になるや否や実行されます。
-
Saveをクリックします。
ジョブの一時停止、再開
以下のいずれを実行してジョブを一時停止します。
- Job detailsパネルのPauseをクリックします。
- Job detailsパネルのEdit scheduleをクリックし、Schedule Typeを Manual (Paused) に設定します。
一時停止したジョブスケジュールを再開するには、Schedule TypeでScheduledを選択します。
ジョブの参照
サイドバーのをクリックするとジョブの一覧が表示されます。ジョブのページでは、定義されたジョブ、クラスター定義、スケジュール、前回の実行結果が表示されます。
ジョブの一覧では以下の方法でフィルタリングできます。
- キーワード指定
- 所有しているジョブのみを表示
- アクセス可能なジョブを表示。このフィルターを利用するにはJobs access controlが有効になっている必要があります。
カラムのヘッダーをクリックすることでジョブ一覧を並び替える(降順、昇順)ことができます。デフォルトの並び替えはジョブ名の昇順です。
ジョブ実行の参照
マトリクスビューに切り替えるにはMatrixをクリックします。マトリクスビューでは個々のタスクを含むジョブの実行履歴が表示されます。
マトリクスビューのJob Runs行には、実行に要した合計時間と実行の状態が表示されます。開始時間、期間、ステータスを含む実行の詳細を参照するには、Job Runs行のバーの上にマウスカーソルを移動します。
Tasks行のそれぞれのセルには、タスクとそれに対応するタスクのステータスが表示されます。開始時間、期間、クラスター、ステータスを含む個々のタスクの詳細を参照するにはタスクのセルの上にマウスカーソルを移動します。
ジョブの実行とタスク実行のバーは実行ステータスを表す色が割り当てられます。成功した実行は緑、失敗した実行は赤、スキップされた実行はピンクになります。それぞれのジョブ実行、タスク実行のバーの高さは実行時間の可視化となります。
Databricksではジョブ実行履歴を60日間保持します。ジョブの実行結果を保存したい場合には、有効期限が切れる前に実行結果をエクスポートすることをお勧めします。詳細はジョブ実行結果のエクスポートを参照ください。
ジョブ実行詳細の参照
ジョブ実行詳細ページには、ジョブにおけるそれぞれのタスクの実行成否の情報が含まれます。ジョブ実行詳細にアクセスするには、ジョブのRunsタブをクリックします。
Runsタブでジョブ実行詳細を参照するには以下の操作を行います:
- Completed Runs (past 60 days)テーブルのRun列のView Detailsリンクをクリックします。
- 以下を参照するためにタスクをクリックします:
- タスクを実行したクラスター
- タスクのSpark UI
- タスクのログ
- タスクのメトリクス
ジョブのRunsタブに戻るにはJob IDの値をクリックします。Job run IDの値をクリックすると、ジョブ実行詳細画面に戻ります。
最近のジョブ実行の参照
Apache AirflowやAzure Data Factoryによって起動されたジョブを含む、ワークスペースでアクセスできる全てのジョブの最近の実行結果、稼働中のジョブの一覧を参照することができます。最近のジョブ実行の一覧を表示するには以下を実行します。
Job runsリストには以下が表示されます。
- 実行の開始時刻
- 実行に関連づけられているジョブの名前
- ジョブを実行するユーザー名
- ジョブ実行がスケジュールによるものか、APIリクエストか、手動によって起動されたのか
- 現在実行中のジョブの経過時間、あるいは、完了した実行の総実行時間
- 実行のステータス。
Pending
、Running
、Skipped
、Succeeded
、Failed
、Terminating
、Terminated
、Internal Error
、Timed Out
、Canceled
、Canceling
、Waiting for Retry
のいずれかとなります。
ジョブ実行詳細を参照するには、実行のStart time
カラムをクリックします。ジョブの操作を参照するには、Jobカラムのジョブ名をクリックします。
ジョブ実行結果のエクスポート
全てのジョブタイプでノートブックの実行結果と実行ログをエクスポートすることができます。
ノートブック実行結果のエクスポート
実行結果をエクスポートすることでジョブ実行結果を保存することができます。ノートブックジョブの実行においては、結果が埋め込まれたノートブックをエクスポートでき、あとでDatabricksワークスペースにインポートすることが可能です。
単一のタスクのジョブの実行結果ノートブックをエクスポートするには以下を実行します。
- ジョブ詳細ページで、Completed Runs (past 60 days)テーブルのRun列のジョブ実行名のView Detailsリンクをクリックします。
- Export to HTMLをクリックします。
複数のタスクのジョブの実行結果ノートブックをエクスポートするには以下を実行します。
- ジョブ詳細ページで、Completed Runs (past 60 days)テーブルのRun列のジョブ実行名のView Detailsリンクをクリックします。
- エクスポートするノートブックタスクをクリックします。
- Export to HTMLをクリックします。
ジョブ実行ログのエクスポート
ジョブ実行のログをエクスポートすることも可能です。このプロセスを自動化するために、Job API経由でログをDBFSやS3に自動で出力するようにジョブを設定できます。Jobs APIにおける新規ジョブ作成オペレーション(POST /jobs/create
)に引き渡すリクエストボディのnew_cluster.cluster_log_conf
オブジェクトを参照して下さい。
ジョブの編集
いくつかのオプションはジョブに対して適用できますが、他のオプションは個々のタスクに対して適用されます。例えば、最大同時実行数はジョブのみに設定できますが、パラメータはそれぞれのタスクに対して定義される必要があります。
ジョブの設定を変更するには:
サイドパネルにJob detailsが表示されます。スケジュール、クラスター設定、アラート、最大同時実行数を変更することができます。また、ジョブアクセスコントロールが有効化されている場合には、ジョブのアクセス権を編集することができます。
クラスター
クラスターに紐づけられているタスクを参照するには、サイドパネルのクラスターの上にマウスカーソルを移動します。関連づけられている全てのタスクに対するクラスターの設定を変更するには、クラスターの下のConfigureをクリックします。関連づけられている全てのタスクに対して新規のクラスターを設定するには、クラスターの下のSwapをクリックします。
最大同時実行数
並列で実行可能な最大数です。新たなジョブを実行する際、アクティブな実行数が最大数に達している場合には、Databricksは処理をスキップします。同じジョブを同時実行したい場合にはデフォルトの1より大きい値を指定します。これは例えば、頻度の高いスケジュール処理を実行する際、連続した処理が互いに被っても問題がない場合、異なる入力パラメーターで複数処理を実行したい場合に有効です。
ジョブアラート
ジョブの開始、完了、失敗時に通知を行う一つ以上のメールアドレスを追加することができます。
- Edit alertsをクリックします。
- Addをクリックします。
- 通知先のメールアドレスを入力し、アドレスに通知したいアラートタイプのチェックボックスを選択します。
- 別のメールアドレスを追加するにはAddをクリックします。
- スキップされたジョブ実行に対するアラートを受け取りたくない場合にはチェックボックスをクリックします。
- Confirmをクリックします。
以下の通知ツールとメールの通知をインテグレーションすることができます。
ジョブに対するアクセスコントロール
ジョブのアクセス制御によって、ジョブのオーナー、管理者はジョブに対してきめ細かいアクセス制御を行えます。ジョブのアクセス制御によって、ジョブのオーナーは他のどのユーザー、グループがジョブの結果を参照できるのかを制御できます。また、誰がジョブの管理(即時実行、ジョブのキャンセルの権限)をできるのかも指定できます。
詳細はJobs access controlを参照ください。
タスクの編集
タスク設定を設定するには、
タスクの依存関係
Depends onドロップダウンを用いてジョブ内のタスクの実行順序を定義することができます。ジョブの1つ以上のタスクをこのフィールドに設定することができます。
注意
タスクが一つのジョブの場合、Depends onは表示されません。
タスクの依存関係を設定することで、ジョブスケジューラにおける実行順序を表現する標準的な方法である有向非巡回グラフ(DAG)を作成します。例えば、4つのタスクから構成される以下のジョブを考えてみます。
- Task1はルートタスクであり他のタスクには依存しない
- Task2とTask3は、Task1の完了に依存する
- 最後に、Task4はTask2とTask3の完了に依存する
Databricksは後段のタスクを実行する前に、最大限の並列性で前段のタスクを実行します。以下の図では、上のタスクの処理順序を示しています。
タスク設定オプション
それぞれのタスクには以下の設定を行えます。
クラスター
タスクを実行するクラスターを設定するには、Clusterドロップダウンをクリックします。共有されているジョブクラスターを編集することはできますが、他のタスクで使用されている共有クラスターを削除することはできません。
タスクを実行するクラスターの設定、選択に関してはクラスター設定のティップスを参照してください。
依存ライブラリ
タスク実行前に依存ライブラリがクラスターにインストールされます。ジョブの実行前に確実にインストールされるようにすべての依存ライブラリを指定する必要があります。
依存ライブラリを追加するには、Advanced optionsをクリックし、Add Dependent Librariesを選択しライブラリ選択画面を開きます。ライブラリを指定するにはライブラリの依存関係の推奨事項に従ってください。
重要!
すべてのクラスターに対する自動インストール設定を行なった、あるいはライブラリがインストール済みの既存の停止クラスターを選択した場合には、ライブラリのインストール完了を待たずにジョブは実行されます。ジョブが特定のライブラリを必要とする場合には、Dependent Librariesフィールドでライブラリをジョブにアタッチする必要があります。
パラメーター変数
タスクのパラメーターの一部として、ジョブのタスクにテンプレート化された変数を渡すことができます。これらの変数はジョブ実行時に適切な値で置換されます。ランIDやジョブ開始時刻などジョブ実行に関するコンテキストを渡すためにタスクパラメーター変数を利用できます。
ジョブの実行の際、二重の中括弧で囲まれたタスクパラメーター変数は置換され、値の一部としてオプション文字列に追加されます。例えば、ジョブID6のジョブに対して、MyJobId
をmy-job-6
で置換するには、以下のタスクパラメーターを追加します。
{
"MyJobID": "my-job-{{job_id}}"
}
二重の中括弧の中身は評価されませんので、この中で計算や関数を使用できません。括弧内の空白は除外されないので{{ job_id }}
は評価されません。
以下のタスクパラメーター変数がサポートされています。
変数 | 説明 | 例 |
---|---|---|
{{job_id}} | ジョブに割り当てられた一意の識別子 | 281 |
{{run_id}} | ジョブ実行に割り当てられた一意の識別子 | 4785 |
{{start_date}} | ジョブが開始した日付。書式はyyyy-MM-dd。UTCタイムゾーン | 2021-02-15 |
{{start_time}} | クラスターが作成され実行可能状態となった、ジョブ実行の開始時点のタイムスタンプ。書式はSystem.currentTimeMillis() で返されるUNIXエポック以来のミリ秒。UTCタイムゾーン |
1551622063030 |
{{task_retry_count}} | 最初の実行で失敗した場合のリトライ回数。最初の実行は0で以降リトライのたびに増える | |
{{parent_run_id}} | 複数のタスクを持つジョブの実行に割り当てられる一位の識別子 | 3447835 |
{{task_key}} | 複数タスクのジョブの一部であるタスクに割り当てられるタスクの一意な名前 | “clean_raw_data” |
これらの変数はジョブの作成、タスクの編集、パラメータを指定してジョブを実行の際に設定できます。
タイムアウト
タスクの終了に要する時間の最大値。この時間内にタスクが完了しなかった場合には、ステータスがTimed Outになります。
リトライ
実行に失敗したジョブを何回リトライするのかのポリシーとなります。デフォルトでは失敗したタスクのリトライを行いません。タスクのリトライを設定するには、Advanced optionsをクリックし、Edit Retry Policyを選択します。
注意
TimeoutとRetriesを両方設定した場合、それぞれのリトライにタイムアウトが適用されます。
ジョブのクローン
既存のジョブ設定をクローンすることで容易に新たなジョブを作成できます。ジョブをクローンすることで、ジョブID以外は全く同一のコピーを作成できます。ジョブのページでジョブの名前の隣にある More … をクリックし、ドロップダウンメニューからCloneを選択します。
タスクのクローン
既存タスクをクローンすることでクイックに新規のタスクを作成することができます。
ジョブの削除
ジョブページでジョブ名をクリック、ジョブ名の隣のMore …をクリックし、ドロップダウンメニューからDeleteを選択します。
タスクの削除
タスクパスのコピー
タスクのパス、例えばノートブックパスをコピーするには以下を実行します。
ベストプラクティス
クラスター設定のティップス
クラスター設定はジョブを本格運用する際に重要です。以下では、ジョブクラスターをどのように選択し設定するのかに関して一般的なガイダンス、特定のジョブ、タスクにおける推奨事項を説明します。
共有ジョブクラスターの活用
複数タスクをオーケストレーションするジョブにおいて使用リソースを最適化するには、共有ジョブクラスターを使用します。共有ジョブクラスターを使うことで、同じジョブの複数のタスクでクラスターを再利用することができます。ジョブの一部である全てのタスクを実行する際に一つのジョブクラスターを使用することができますし、特定のワークロードに対して最適化された複数のジョブクラスターを使用することもできます。
- タスクを作成する際に、New Job Clustersを選択し、クラスター設定を完了します。
- ジョブにタスクを追加する際に上の新規クラスターを選択するか、新たなジョブクラスターを作成します。New Job Clustersを選択して設定したクラスターは、ジョブの他のタスクで利用することができます。
共有ジョブクラスターは単一のジョブ実行のスコープとなっており、他のジョブや同じジョブの他の実行で使用することはできません。
共有ジョブクラスター設定でライブラリを宣言することはできません。タスクの設定で依存ライブラリを追加する必要があります。
ジョブに適したクラスタータイプの選択
- New Job Clustersは、ジョブやタスク実行専用のクラスターです。共有上部クラスターは、クラスターを使用する最初のタスクによって作成、起動され、クラスターを使用する最後のタスクが完了すると停止されます。アイドル状態の際にはクラスターは停止されませんが、クラスターを使用する全てのタスクが完了すると停止されます。共有ジョブクラスターが失敗、あるいは全てのタスクが完了する前に停止された場合、新規クラスターが作成されます。単一のタスクをスコープとしたクラスターが作成され、タスクが開始する際に起動され、タスクが完了すると停止されます。プロダクション環境では、新規の共有クラスター、タスクスコープのクラスターを作成し、それぞれのジョブやタスクが完全に分離された環境で実行されるようにすることをお勧めします。
- 新規のジョブクラスターでジョブを実行する際、このジョブはデータエンジニアリング(タスク)ワークロードとみなされ、ジョブワークロードとして課金されます。既存のall-purposeクラスターでジョブを実行した際には、このジョブはデータアナリティクス(オールパーパス)ワークロードとみなされ、オールパーパスワークロードとして課金されます。
- 既存の停止クラスターを選択し、ジョブのオーナーがCan Restart権限を持っているのであれば、ジョブのスケジュール実行の際にはDatabricksがクラスターを起動します。
- 一定期間でダッシュボードを更新するようなタスクにおいては、既存のオールパーパスクラスターは好適と言えます。
クラスター起動時間削減のためにプールを活用
新規のジョブクラスターの起動時間を短縮するためには、プールを作成してジョブクラスターがプールを利用するように設定します。
自動AZ
自動アベイラビリティゾーン(Auto-AZ)を活用するには、Clusters APIを使用して、awsattributes.zone_id = "auto"
を設定してAuto-AZを有効化する必要があります。Availability zonesを参照ください。
ノートブックジョブのティップス
ノートブックのセルのアウトプットの合計(すべてのノートブックのセルのアウトプットを結合したもの)には20MBのサイズ制限があります。加えて、それぞれのセルのアウトプットサイズには8MBの制限があります。トータルのアウトプットが20MBを超えた場合、個々のセルのアウトプットが8MBを超えた場合にはジョブ実行はキャンセルされ失敗とみなされます。
制限を超えそうなセルを見つける場合には、オールパーパスクラスターでノートブックを実行し、notebook autosave techniqueを活用します。
ストリーミングタスク
Sparkのストリーミングジョブで1より大きい最大同時実行数を設定してはいけません。また、ストリーミングジョブでは、cronのスケジュールで"* * * * * *"
(毎秒)、あるいは毎分を設定する必要があります。さらに、ストリーミングジョブでリトライを有効化してはいけません。
ストリーミングジョブは継続的に実行されるので、常にジョブの最後のタスクとして設定されるべきです。
JARジョブ
JARジョブを実行する際には以下に注意してください。
出力サイズの制限
注意
Databricksランタイム6.3以降で利用できます。
標準出力に吐き出されるログ出力などのジョブのアウトプットは20MBのサイズ制限があります。これを超えた場合、ジョブの実行はキャンセルされ失敗とみなされます。
この制限を超えないようにするには、Spark設定のspark.databricks.driver.disableScalaOutput
をtrue
にすることで、ドライバーからDatabricksに返却される標準出力を抑制します。デフォルトではfalse
に設定されています。このフラグはScala JARジョブとScalaノートブックのセルの出力を制御します。フラグが有効化されるとSparkはジョブの実行結果をクライアントに返却しません。このフラグはクラスターのログファイルに書き込まれるデータには影響を与えません。この設定はノートブックの結果を表示しなくしてしまうため、JARジョブのジョブクラスターにおいてのみ設定することをお勧めします。
共有SparkContext
の活用
Databricksはマネージドサービスであるため、あなたのApache Sparkジョブの実行を適切に行うためにはコードの変更が必要になるケースがあります。JARのジョブプログラムは、SparkContext
を取得するために共有のSparkContext
APIを使用する必要があります。DatabricksがSparkContext
を初期化するので、プログラムによるnew SparkContext()
は失敗します。SparkContext
を取得するためにはDatabricksが作成する共有SparkContext
を使用してください。
val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()
共有SparkContext
を使用する際に避けるべきメソッドがいくつかあります。
-
SparkContext.stop()
を呼び出さないでください。 -
Main
プログラムの最後でSystem.exit(0)
やsc.stop()
を呼び出さないでください。予期しない動作を引き起こします。
ジョブのクリーンアップのためにtry-finally
を活用
以下の二つの部分から構成されるJARを考えてみます。」
- ジョブの主要な箇所を含む
jobBody()
-
jobBody()
の正常終了後、例外が呼び出された際に呼び出されるjobCleanup()
例として、jobBody()
がテーブルを作成し、jobCleanup()
が作成されたテーブルを削除するものとします。
try {
jobBody()
} finally {
jobCleanup()
}
sys.addShutdownHook(jobCleanup)
や以下のようなコードでクリーンアップをすべきではありません。
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)
DatabricksによってSparkのコンテナーのライフタイムが管理されるため、shutdownのフックは正常に実行されません。
JARジョブのパラメーター設定
JSON文字列の配列でJARジョブのパラメーターを指定できます。詳細は、SparkJarTaskを参照してください。パラメーターにアクセスするには、main
関数に渡されるString
の配列を参照します。
ライブラリの依存関係
Sparkのドライバーには上書き不可の依存ライブラリが存在します。このライブラリはあなたが使用する競合ライブラリよりも高い優先度を持ちます。
ドライバーの依存ライブラリの完全なリストを取得するには、同じSparkのバージョン(あるいは検証したいドライバーのクラスター)のクラスターにアタッチされたノートブックで以下のコマンドを実行します。
%sh
ls /databricks/jars
ライブラリ依存関係の管理
ジョブのJARを作成する際にライブラリの依存関係に対応する良い方法としては、SparkとHadoopを前提の(provided)
依存関係として一覧にすることです。Mavenにおいては、以下のコードでSpark and/or Hadoopを前提の依存関係として追加します。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>
sbt
では以下のコードでSparkとHadoopの依存関係を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
ティップス
実行環境のバージョンに基づいて依存関係に対する適切なScalaのバージョンを指定してください。