こちらをウォークスルーします。
並列処理されるRayタスクでGPUを使うには、以下の2つの要件を満足する必要があります。
- GPUを持つRayクラスターを構成する
- Rayタスクを定義する際にGPUのリソース要件を記述する
クラスターの設定
1GPUを持つ2つのワーカーノードから構成されるクラスターを作成します。
Rayのインストール
%pip install -U "ray[default]==2.35.0"
dbutils.library.restartPython()
Rayクラスターの起動
以下のコマンドを実行することでローカルクラスターを起動します。
2台のRayワーカーノードを持つRayクラスターをセットアップします。
それぞれのワーカーノードには1個のGPUコアが割り当てられています。クラスターが起動すると、Rayクラスターダッシュボードを参照するために、リンク"Open Ray Cluster Dashboard in a new tab"をクリックすることができます。
ray.util.spark.setup_ray_cluster
Sparkアプリケーションのドライバー側のノードでRayのヘッドノードを起動することで、Sparkクラスター上にRayクラスターをセットアップします。ヘッドノードを作成した後は、Sparkクラスターのワーカーノードで実行されるRayクラスターの設定を含むRayClusterOnSpark
のインスタンスを生成するバックグラウンドのSparkジョブが作成されます。Rayクラスターがセットアップされると、RAY_ADDRESS
環境変数にクラスターのアドレスが設定されるので、クラスターに接続するためにRayクラスターのアドレスを指定することなしに、ray.init()
を呼び出すことができます。クラスターをシャットダウンするには、ray.util.spark.shutdown_ray_cluster()
を呼び出します。
注意: アクティブなRayクラスターがシャットダウンされなかった場合、新たなRayクラスターを作成することができません。
-
max_worker_nodes - この引数はrayクラスターを起動する際のrayワーカーノードの最大数を表現します。
max_worker_nodes
として、Sparkアプリケーションで設定されている利用可能な全てのリソースを使用するrayクラスターの設定を表現するray.util.spark.MAX_NUM_WORKER_NODES
を設定することができます。スケーリングしない共有rayクラスターを占有的に実行させたいSparkアプリケーションを作成するには、この引数をray.util.spark.MAX_NUM_WORKER_NODES
にすることを推奨します。 -
min_worker_nodes - ワーカーノードの最小数となり(デフォルトは
None
)、max_worker_nodes
の値がmin_worker_nodes
と同じ値、あるいは引数の値がNone
でオートスケーリングが無効になっている場合、Rayワーカーノードのmax_worker_nodes
が固定値でRayクラスターが起動されます。それ以外の場合には、オートスケーリングが有効化されます。 -
num_cpus_worker_node - Rayワーカーノードごとに利用できるCPUの数であり、指定されない場合でSparkのステージスケジューリングがサポートされている場合、
num_cpus_worker_node
の値は、SparkワーカーノードごとのCPUコア数に等しくなり、それ以外の場合には、代わりにSparkアプリケーションの設定spark.task.cpus
が使用されます。制限 Sparkバージョン >= 3.4 あるいは Databricks Runtime 12.x以降でこの引数がサポートされています。 -
num_gpus_worker_node - Rayワーカーノードごとに利用できるGPUの数であり、指定されない場合でSparkのステージスケジューリングがサポートされている場合、
num_gpus_worker_node
の値は、SparkワーカーノードごとのGPUコア数に等しくなり、それ以外の場合には、代わりにSparkアプリケーションの設定spark.task.resource.gpu.amount
が使用されます。この引数は、gpu
リソースが設定されたSparkクラスターでのみ利用できます。制限 Sparkバージョン >= 3.4 あるいは Databricks Runtime 12.x以降でこの引数がサポートされています。 -
collect_log_to_path - 指定された場合、rayのヘッド/ワーカーノードが停止された後に、指定されたパスにログが収集されます。Databricksランタイムでは、DatabricksのSparkクラスターが停止された後でも、集中管理されたストレージデバイスにマウントされている
/dbfs/
で始まるローカルパスにデータを永続化できるので、こちらのパスを指定することをお勧めします。
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
setup_ray_cluster(
min_worker_nodes=None,
max_worker_nodes=MAX_NUM_WORKER_NODES,
num_cpus_worker_node=4,
num_gpus_worker_node=1,
collect_log_to_path="/dbfs/tmp/raylogs_taka",
)
import ray
ray.init()
ray.cluster_resources()
GPUを確認できます。
{'CPU': 8.0,
'object_store_memory': 6657031536.0,
'memory': 14101417820.0,
'node:10.0.15.60': 1.0,
'GPU': 2.0,
'accelerator_type:T4': 2.0,
'node:10.0.4.41': 1.0,
'node:__internal_head__': 1.0,
'node:10.0.0.250': 1.0}
GPUサポート
import ray
import torch
# GPUを使用しない
@ray.remote
def pytorch_test_1():
return torch.cuda.is_available()
# GPUを使用
@ray.remote(num_gpus=1)
def pytorch_test_2():
return torch.cuda.is_available()
# GPUを使用(fractional accelerator)
@ray.remote(num_gpus=0.25)
def pytorch_test_3():
return torch.cuda.is_available()
ケース1: リソース要件を指定しないで実行
object_references = [
pytorch_test_1.remote()
]
data = ray.get(object_references)
data
GPUを使っていません。
[False]
ケース2: GPUのリソース要件を指定して実行
object_references = [
pytorch_test_2.remote()
]
data = ray.get(object_references)
data
GPUを使っています。
ケース3: GPUの部分的なリソース要件を指定して実行
部分的リソース要件に関してはリンク先をご覧ください。MIGのような使い方ができるってことなんですかね。
object_references = [
pytorch_test_3.remote()
]
data = ray.get(object_references)
data
[True]
注意: 個々のタスクがアクセラレータの取り分以上を使わないようにするかどうかはユーザーの責任です。PytorchやTensorFlowではメモリー使用量を制限するように設定することができます。
Rayクラスターのシャットダウン
shutdown_ray_cluster()