2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksにおけるRayの活用

Last updated at Posted at 2023-02-28

Use Ray on Databricks | Databricks on AWS [2023/2/24時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

プレビュー
本機能はパブリックプレビューです。

Ray 2.3.0以降では、DatabricksのApache Sparkクラスター上でのRayクラスター作成およびRayアプリケーションの実行をサポートしています。チュートリアル、サンプルを含むRayにおける機械学習をスタートするための情報については、Ray documentationをご覧ください。RayとApache Sparkのインテグレーションに関しては、Ray on Spark API documentationをご覧ください。

訳者註
Rayがどういうものかに関してはこちらを参照ください。

要件

  • Databricks Runtime 12.0 ML以降
  • Databricksランタイムクラスターのアクセスモードはassignedあるいはno isolation sharedである必要があります。

Rayのインストール

Rayをインストールするには以下のコマンドを使用します。Rayダッシュボードコンポーネントで[default]拡張が必要になります。

%pip install ray[default]>=2.3.0

Rayクラスターの作成

Rayクラスターを作成するには、ray.util.spark.setup_ray_cluster APIを使用します。

Python
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

ray.util.spark.setup_ray_cluster APIはSpark上にRayクラスターを作成します。内部では、バックグラウンドのSparkジョブを生成します。ジョブのそれぞれのSparkタスクは、Rayワーカーノードを作成し、Rayヘッドノードはドライバーに作成されます。引数num_worker_nodesは作成するRayワーカーノードの数を表現します。それぞれのRayワーカーノードに割り当てられるCPUやGPUコアの数を指定するには、引数num_cpus_per_nodeあるいはnum_gpus_per_nodeを設定します。

Rayクラスターが作成されると、ノートブックで直接Rayアプリケーションコードを実行できるようになります。また、Open Ray Cluster Dashboard in a new tabというリンクが表示され、クラスターのRayダッシュボードを参照することができます。

ティップス
Databricksのassignedモードクラスターを使用している場合、Rayクラスターで利用できるすべてのリソースを使用するように、num_worker_nodesray.util.spark.MAX_NUM_WORKER_NODESを設定することができます。

setup_ray_cluster(
 # ...
 num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Rayクラスターのログを収集する際、引数collect_log_to_pathに格納パスを指定することができます。Rayクラスターがシャットダウンされた後にログ回収が実行されます。Sparkクラスターが停止された後でもログが保持されるように、/dbfs/で始めるパスを設定することをお勧めします。

注意
ray.util.spark.setup_ray_clusterを呼び出すと、作成されたRayクラスターのアドレスが環境変数RAY_ADDRESSに設定されますので、お使いのRayアプリケーションは自動でこのRayクラスターを使用します。ray.init APIのaddress引数を用いて別のクラスターアドレスを指定することができます。

Rayアプリケーションの実行

Rayクラスターが作成されると、DatabricksノートブックからRayアプリケーションコードを実行できるようになります。例えば、以下のようにDatabricksノートブックでシンプルなRayアプリケーションを実行することができます。

Python
import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Sparkデータフレームからのデータロード

RayデータセットとしてSparkデータフレームをロードするには、最初にParquetあるいはDeltaフォーマットでSparkデータフレームを保存する必要があります。DBFSのアクセスをセキュアに制御するために、クラウドオブジェクトストレージをDBFSにマウントすることをお勧めしています。その後で、以下のヘルパーメソッドを用いて保存したSparkデータフレームのパスからray.data.Datasetインスタンスを作成することができます。

Python
import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_df.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Rayクラスターのシャットダウン

Databricksで稼働しているRayクラスターをシャットダウンするには、ray.utils.spark.shutdown_ray_cluster APIを呼び出します。

注意
以下のケースでもRayクラスターはシャットダウンします。

  • Databricksクラスターからインタラクティブノートブックをデタッチする。
  • Databricksジョブが完了する。
  • Databricksクラスターが再起動あるいは停止される。

Rayクラスター設定のチューニング

それぞれのRayワーカーノードで推奨される設定は:

  • Rayワーカーノードあたり最低でも4CPUコア
  • Rayワーカーノードあたり最小でも10GBのヒープメモリー

このため、ray.util.spark.setup_ray_clusterを呼び出す際には、num_cpus_per_nodeの値を>=4に設定することをお勧めします。

Rayワーカーノードごとのヒープメモリーのチューニングに関しては、Rayワーカーノードのメモリー割り当てをご覧ください。

Rayワーカーノードのメモリー割り当て

それぞれのRayワーカーノードは2つのタイプのメモリーを使用します: ヒープメモリーとオブジェクトストアメモリーです。それぞれのタイプに割り当てられるメモリーサイズは以下のように決定されます。

Rayワーカーノードに割り当てられるトータルのメモリーは:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODESはSparkワーカーノードで起動できるRayワーカーノードの最大数です。これは、引数num_cpus_per_nodenum_gpus_per_nodeによって決定されます。

引数object_store_memory_per_nodeを設定していない場合、それぞれのRayワーカーノードに割り当てられるヒープメモリーのサイズとオブジェクトストアメモリーのサイズは:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

引数object_store_memory_per_nodeを設定している場合は:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

さらに、Rayワーカーノードあたりのオブジェクトストアメモリーのサイズはオペレーティングシステムの共有メモリーの制限を受けます。最大の値は:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORYはSparkワーカーノードで設定される/dev/shmのディスクサイズです。

Ray Dashboard Actorsページにおけるスタックトレースとフレームグラフの有効化

RayのDashboard Actorsページでは、アクティブなRayアクターのスタックトレースとフレームグラフを参照することができます。この情報を参照するには、Rayクラスターを起動する前にpy-spyをインストールするために以下のコマンドを使用します。

%pip install py-spy

サンプルノートブック

以下のサンプルノートブックでは、DatabricksにおけるRayクラスターの作成方法とRayアプリケーションの実行方法を説明しています。

Ray on Sparkスターターノートブック

制限

  • Rayクラスターのオートスケーリングはまだサポートされていません。API ray.util.spark.setup_ray_clusterは固定数のRayワーカーノードのRayクラスターのみを起動することができます。
  • マルチユーザーの共有Databricksクラスター(アイソレーションモード有効化)はサポートされていません。
  • パッケージをインストールするために%pipを使用する際、Rayクラスターはシャットダウンされます。%pipですべてのライブラリをインストールした後にRayを起動するようにしてください。
  • ray.util.spark.setup_ray_clusterの設定を上書きするインテグレーションを使用することで、Rayクラスターが不安定になり、Rayコンテキストをクラッシュさせる場合があります。例えば、xgboost_rayパッケージを使用し、アクターでRayParamsを設定したり、Rayクラスター設定に加えてcpus_per_actorを設定すると、エラーを出さずにRayクラスターがクラッシュすることがあります。

Databricks 無料トライアル

Databricks 無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?