Migrate production workloads to Databricks | Databricks on AWS [2022/10/25時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
このガイドでは、他のプラットフォーム上のApache SparkからプロダクションジョブをどのようにDatabricks上のApache Sparkに移行するのかを説明します。
コンセプト
バンドルしてサブミットできるコードの単一のユニットです。Databricksジョブは単一のSparkContext
のSparkアプリケーションと同じものとなります。エントリーポイントはライブラリ(JAR、egg、wheelなど)やノートブックとなります。洗練されたリトライ、アラート機構を用いてDatabricksジョブをスケジュール実行することができます。ジョブ実行の主要なインタフェースはJobs APIとUIとなります。
Databricksによって管理されるあなたのアカウント上のインスタンスのセットですが、アイドル状態の際にはDatabricksの課金は発生しません。プールで複数のジョブをサブミットすることで、ジョブがクイックに起動します。プールのインスタンスにガードレール(インスタンスタイプ、インスタンス制限など)とオートスケーリングのポリシーを設定することができます。プールは、他のSparkプラットフォームにおけるオートスケーリングクラスターと同じものです。
移行ステップ
このセクションでは、お使いのプロダクションジョブをDatabricksに移行するステップを説明します。
ステップ1: プールの作成
オートスケーリングプールを作成します。これは他のSparkプラットフォームでオートスケーリングクラスターを作るのと同じことです。他のプラットフォームにおいてオートスケーリングクラスターは数分あるいは数時間アイドル状態になったとしても、課金が発生します。Databricksは無料ではインスタンスプールを管理します。すなわち、これらのマシンが使用されていない場合には、Databricksに料金を支払う必要がありません。なお、クラウドプロバイダーの課金は発生します。Databricksでは、インスサンスでジョブが実行されている時にのみ課金が発生します。
重要な設定は以下の通りとなります。
- Min Idle: プールが管理するジョブで未使用のスタンバイ状態のインスタンスの数です。0に設定することができます。
- Max Capacity: これはオプションのフィールドです。すでにクラウドプロバイダーによる制限が設定されているのであれば、このフィールドは空のままでも構いません。さらに最大数の制限を設定したいのであれば、大量のジョブがプールを共有できる様に大きな値を設定してください。
-
Idle Instance Auto Termination: 指定された時間アイドル状態であった場合、
Min Idle
を超えたインスタンスはクラウドプロバイダーに返却されます。値が大きいほど、多くのインスタンスがレディ状態で保持されるので、ジョブの起動が高速になります。
ステップ2: プールでのジョブの実行
Jobs APIあるいはUIを用いてプールでジョブを実行することができます。それぞれのジョブに対してクラスターのスペックを指定する必要があります。ジョブが起動しようとすると、Databricksは自動でプールから新規クラスターを作成します。ジョブが完了するとクラスターは自動で停止します。ジョブが実行された時間分だけ課金されることになります。これは、Databricksでジョブを実行する際のコスト効率が最も高い方法となります。それぞれの新規クラスターは以下を持ちます:
- 他のSparkプラットフォームのSparkアプリケーションと同じ一つの
SparkContext
が関連づけられます。 - 一つのドライバーノードと指定された数のワーカーノード。単一のジョブに対して、ワーカー数の範囲を指定することができます。Databricksは当該ジョブが必要とするリソースに基づいて、単一のSparkジョブをオートスケールさせます。Databricksによるベンチマークでは、ジョブの性質によりますが、これによって最大30%クラウドコストを削減することができます。
プールでジョブを実行するには、API/CLI、Airflow、UIという三つの方法があります。
API / CLI
-
Databricks CLIをダウンロードして設定します。
-
一回、以下のコマンドをサブミットして実行します。このAPIは、ジョブ実行の進捗を追跡するために使用するURLを返却します。
Bashdatabricks runs submit --json { "run_name": "my spark job", "new_cluster": { "spark_version": "7.3.x-scala2.12", "instance_pool_id": "0313-121005-test123-pool-ABCD1234", "num_workers": 10 }, "libraries": [ { "jar": "dbfs:/my-jar.jar" } ], "timeout_seconds": 3600, "spark_jar_task": { "main_class_name": "com.databricks.ComputeModels" } }
-
ジョブをスケジュールするには、以下のサンプルを使用します。このメカニズムを通じて作成されたジョブはジョブ一覧ページに表示されます。すべてのジョブ実行のステータスの参照に使用する
job_id
が返却されます。Bashdatabricks jobs create --json { "name": "Nightly model training", "new_cluster": { "spark_version": "7.3.x-scala2.12", ... "instance_pool_id": "0313-121005-test123-pool-ABCD1234", "num_workers": 10 }, "libraries": [ { "jar": "dbfs:/my-jar.jar" } ], "email_notifications": { "on_start": ["john@foo.com"], "on_success": ["sally@foo.com"], "on_failure": ["bob@foo.com"] }, "timeout_seconds": 3600, "max_retries": 2, "schedule": { "quartz_cron_expression": "0 15 22 ? \* \*", "timezone_id": "America/Los_Angeles" }, "spark_jar_task": { "main_class_name": "com.databricks.ComputeModels" } }
Sparkジョブをサブミットするのにspark-submitを使っている場合、以下のテーブルではspark-submitのパラメータがJobs APIで新規ジョブ作成オペレーション(POST /jobs/create
)の引数にどのようにマッピングされるのかを示しています。
spark-submitパラメーター | Databricksでどのように適用するのか |
---|---|
–class | メインクラスの名前とパラメーターを指定するためにspark_jar_task structureを使います。 |
–jars | 依存関係のリストを指定するためにlibraries を使います。 |
–py-files | Pythonジョブに対しては、spark_python_task structureを使います。eggやwheelの依存関係を指定するためにlibraries 引数を使用することができます。 |
–master | クラウドでは、長時間処理を実行するマスターノードを管理する必要がありません。すべてのインスタンスとジョブはDatabricksサービスによって管理されます。このパラメーターは無視してください。 |
–deploy-mode | Databricksではこのパラメーターは無視してください。 |
–conf |
new_cluster structureでは、spark_conf 引数を使ってください。 |
–num-executors |
new_cluster structureでは、num_workers 引数を使ってください。また、範囲を指定するためにautoscale オプションを使うこともできます(推奨)。 |
–driver-memory, –driver-cores | 必要とするドライバーメモリーとコアに基づいて、適切なインスタンスタイプを選んでください。プール作成時にドライバーのインスタンスタイプを指定するので、ジョブのサブミットの際にはこのパラメーターを無視してください。 |
–executor-memory, –executor-cores | 必要とするエグゼキューターメモリーとコアに基づいて、適切なインスタンスタイプを選んでください。プール作成時にエグゼキューターのインスタンスタイプを指定するので、ジョブのサブミットの際にはこのパラメーターを無視してください。 |
–driver-class-path | spark_conf引数では、spark.driver.extraClassPath に適切な値を設定してください。 |
–driver-java-options | spark_conf引数では、spark.driver.extraJavaOptions に適切な値を設定してください。 |
–files | spark_conf引数では、spark.files に適切な値を設定してください。 |
–name |
submit job run request (POST /jobs/runs/submit )では、run_name 引数を使ってください。create job request(POST /jobs/create )では、name 引数を使ってください。 |
Airflow
DatabricksでジョブをサブミットするためにAirflowを使いたい場合、DatabricksではAirflow operatorを提供しています。DatabricksのAirflowオペレーターは、DatabricksにジョブをサブミットするためにJobs APIのTrigger a new job runオペレーション(POST /jobs/run-now
)をコールします。Orchestrate Databricks jobs with Apache Airflowをご覧ください。
UI
Databricksではジョブをサブミットしスケジュールするために、シンプルかつ直感的な使いやすいUIを提供しています。UIを用いてジョブを作成しサブミットするには、ステップバイステップのガイドに従ってください。
ステップ3: ジョブのトラブルシュート
Databricksでは、ジョブのトラブルシュートに役立つツールを数多く提供しています。
ログとSpark UIへのアクセス
Databricksでは、それぞれのジョブ実行におけるすべてのSparkログやSpark UIにアクセスできるようにする完全マネージドのヒストリーサーバーを管理しています。これらには、ジョブ実行の詳細ページやジョブの実行ページからアクセスすることができます。
ログの転送
お使いのクラウドストレージロケーションにクラスターログを転送することができます。選択したロケーションにログを転送するには、new_cluster structureのcluster_log_conf
パラメーターを使います。
メトリクスの参照
ジョブの実行中、クラスターページに移動し、MetricsタブでライブのGangliaメトリクスを参照することができます。また、Databricksはこれらのメトリクスのスナップショットを15分ごとに作成して格納しますので、ジョブが完了した後でもこれらのメトリクスを参照することができます。メトリクスをご自身のメトリクスサーバーに送信するために、クラスターにカスタムエージェントをインストールすることができます。パフォーマンス監視をご覧ください。
アラートの設定
ジョブ失敗時にアラートを受け取れる様に、Jobs APIのCreate a new jobオペレーション(POST /jobs/create
)でemail_notifications
を使います。
また、これらのメールのアラートをPagerDuty、Slackや他のモニタリングシステムに転送することもできます。
FAQ
プールなしにジョブを実行できますか?
はい。プールはオプションです。新規クラスターでジョブを直接実行することができます。この様な場合、Databricksがクラウドプロバイダーに必要なインスタンスをリクエストすることでクラスターが作成されます。プールを用いることで、プールでインスタンスが利用できる場合には、クラスターの起動時間は30秒程度になります。
ノートブックジョブとは何ですか?
Databricksでは、JAR、Python、ノートブックといった異なるジョブタイプがあります。ノートブックジョブタイプは、指定されたノートブックの中でコードを実行します。
JARジョブと比べた際、いつノートブックジョブを使うべきですか?
JARジョブはspark-submitジョブと同じものです。JARを実行し、トラブルシュートの際にはログやSpark UIを参照することができます。ノートブックジョブは指定されたノートブックを実行します。ノートブックでライブラリをインポートし、ノートブックからライブラリをコールすることもできます。main
エントリーポイントとしてノートブックジョブを使うことの利点は、ノートブックの出力領域でジョブの中間結果を容易にデバッグできるということです。JARジョブをご覧ください。
自分のHiveメタストアに接続できますか?
はい、Databricksでは外部のHiveメタストアとGlueカタログをサポートしています。