(株)日立製作所 研究開発グループ サービスコンピューティング研究部の上野です。
前回の記事では、PypeRでPythonコードとRコードをまとめる方法についてご紹介しました。
今回は、PypeRとRayの組み合わせについてご紹介します。
Rayは、スケーラブルで汎用的な分散処理のフレームワークです。
Rayのネイティブライブラリとツールにより、複雑なアプリケーションの計算ができます。
Rayは近年注目を集めているOSSで、Ray Summit 2021ではAWSがプラチナムスポンサーになっています。
大量データの分析などに分散処理は欠かせない要素ですが、Pythonだけで書こうとするコードが複雑になりやすいという問題があります。Rayが提供するライブラリを使うことによって、分散処理をシンプルに実装できます。
Rayの詳細については、分散処理を民主化するRayでも紹介しておりますので、よろしければご参考ください。
こちらの記事では、前回記事の内容を踏まえて、PypeRの処理をRayで分散する方法についてご紹介します。
使い方
Rayを用いてR処理を複数のインスタンスで実行することで、Rで直列処理していた処理を並列処理へすることができるため、処理の高速化が図れます。
PypeRの処理を分散するインスタンスには、それぞれPypeRパッケージとRのランタイムがインストールされている必要があります。方法については、こちらをご参考ください。
PypeRとRayの組み合わせは以下の方法で実現できます。
import ray
ray.init()
@ray.remote
def f():
import pyper
r = pyper.R(use_pandas=True)
# ライブラリをインポート
r("library(<library's name>)")
# PythonからRにデータを渡す
r.assign('<key in R>', <Key in Python>)
# RからPythonにデータを渡す
r("<R process code>")
注意すべき点としては、Task内にPypeRパッケージのインポートからRライブラリのインポートまで含める必要があります。
Taskを受け取ったインスタンスで、PypeRで作成されるオブジェクトのシリアライズが正常に行われず、処理に失敗するためです。
利用例
こちらの記事でRの処理をPythonコードにまとめ上げたものを参考にします。
以下のようにPypeRで書いたR処理を実行するコードを、RayのTaskとして書き換えることでできます。
import ray
ray.init()
supportThreshold = 0.4
@ray.remote
def f():
import pyper
r = pyper.R(use_pandas=True)
# 必要なパッケージをインストール
r("library(Matrix)")
r("library(arules)")
r("library(arulesSequences)")
# PythonからRにデータを渡す
r.assign('supportTh', supportThreshold)
# PythonからRにデータを渡す
r("x <- read_baskets('~/data/test.txt', info = c('sequenceID','eventID','SIZE'))")
r("as(x, 'data.frame')")
r("s1 <- cspade(x, parameter = list(support = supportTh, maxgap = NULL), control = list(verbose = TRUE))")
r("summary(s1)")
r("results = as(s1, 'data.frame')")
# RからPythonにデータを渡す
result = r.get('results')
return result
results = [f.remote() for i in range(4)]
print(results)
まとめ
本記事では、PypeRの直列処理をRayでインスタンスへ分散して並列処理へ書き換える方法を紹介しました。
既存のRで書かれたリソースを活かしつつRayにより分散分析を行うことで、1からコードを書き直すより容易に既存システムに分散分析を取り入れられる可能性があります。