Use Ray on Databricks | Databricks on AWS [2023/2/24時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プレビュー
本機能はパブリックプレビューです。
Ray 2.3.0以降では、DatabricksのApache Sparkクラスター上でのRayクラスター作成およびRayアプリケーションの実行をサポートしています。チュートリアル、サンプルを含むRayにおける機械学習をスタートするための情報については、Ray documentationをご覧ください。RayとApache Sparkのインテグレーションに関しては、Ray on Spark API documentationをご覧ください。
訳者註
Rayがどういうものかに関してはこちらを参照ください。
要件
- Databricks Runtime 12.0 ML以降
- Databricksランタイムクラスターのアクセスモードは
assigned
あるいはno isolation shared
である必要があります。
Rayのインストール
Rayをインストールするには以下のコマンドを使用します。Rayダッシュボードコンポーネントで[default]
拡張が必要になります。
%pip install ray[default]>=2.3.0
Rayクラスターの作成
Rayクラスターを作成するには、ray.util.spark.setup_ray_cluster APIを使用します。
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
num_worker_nodes=2,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
ray.util.spark.setup_ray_cluster
APIはSpark上にRayクラスターを作成します。内部では、バックグラウンドのSparkジョブを生成します。ジョブのそれぞれのSparkタスクは、Rayワーカーノードを作成し、Rayヘッドノードはドライバーに作成されます。引数num_worker_nodes
は作成するRayワーカーノードの数を表現します。それぞれのRayワーカーノードに割り当てられるCPUやGPUコアの数を指定するには、引数num_cpus_per_node
あるいはnum_gpus_per_node
を設定します。
Rayクラスターが作成されると、ノートブックで直接Rayアプリケーションコードを実行できるようになります。また、Open Ray Cluster Dashboard in a new tabというリンクが表示され、クラスターのRayダッシュボードを参照することができます。
ティップス
Databricksのassigned
モードクラスターを使用している場合、Rayクラスターで利用できるすべてのリソースを使用するように、num_worker_nodes
にray.util.spark.MAX_NUM_WORKER_NODES
を設定することができます。
setup_ray_cluster(
# ...
num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)
Rayクラスターのログを収集する際、引数collect_log_to_path
に格納パスを指定することができます。Rayクラスターがシャットダウンされた後にログ回収が実行されます。Sparkクラスターが停止された後でもログが保持されるように、/dbfs/
で始めるパスを設定することをお勧めします。
注意
ray.util.spark.setup_ray_cluster
を呼び出すと、作成されたRayクラスターのアドレスが環境変数RAY_ADDRESS
に設定されますので、お使いのRayアプリケーションは自動でこのRayクラスターを使用します。ray.init APIのaddress
引数を用いて別のクラスターアドレスを指定することができます。
Rayアプリケーションの実行
Rayクラスターが作成されると、DatabricksノートブックからRayアプリケーションコードを実行できるようになります。例えば、以下のようにDatabricksノートブックでシンプルなRayアプリケーションを実行することができます。
import ray
import random
import time
from fractions import Fraction
ray.init()
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
in_count += 1
return Fraction(in_count, sample_count)
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
pi = pi4 * 4
print(float(pi))
Sparkデータフレームからのデータロード
RayデータセットとしてSparkデータフレームをロードするには、最初にParquetあるいはDeltaフォーマットでSparkデータフレームを保存する必要があります。DBFSのアクセスをセキュアに制御するために、クラウドオブジェクトストレージをDBFSにマウントすることをお勧めしています。その後で、以下のヘルパーメソッドを用いて保存したSparkデータフレームのパスからray.data.Dataset
インスタンスを作成することができます。
import ray
import os
from urllib.parse import urlparse
def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
spark_df.write.mode('overwrite').parquet(dbfs_tmp_path)
fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
return ray.data.read_parquet(fuse_path)
# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")
# Provide a dbfs location to write the table to
data_location_2 = (
"dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)
# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
spark_dataframe=spark_df,
dbfs_tmp_path=data_location_2
)
Rayクラスターのシャットダウン
Databricksで稼働しているRayクラスターをシャットダウンするには、ray.utils.spark.shutdown_ray_cluster APIを呼び出します。
注意
以下のケースでもRayクラスターはシャットダウンします。
- Databricksクラスターからインタラクティブノートブックをデタッチする。
- Databricksジョブが完了する。
- Databricksクラスターが再起動あるいは停止される。
Rayクラスター設定のチューニング
それぞれのRayワーカーノードで推奨される設定は:
- Rayワーカーノードあたり最低でも4CPUコア
- Rayワーカーノードあたり最小でも10GBのヒープメモリー
このため、ray.util.spark.setup_ray_cluster
を呼び出す際には、num_cpus_per_node
の値を>=4
に設定することをお勧めします。
Rayワーカーノードごとのヒープメモリーのチューニングに関しては、Rayワーカーノードのメモリー割り当てをご覧ください。
Rayワーカーノードのメモリー割り当て
それぞれのRayワーカーノードは2つのタイプのメモリーを使用します: ヒープメモリーとオブジェクトストアメモリーです。それぞれのタイプに割り当てられるメモリーサイズは以下のように決定されます。
Rayワーカーノードに割り当てられるトータルのメモリーは:
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES
はSparkワーカーノードで起動できるRayワーカーノードの最大数です。これは、引数num_cpus_per_node
やnum_gpus_per_node
によって決定されます。
引数object_store_memory_per_node
を設定していない場合、それぞれのRayワーカーノードに割り当てられるヒープメモリーのサイズとオブジェクトストアメモリーのサイズは:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
引数object_store_memory_per_node
を設定している場合は:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
さらに、Rayワーカーノードあたりのオブジェクトストアメモリーのサイズはオペレーティングシステムの共有メモリーの制限を受けます。最大の値は:
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY
はSparkワーカーノードで設定される/dev/shm
のディスクサイズです。
Ray Dashboard Actorsページにおけるスタックトレースとフレームグラフの有効化
RayのDashboard Actorsページでは、アクティブなRayアクターのスタックトレースとフレームグラフを参照することができます。この情報を参照するには、Rayクラスターを起動する前にpy-spy
をインストールするために以下のコマンドを使用します。
%pip install py-spy
サンプルノートブック
以下のサンプルノートブックでは、DatabricksにおけるRayクラスターの作成方法とRayアプリケーションの実行方法を説明しています。
Ray on Sparkスターターノートブック
制限
- Rayクラスターのオートスケーリングはまだサポートされていません。API
ray.util.spark.setup_ray_cluster
は固定数のRayワーカーノードのRayクラスターのみを起動することができます。 - マルチユーザーの共有Databricksクラスター(アイソレーションモード有効化)はサポートされていません。
- パッケージをインストールするために
%pip
を使用する際、Rayクラスターはシャットダウンされます。%pip
ですべてのライブラリをインストールした後にRayを起動するようにしてください。 -
ray.util.spark.setup_ray_cluster
の設定を上書きするインテグレーションを使用することで、Rayクラスターが不安定になり、Rayコンテキストをクラッシュさせる場合があります。例えば、xgboost_ray
パッケージを使用し、アクターでRayParams
を設定したり、Rayクラスター設定に加えてcpus_per_actor
を設定すると、エラーを出さずにRayクラスターがクラッシュすることがあります。