1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksにおけるRayサポートがGAになりました

Posted at

約一年前にこちらの記事を書きました。

Rayとは

RayはスケーラブルなAI、Pythonワークロードを実行するための有名な計算フレームワークであり、さまざまな分散機械学習ツール、大規模ハイパーパラメーターチューニング能力、強化学習アルゴリズム、モデルサービングなどを提供しています。

私は、Sparkはデータの並列分散が前提ですが、Rayを使うことでデータを伴わない処理を分散できると認識しています。

このたび、この機能がGAになりました!Databricksランタイム15.0以降ではデフォルトでRayがインストールされ、追加のインストールは不要になります。

マニュアルはこちら。

サンプルノートブックをウォークスルーします。

クラスターはノートブックの要件に沿って、4GPU * 2ノードのGPUクラスターを準備します。ランタイムは15.0 MLを指定します。
Screenshot 2024-04-22 at 9.47.04.png

Rayのインストール

ランタイム15.0 MLの場合、Rayの追加インストールは不要です。
Screenshot 2024-04-22 at 9.47.35.png

Rayクラスターの作成

2台のRayワーカーノードを持つRayクラスターをセットアップします。それぞれのワーカーノードには4個のGPUコアが割り当てられています。クラスターが起動すると、Rayクラスターダッシュボードを参照するために、リンク"Open Ray Cluster Dashboard in a new tab"をクリックすることができます。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/tmp/raylogs",
)

Screenshot 2024-04-22 at 9.49.09.png

作成したRayクラスターのリソースを表示します。

{'object_store_memory': 24466633522.0,
 'CPU': 8.0,
 'accelerator_type:T4': 2.0,
 'memory': 55657155790.0,
 'GPU': 2.0,
 'node:10.0.7.67': 1.0,
 'node:__internal_head__': 1.0,
 'node:10.0.6.78': 1.0,
 'node:10.0.6.91': 1.0}

Rayアプリケーションの実行

以下のコードでは、先程作成したRayクラスターで多数のRayタスクを作成するシンプルなRayアプリケーションを実行します。

import ray
import random
import time
import math
from fractions import Fraction

@ray.remote
def pi4_sample(sample_count):
    """pi4_sampleはsample_count回の実験を実行し、
    円の中に含まれた回数の比率を返却します。
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time() 
future = pi4_sample.remote(sample_count = SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))
Running 1000000 tests took 0.8532092571258545 seconds
3.14098

上で表示されたリンクをクリックすることで、Rayダッシュボードにアクセスできます。
Screenshot 2024-04-22 at 9.49.26.png
Screenshot 2024-04-22 at 9.52.07.png

FULL_SAMPLE_COUNT = 2000 * 1000 * 1000
BATCHES = int(FULL_SAMPLE_COUNT / SAMPLE_COUNT)
print(f'Doing {BATCHES} batches')
results = []
for _ in range(BATCHES):
    results.append(pi4_sample.remote(sample_count = SAMPLE_COUNT))
output = ray.get(results)

pi = sum(output)*4/len(output)
print(float(pi))
Doing 2000 batches
3.141600784

Rayクラスターのシャットダウン

shutdown_ray_cluster()

ユースケース見ると、LLMとかでも使えるようですね。トライしてみます。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?