Rayとは
RayとはPythonの高級マルチプロセスライブラリです。
Pythonは元々MultiProcessingという分散処理ライブラリをデフォルトで備えていますが、あんな低級なライブラリは使ってられません。
RayをPythonと同等の高級ライブラリとすればMultiProcessingはバイナリ言語くらい低級です。
日本ではアホか!ってくらい認知されていませんがRayのGithubには1200件ものIssueが寄せられるほどに大人気のライブラリです。
ちなみにあのTensorflowでも3700件です。
元々は強化学習用に開発されましたが、あらゆる分散処理アプリケーションを実現できます。一度使うともう他の方法で分散処理を書きたくなくなるので強化学習などで使ってみたい人は必ず挑戦してみてください。
なぜRayなの?
- PipeとかQueueとか言わなくてもプロセス間でオブジェクトを簡単に共有できる
- 既存のコードに
@ray.remote
デコレータをつけるだけで分散処理ができる - AI開発用に設計されたので巨大な数値データの扱いに強い
マルチプロセスで処理を書くとき、マルチプロセスな処理を書いてしまうでしょう。
この記事を読んでいる皆様ならご存知のことと思いますが、マルチプロセスのアプリケーションはプロセス間でメモリを共有しないのでプロセス間でオブジェクトを共有するときにはデータの取り扱いに関する特別なコードをたっくさん書いたはず!
必死にpipeやらqueueやらを使ってゴリゴリと計算を進めなくてはならない!
Rayならそれが最小限に抑えられます。感動します。
そして何より、既存のシングルプロセスで動いてたコードをほとんどそのまんま分散処理アプリケーションに転用できます。感動します。
今日は素敵な記事のソースコードを参考にRayの使い方をまとめます。
前提
OS: Mac or Linux(WindowsもSupportしていますがExperimentalです)
Python: python3をサポート
インストール
pipしましょう。
pip install ray
使い方
Rayの使い方は基本的に3stepです。
- 別プロセスで使いたい関数
func
に@ray.remote
デコレータをつける。 -
future = func.remote()
で呼び出す。 - 動的計算グラフで細かい同期や計算待ちを明示的にコードしなくて良い!
この使いたい関数にデコレータをつけるだけであっさりと既存のコードがマルチプロセスで走ってしまうのが気持ちいい!
習うより慣れろで実装例
基本操作
import ray
import time
# Start Ray.
ray.init()
@ray.remote
def f(x):
time.sleep(1)
return x
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
result_ids.append(f.remote(i))
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids) # [0, 1, 2, 3]
@ray.remote
がついた関数はfunc.remote(args)
で実行できます。
ただしfunc.remote()
を叩いただけでは関数を実行しただけで戻り値は手に入りません。子プロセスで実行しながら次の行を読みに行きます。戻り値の計算が終わるまで待っていたらマルチプロセスの意味がないですね!
代わりにfunc.remote()
の戻り値として将来的に値が格納されるFuture IDが得られます。
値そのものはray.get(id)
で取得できるというわけです。この処理は戻り値の計算が終了するまで待つブロッキング処理です。
さらにRayなら分散処理の開始や終了のタイミングをあれこれ考える必要がありません。
動的に計算グラフを構築することで必要な情報の計算が終わると勝手に計算を進めてくれます。
import numpy as np
@ray.remote
def create_matrix(size):
return np.random.normal(size=size)
@ray.remote
def multiply_matrices(x, y):
return np.dot(x, y)
x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)
# Get the results.
z = ray.get(z_id)
行列$X, Y$を作って$XY$を計算するプログラム。
XとYの計算が終わるまでjoinして、終わったらその引数を受け取って次の計算をする...のようなことをする必要はありません。
func.remote()
の引数にFuture IDを投げれば、引数に入るべき値の計算が終わると同時にfunc
の計算を開始してくれるのです。
オブジェクトをプロセス間で共有する
ついにきました。オブジェクトのプロセス間共有。
MultiProcessingモジュールやThreadingでは関数がベース。Process(target=func, args=args)
などと書いて別プロセスで関数を実行するのがMultiProcessingの王道。
しかしこれでは一度プロセスを起動してしまったらそのプロセスとの相互作用はQueueやらPipeやらを通じたストレスフルなものになってしまう...。
Rayでは**別プロセス上にオブジェクトを用意できます。**つまりは関数だけでなくクラスまでもが自在にマルチプロセスに運用できるのです。
それも@ray.remote
するだけで。
@ray.remote
class Counter(object):
def __init__(self):
self.x = 0
def inc(self):
self.x += 1
def get_value(self):
return self.x
# Create an actor process.
c = Counter.remote()
# Check the actor's counter value.
print(ray.get(c.get_value.remote())) # 0
# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote())) # 2
こちらはシンプルなカウンタークラス。
そのインスタンスをサブプロセス上に実行する場合はクラス名.remote()するだけ!(もちろん引数があればremote()に渡せます!)
そしてリモートプロセス上のオブジェクトを参照しようと思った時、あなたはもうPipeやQueueを使わなくていいんです。.remote()
するだけで元のオブジェクトの機能をそのまんま呼び出せる!
このリモートプロセス上にあるインスタンスをRayではActorと呼んでいます。しかもこのActorは呼び出し元のプロセス以外のプロセスからでも当たり前のようにアクセスできます。
import ray
import time
import random
@ray.remote
class StupidReplayBuffer:
def __init__(self):
self.memory = []
# insertする!
def insert(self, value):
self.memory.append(value)
# randomに拾ってくる!
def sample(self):
random.shuffle(self.memory)
return self.memory[-1]
@ray.remote
def stupid_sender(buffer, stupid_word):
# 毎秒アホな言葉をbufferに追加する!
while True:
buffer.insert.remote(stupid_word)
time.sleep(1)
if __name__ == '__main__':
ray.init()
buffer = StupidReplayBuffer.remote()
stupidword = ["pakupaku", "hogehoge", "mogumogu"]
for word in stupidword:
stupid_sender.remote(buffer, word)
# 毎秒アホな言葉をランダムに引っ張ってきて表示する!
while True:
time.sleep(1)
future_id = buffer.sample.remote()
print(ray.get(future_id))
# pakupaku
# mogumogu
# pakupaku
# hogehoge
# ...
これはリモートプロセス上のメモリに、別のリモートプロセス上で動く関数からデータを送り込むプログラムです。深層強化学習のユーザーにしてみれば馴染みのあるリプレイバッファーの機能です。
メインプロセスも含めた4つのプロセスから当たり前のようにbuffer.method_name.remote()
が呼び出せているでしょう!
今日からこの技術はあなたのもの。職場で、研究で、自在にこの分散処理アプリケーションを使ってみてください!
既存のコードに@ray.remoteさえも付けたくない人は
もうすでにシングルプロセスとして完成していてこれまで動いていたコードにミリも変更を加えたくない人は、ちょっとメタプログラミングな内容になりますが、次のようなラッパーを書くことで既存のコードを完全にそのまま利用することができます。
import ray
import time
import random
# 既存のコードにはデコレータさえ付けたくない。
class StupidReplayBuffer:
def __init__(self):
self.memory = []
def insert(self, value):
self.memory.append(value)
def sample(self):
random.shuffle(self.memory)
return self.memory[-1]
class RayWrapper:
def __init__(self, buffer_cls):
"""ReplayBufferクラスを全く書き換えずにリモートプロセスで走らせるラッパー
Args:
buffer_cls (type): ラップされるクラス(クラスのインスタンスではなくクラス)
"""
self.buffer = ray.remote(buffer_cls).remote()
def insert(self, value):
self.buffer.insert.remote(value)
def sample(self):
return ray.get(self.buffer.sample.remote())
if __name__ == '__main__':
ray.init()
# StupidReplayBuffer()ではない!
buffer_cls = StupidReplayBuffer
buffer = RayWrapper(buffer_cls)
buffer.insert("pakupaku")
print(buffer.sample()) # pakupaku
難しいことは考えずReplayBufferだけを見てください。
これまでデコレータなしで機能していたStupidReplayBufferやあなたのシングルプロセスで動くクラスはこのラッパーを使うことで全くそのままサブプロセス上で動作します。
また、今度はメイン処理を見てください。ラッパーを噛ませた後はinsert()
やsample()
をremote()
をつけることなく利用できています。
ほんの数行のラッパーを書くだけで既存のコードも、そのインターフェースも完全にそのまま分散処理アプリに変換できるのです。
もちろんこうしたラッパーはMultiProcessingでもかけますがその中身はQueueやPipeを駆使した血生臭い実装となるでしょう...。
まとめ
今回は日本では全くといっていいほど注目されていないが海外ではみんな使っている最強の高級分散処理ライブラリRayについて海外の記事を参考に紹介してみました。
特にAI研究分野では分散学習が主流なのでたくさんのエンジニアがこのライブラリを利用しています。
このライブラリを使って周りのエンジニアに1歩差をつけてみてはいかがでしょうか!
参考
Modern Parallel and Distributed Python: A Quick Tutorial on Ray