本記事は、シェルのデータサイエンスチームとDatabricksの共同作業によって実現されたものです。
導入
Apache Spark 2.0では、既存のR関数の並列化を可能にするSparkRの新たなAPIが導入されました。新たなdapply、gapply、spark.lapplyメソッドが、Rユーザーに対して新たな可能性を示します。本記事においては、Shell Oil CompanyとDatabricksによって達成されたユースケースの詳細を説明します。
ユースケース:在庫量のレコメンデーション
シェルにおいては、在庫量は、これまでの経験、勘、ベンダーからの提案の組み合わせから決定していました。この意思決定に、過去の履歴データを活用しようという取り組みは限定的であったため、しばしば、オイルリグなどにおいて在庫の余剰、不足が発生していました。
在庫最適化分析ソリューションのプロトタイピングを通じて、シェルはSAPの在庫データに対して高度なデータ分析を適用することで、以下のことを実現できることを証明しました:
- 倉庫の在庫レベルの最適化
- 安全在庫レベルの予測
- 移動に時間を要する部材の合理化
- 部材リストにある在庫のレビュー及び再割当
- (過去の利用状況、リードタイム、費用などに基づいて)部材の重要度を判定
部材に対する推奨在庫レベルの計算において、データサイエンスチームは、Rでマルコフ連鎖モンテカルロ(MCMC)ブートストラップ統計モデルを実装しました。このモデルは、50以上のシェルの拠点において用いられる全ての部材(3000点以上)に適用されました。必要量の分布の履歴を捕捉するために、個々の部材モデルは、1万回以上のMCMCのイテレーションを必要としました。全体的な計算量は膨大なものになりましたが、幸運なことにそれぞれの部材にごとにモデルを適用できるため、並列化が可能でした。
既存環境
この時点では、48コア、192GBメモリを搭載したオフラインPCで全モデルの処理が実行されていました。MCMCブートストラップモデルは、Rパッケージ(“fExtremes”, “ismev”, “dplyr”, “tidyr”, “stringr”)
の関数を用いてカスタムビルドされていました。
スクリプトはシェルの拠点ごとにイテレーションを行い、過去の部材情報を48コアに対してほぼ均等に分配しました。そして、それぞれのコアは、モデルをそれぞれの部材に適用しました。個々の部材に対して単純なループを適用するとオーバーヘッドが大きく、それぞれ2〜5秒かかっていたため、部材をグルーピングしていました。部材グループに対する分散処理は、Rのparallelパッケージを用いて実装されていました。48コアにおける最後のジョブが完了すると、スクリプトは次の拠点に場所を移し、同じプロセスを繰り返します。全ての拠点の在庫推奨レベルを計算するのに約48時間を要していました。
Databricks Apache Sparkの活用
シェルは、多数のコアを搭載した単一の大規模マシンを利用する代わりに、スケールアウトするためにクラスターコンピューティングを活用する決断をしました。新たなApache SparkのR APIはこのユースケースにフィットしていました。SparkRのスケーラビリティとパフォーマンスを検証するために2つのバージョンが開発されました。
プロトタイプその1:POC
我々はこの処理をSpark APIで対応できるかどうかをクイックに検証したかったので、最初のプロトタイプでは、コードの変更量を最小限にしようとし、シミュレーションステップの変更を以下に限定しました:
シェルのそれぞれの拠点に対して:
- 入力データをSparkデータフレームとして並列化
-
SparkR::gapply()
を用いて、それぞれのデータに対して並列シミュレーションを実施
最小限のシミュレーションコードの変更により、Databricks上の50ノードのSparkクラスターを活用して全体のシミュレーションに要する時間を3.97時間にまで短縮することができました。
プロトタイプその2:パフォーマンスの改善
最初のプロトタイプはクイックに実装することができましたが、明らかにパフォーマンスのボトルネックが存在していました:シミュレーションを繰り返す都度、Sparkジョブが起動していました。また、データには偏りがあったため、それぞれのジョブにおいて、次のジョブに進む前に、処理の遅いエグゼキューターの終了を待つ必要がありました。加えて、クラスターの多くのCPUコアがアイドル状態であるにも関わらず、それぞれのジョブの最初にSparkデータフレームの並列化に時間を費やしていました。
これらの問題を解決するために、全ての拠点、部材に対する入力データと関連データを事前に生成するように前処理のロジックを変更しました。入力データは大規模なSparkデータフレームとして並列化されました。次に、シミュレーションを行う際に拠点ID、部材IDの2つのキーを指定して、SparkR::gapply()
を実行しました。
このシンプルな改善によって、50ノードのクラスタにおける処理を45分にまで短縮することができました。
SparkRに対する改善
SparkRはApache Sparkの最新機能の一つであり、この取り組みを行っていた際にapply APIファミリーが追加されました。この実験を通じて、我々はSparkRのバグや制限を発見し、これらをApache Sparkで修正しました。
- [SPARK-17790] 2GB以上のRデータフレームの並列化サポート
- [SPARK-17919] SparkRにおいてRBackendタイムアウト設定を可能にする
- [SPARK-17811] 日付カラムにNA/NULLが含まれてるデータフレームをSparkRが並列化できない
次のステップは?
もしあなたがSparkRを使う開発者で、SparkRの機能をもっと使ってみたいのでしたら、Databricksにサインアップしてみてください。そして、我々のSparkRドキュメントを読んでみてください。