こちらの続きです。
ノートブックはこちらにあります。
オブジェクトストア
取得処理の定義では、database
からアイテムに直接アクセスしています。ローカルのRayクラスターであればこれは問題ありませんが、複数のコンピュータから構成される実際のクラスターでどのように動作するのかを考えてみましょう。Rayクラスターは、ドライバープロセスを持つヘッドノードと、タスクを実行するワーカープロセスを持つ複数のワーカーノードから構成されます。このシナリオでは、データベースはドライバーでのみ定義されますが、ワーカープロセスは取得タスクを実行するためにそれにアクセスする必要があります。ドライバーとワーカー間、ワーカー間でオブジェクトを共有するRayのソリューションは、データをRayの分散オブジェクトストアに配置するためのray.put
関数を用いるというものです。retrieve_task
の定義において、db_object_ref
オブジェクトとしてあとで引き渡せるように、引数db
を追加することができます。
db_object_ref = ray.put(database)
@ray.remote
def retrieve_task(item, db):
time.sleep(item / 10.)
return item, db[item]
オブジェクトストアを用いることで、Rayはクラスター全体を通じたデータのアクセスの管理を行うことができます。オブジェクトストアにはある程度のオーバーヘッドが含まれますが、大規模なデータセットでのパフォーマンスが改善します。このステップは、真に分散された環境においては重要なものとなります。期待した通りに実行されることを確認するために、サンプルのretrieve_task
関数を再実行しましょう。
start = time.time()
object_references = [
retrieve_task.remote(item, db_object_ref) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)
実行時間: 0.97 秒、データ:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
ブロッキングなしのコール
前のセクションでは、結果を取得するためにray.get(object_references)
を使用しました。このコールは、すべての結果が利用できるようになるまでドライバープロセスをブロックします。データベースのそれぞれのアイテムが処理に数分を要する場合には、この依存性が問題となる場合があります。結果を待っている際にドライバープロセスが他のタスクを実行できるようにし、すべてのアイテムの処理が完了するのを待つのではなしに結果を処理できるようにすることで、更なる効率性を得ることができます。さらに、データベースアイテムの一つがデータベース接続におけるデッドロックのような問題によって取得できなかった場合、ドライバーは永遠にハングすることになります。無限のハングを避けるために、wait
関数を用いる際に適切なtimeout
の値を設定します。例えば、最も遅いデータ取得タスクの時間の十倍までは待ちたい場合には、その時間を経過した際にタスクをストップするようにwait
関数を活用します。
start = time.time()
object_references = [
retrieve_task.remote(item, db_object_ref) for item in range(8)
]
all_data = []
while len(object_references) > 0:
finished, object_references = ray.wait(
object_references, timeout=7.0
)
data = ray.get(finished)
print_runtime(data, start)
all_data.extend(data)
print_runtime(all_data, start)
実行時間: 0.00 秒、データ:
(0, 'Learning')
実行時間: 0.10 秒、データ:
(1, 'Ray')
実行時間: 0.21 秒、データ:
(2, 'Flexible')
実行時間: 0.30 秒、データ:
(3, 'Distributed')
実行時間: 0.40 秒、データ:
(4, 'Python')
実行時間: 0.50 秒、データ:
(5, 'for')
実行時間: 0.61 秒、データ:
(6, 'Machine')
実行時間: 0.70 秒、データ:
(7, 'Learning')
実行時間: 0.71 秒、データ:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
結果を出力する代わりに、他のワーカーで新たなタスクを起動するために、while
ループの中で取得した値を使用することもできます。
タスクの依存関係
取得したデータに対して追加の処理タスクを実行したいと思うかもしれません。例えば、同じデータベースから取得した(おそらく他のテーブルの)他のデータをクエリーするために最初の取得タスクの結果を使うなどです。以下のコードでは、このフォローアップタスクをセットアップし、retrieve_task
とfollow_up_task
の両方を順に実行しています。
@ray.remote
def follow_up_task(retrieve_result):
original_item, _ = retrieve_result
follow_up_result = retrieve(original_item + 1)
return retrieve_result, follow_up_result
retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]
result = [print(data) for data in ray.get(follow_up_refs)]
((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))
非同期プログラミングに慣れ親しんでいない場合、このサンプルはあまり印象的なものではないかもしれません。しかし、よく見てみるとこのコードが実行されることは驚くべきことなのです。このコードはいくつかのリスト解釈を伴う通常のPythonプログラムのように見えます。
follow_up_task
の関数の本体は、入力引数のretrieve_result
としてPythonのタプルを期待します。しかし、[follow_up_task.remote(ref) for ref in retrieve_refs]
コマンドを使う際は、フォローアップ多数にはタプルを引き渡しません。代わりに、Rayオブジェクトの参照を渡すためにretrieve_refs
を使うことになります。
背後では、Rayはfollow_up_task
が実際の値を必要とすることを認識しているので、これらの将来的な値を解決するために自動でray.get
関数を使用します。さらに、Rayはすべてのタスクの依存関係グラフを生成し、これらの依存関係を考慮した方法で実行します。実行順序を推定するので、明示的にRayに前のタスクの完了を待つように指示する必要はありません。次のタスクにオブジェクト参照を渡し、Rayが残りを面倒見てくれるので、大規模な中間的な値をドライバーにコピーすることを回避できるので、Rayオブジェクトストアのこの機能は有用なものとなります。
情報を取得することに特化して設計されたタスクが完了すると、このプロセスの次のステップがスケジュールされます。実際、retrieve_refs
をretrieve_result
と呼んでいたら、この重要で意図的なネーミングのニュアンスに気づかなかったことでしょう。Rayによって、あたなたはクラスターコンピューティングの技術的なことではなく、自分の作業に集中できるようになります。2つのタスクの依存関係グラフは以下のようになります:
Rayアクター
このサンプルでは、Ray Coreのより重要な側面の一つをカバーします。このステップまでは、すべては基本的には関数でした。特定の関数をリモート実行させるためには@ray.remote
デコレーターを使用しましたが、それ以外の部分に関しては標準的なPythonを使っただけです。
データベースがどの程度の頻度でクエリーされるのかを追跡したい場合、取得タスクの結果をカウントすることができるでしょう。しかし、これを行うためのより効率的な方法は無いのでしょうか?理想的には、大規模なデータを取り扱えるように、この処理は分散されて欲しいと考えることでしょう。Rayでは、クラスター上でステートフルな処理を実行し、互いにコミュニケーションを行うこともできるアクターによるソリューションを提供します。デコレートした関数を用いたRayタスクの作成と同様に、デコレートしたPythonクラスを用いてRayアクターを作成します。これによって、データベース呼び出しの回数を追跡するために、Rayアクターを用いたシンプルなカウンターを作成することができます。
@ray.remote
class DataTracker:
def __init__(self):
self._counts = 0
def increment(self):
self._counts += 1
def counts(self):
return self._counts
ray.remote
デコレーターを追加すると、DataTrackerクラスはアクターになります。このアクターは、カウントのような状態を追跡することができ、そのメソッドは.remote()
を用いた関数と同じように起動できるRayアクターのタスクとなります。このアクターと連携するようにretrieve_taskを変更します。
@ray.remote
def retrieve_tracker_task(item, tracker, db):
time.sleep(item / 10.)
tracker.increment.remote()
return item, db[item]
tracker = DataTracker.remote()
object_references = [
retrieve_tracker_task.remote(item, tracker, db_object_ref) for item in range(8)
]
data = ray.get(object_references)
print(data)
print(ray.get(tracker.counts.remote()))
[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8
期待したように、この計算による結果は8となります。この計算の実行にアクターを必要とはしませんが、このクラスターにおける状態を維持し、複数のタスクを伴う方法をデモンストレートしています。実際のところ、任意の関連タスクや、別のアクターのコンストラクタにすら、このアクターを引き渡す頃ができます。RayのAPIは柔軟であり、無制限の可能性を提供します。強化学習のような複雑な分散アルゴリズムを実行する際にはとくに有用な、ステートフルな計算処理を可能とする分散Pythonツールはレアです。
サマリー
この例では、6つのAPIメソッドのみを使用しました。これらには、クラスターを起動するためのray.init()
、関数やクラスをタスクやアクターに変換する@ray.remote
、Rayのオブジェクトストアに値を転送するray.put()
、クラスターからオブジェクトを取得するray.get()
。さらに、クラスターでコードを実行するためにアクターメソッドやタスクの.remote()
を使用し、ブロッキングコールを避けるためにray.wait
を使用しました。
RayのAPIはこれら6つの呼び出し以上のものから構成されていますが、あなたが使い始める際には、これらの6つはパワフルなものとなります。より一般的にまとめると、これらのメソッドは以下の通りとなります:
-
ray.init()
: お使いのRayクラスターを初期化します。既存クラスターに接続するためにはアドレスを指定します。 -
@ray.remote
: 関数をタスクに、クラスをアクターに変換します。 -
ray.put()
: Rayのオブジェクトストアに値を保存します。 -
ray.get()
: オブジェクトストアから値を取得します。保存した値、あるいはタスクやアクターによって計算された値を返却します。 -
.remote()
: お使いのRayクラスターでアクターメソッドやタスクを実行し、アクターのインスタンスを作成する際に使用されます。 -
ray.wait()
: オブジェクト参照の2つのリストを返却し、一つは完了を待っていたタスクのうち完了したタスク、もう一つは未完了のタスクとなります。