最近、Rayを勉強中です。
こちらのサンプルの前半をウォークスルーします。
ノートブックはこちらにあります。
Rayがどのように動作するのか、そして基本コンセプトを理解するために、Ray Coreで関数を実装します。経験は少ないけど高度なタスクに興味があるPythonプログラマーは、Ray Core APIを学ぶことで、Pythonを用いた分散コンピューティングを始めることができます。
Rayのインストール
Databricksランタイム15.0にはすでにRayがインストールされています。
Ray Core
以下のコマンドを実行することでローカルクラスターを起動します。
2台のRayワーカーノードを持つRayクラスターをセットアップします。それぞれのワーカーノードには4個のGPUコアが割り当てられています。クラスターが起動すると、Rayクラスターダッシュボードを参照するために、リンク"Open Ray Cluster Dashboard in a new tab"をクリックすることができます。
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
setup_ray_cluster(
num_worker_nodes=2,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/tmp/raylogs",
)
Rayクラスターの情報を表示します。
import ray
ray.init()
ray.cluster_resources()
{'memory': 55657155790.0,
'node:10.0.45.49': 1.0,
'GPU': 2.0,
'object_store_memory': 24466633522.0,
'accelerator_type:T4': 2.0,
'CPU': 8.0,
'node:10.0.41.42': 1.0,
'node:__internal_head__': 1.0,
'node:10.0.47.79': 1.0}
次に、Ray Core APIの簡単なご紹介をしていきますが、これをRay APIとして参照します。Ray APIはPythonプログラマーに馴染みのあるデコレーター、関数、関数のようなコンセプトをベースとして構築されています。これは分散コンピューティングにおける一般的なプログラミングインタフェースとなっています。このエンジンが複雑な作業に対応するので、開発者は既存のPythonライブラリやシステムと共にRayを活用することができます。
初めてのRay APIサンプル
以下の関数では、データベースからデータを取得し処理を行います。ダミーのdatabase
は“Learning Ray” bookのタイトルに含まれる単語を含むプレーンなPythonリストとなっています。データベースのデータへのアクセス、処理のコストをシミュレートするために、sleep
関数は一定の期間ポーズしています。
import time
database = [
"Learning", "Ray",
"Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]
def retrieve(item):
time.sleep(item / 10.)
return item, database[item]
インデックス5のタスクが0.5秒(5 / 10.)
要している場合、逐次的にすべての8つのアイテムを取得するための合計処理時間の見積もり値は(0+1+2+3+4+5+6+7)/10. = 2.8
秒となります。実際の時間を計測するために以下のコードを実行します。
def print_runtime(input_data, start_time):
print(f'実行時間: {time.time() - start_time:.2f} 秒、データ:')
print(*input_data, sep="\n")
start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)
実行時間: 2.80 秒、データ:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
この例では、関数を実行する際の処理時間合計は2.80秒でしたが、お使いのコンピューターによっては時間が変動するかもしれません。基本的なPythonのバージョンでは関数を同時に実行できないことに注意してください。
Pythonリストの解釈はもっと効率的であることを期待するかもしれません。計測した実行時間2.8秒は実際には最悪ケースのシナリオです。このプログラムは実行時間のほとんどを"sleep"していますが、これは、Global Interpreter Lock (GIL)のために遅くなっています。
Rayタスク
このタスクは、並列化によってメリットを享受することができます。完璧に分散することができれば、実行時間は最も遅いサブタスクよりも長くなることはないはずです。すなわち、7/10. = 0.7
秒となります。このサンプルをRayで並列実行するように拡張するには、@ray.remote
デコレーターで始まる関数を記述します。
import ray
@ray.remote
def retrieve_task(item):
return retrieve(item)
デコレーターを用いることで、関数retrieve_task
はray-remote-functions<Ray task>
への参照となります。Rayタスクは、呼び出された場所から、場合によっては異なるマシンから様々なプロセスを実行する関数となります。
Rayでは、アプローチやプログラミングスタイルを大きく変更することなしに、Pythonコードを書き続けることができるので便利です。この例では、retrieve関数にray.remote()<@ray.remote>
関数デコレーターを用いることは、デコレーターの意図した使い方であり、オリジナルのコードを変更していません。
データベースのエントリーを収集し、パフォーマンスを計測するために、コードに対して多くの変更を行う必要はありません。こちらがプロセスのオーバービューとなります。
start = time.time()
object_references = [
retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)
実行時間: 0.71 秒、データ:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
タスクを並列で実行するには、主要な2つのコード修正を必要としています。リモートでRayタスクを実行するには、.remote()
コールを用います。Rayはローカルクラスター上であっても、リモートタスクを非同期的に実行します。コードスニペットにあるobject_references
リストのアイテムは、結果を直接格納するものではありません。type(object_references[0])
を用いて最初のアイテムのPythonタイプをチェックをすると、これが実際にはObjectRef
であることがわかります。これらのオブジェクト参照は、結果をリクエストする将来的な値に対応します。関数ray.get()<ray.get(...)>
の呼び出しは、結果をリクエストするためのものです。Rayタスクに対してリモートコールを行うと常に、1つ以上のオブジェクト参照を即座に返却します。Rayタスクはオブジェクト生成の主要な手段であると考えてください。以下のセクションでは、複数のタスクをまとめてリンクさせ、それらの間でRayにオブジェクトを引き渡し、解決させています。
前のステップをレビューしましょう。Python関数からスタートし、@ray.remote
でデコレートし、関数をRayタスクにしています。コードでオリジナルの関数を直接呼び出すのではなく、Rayタスクに対して.remote(...)
を呼び出しました。最後に、.get(...)
を用いてRayクラスターから結果を取得しました。追加のエクササイズとして、ご自身の関数の一つからRayタスクを作ってみてください。
Rayタスクを用いることによるパフォーマンスのゲインをレビューしましょう。ほとんどのラップトップでは、実行時間は約0.71秒となり、最も遅いサブタスクである0.7秒よりも若干時間を要しているものとなります。RayのAPIをさらに活用することでプログラムをさらに改善することができます。
後編に続きます。