本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本記事はDatabricksとCompassとの共著記事です。貢献いただいたCompassのSenior Machine Learning EngineerであるSujoy Duttaに感謝の意を表します。
グローバルの不動産企業として、Compassでは様々な地域の物件市場をモニタリングするために、大量のデモグラフィックデータや経済データを処理しています。地域ごとに異なるトレンドを分析、モデリングするには、地域レベルに複雑な分析を効率的に適用するための並列処理の手段が必要となっていました。
特に、機械学習モデルの開発と推論は複雑なものです。単一のモデルをトレーニングするのではなく、数十、数百のモデルをトレーニングする必要があるかもしれません。逐次的なモデルトレーニングは、全体的なトレーニングの時間を引き伸ばし、インタラクティブな実験の妨げとなります。
Compassにおいて最初に並列特徴量エンジニアリング、並列モデルトレーニング、推論の仕組みは、Kubeflowを用いたKubernetesクラスターアーキテクチャの上に構築されていました。追加となる複雑性と技術的なオーバヘッドは膨大なものとなります。Kubeflowにおけるワークロードの変更は複数のステップを要するものであり、チームのイテレーションを妨げる面倒なプロセスとなっていました、また、特定のdevops部門により適したものであるKubernetesの維持には膨大な時間と労力が必要となり、ベストな予測モデルを構築することに大きな責任を持つチームは他のことに時間を割かなくてはなりませんでした。最後に、Kubernetesのアプローチは、企業標準のものでなく、特定のデータサイエンスグループに特化されたニッチなワークフローでした。
他のワークフローの選択肢を調査する中で、CompassではDatabricks Lakehouse Platformに基づくアプローチをテストしました。このアプローチでは、数十の地域レベルに対するXGBoostモデルに対する分散特徴量エンジニアリング、トレーニング、推論を行うために、デプロイがシンプルなApache Spark™コンピューティングクラスターを活用します。Kubernetesで経験された課題は軽減されました。Databricksクラスターはデプロイが容易で、特定のチームによる管理を必要としませんでした。Databricksでは、容易にモデルのトレーニングを開始することができ、探索的データ分析、モデルの実験のためのパワフルかつインタラクティブ、コラボレーティブなプラットフォームを提供します。さらに、データエンジニアリング、データサイエンス、ビジネスアナリティクスのエンタープライズ標準プラットフォームとして、Compassにおける部門にまたがって、コードとデータは容易に共有することができ、再利用することができるようになりました。
Databricksベースのモデリングアプローチは成功し、現在プロダクションとして稼働しています。このワークフローでは機械学習ランタイム、クラスター、ジョブ、MLflowと言ったビルトインのDatabricksの機能を活用しています。このソリューションは、位置情報、製品、期間レベルなど異なる粒度のデータに対する並列モデルトレーニングや並列推論処理を必要とするいかなる問題に適用することができます。
このアプローチの概要を以下で説明し、サンプル実装を含むDatabricksノートブックが添付されています。
アプローチ
この並列モデルトレーニング、並列推論ワークフローはPandas UDFをベースとしています。Pandas UDFは、SparkデータフレームにPython関数を適用するための効率的な手段を提供します。入力としてPandasデータフレームを受け取り、何かしらの計算処理を行い、Pandasデータフレームを返却します。SparkデータフレームにPandas UDFを適用する方法は複数ありますが、我々はgroupBy.applyInPandas
メソッドを活用します。
groupBy.applyInPandas
メソッドは、SparkデータフレームのそれぞれのgroupByカラムに別々にPandas UDFのインスタンスを適用します。これによって、それぞれのグループに関する特徴量を並列に処理することができるようになります。
我々のPandas UDFはscikit-learnの一部でXGBoostモデルをトレーニングします。また、UDFでは機械学習ランタイムに組み込まれているフレームワークであるHyperoptを用いたハイパーパラメーターチューニングを行い、フィッティングしたモデルや他のアーティファクトを単一のMLflowエクスペリメントのランとして記録します。
トレーニング後はエクスペリメントのランには、我々のUDFを用いてトレーニングされたそれぞれのモデルを格納する別々のフォルダが含まれることになります。以下の図では3つの異なるグループを持つSparkデータフレームにUDFを適用し、3つの異なるモデルをトレーニングし記録しています。
異なるデータグループに対する並列モデルトレーニング
トレーニングのラン(トレーニング実行)の一部として、ランに対応する単一のカスタムMLflow pyfuncモデルを記録しています。このカスタムモデルはMLflowモデルレジストリに登録することを目的としたものであり、これによって、UDFを用いて大量のモデルのフィッティング結果を参照することができる単一のモデルを提供する方法を得ることができます。
最終的にはPandas UDFはモデルのメタデータとDeltaテーブルに書き込まれた検証結果の統計情報を含むSparkデータフレームを返却します。このDeltaテーブルは以降のモデル情報を集積し、ノートブックやDatabricks SQL、ダッシュボードを用いて解析を行うことができます。モデルのランはタイムスタンプやユニークIDで識別することができます。また、容易にアーティファクトを検索できるように、テーブルには関連MLflowのランIDが含まれています。大量のモデルがトレーニングされ、モデルレベルでの結果の視覚的分析が困難になるケースにおいて、このDeltaベースのアプローチはモデル分析と選択を行うための効率的な方法となります。
環境
我々のユースケースにおいてUDFを適用する際、それぞれのモデルは別々のSparkタスクでトレーニングされます。デフォルトでは、それぞれのタスクはクラスターにある一つのコアを使用しますが、これは設定可能なパラメーターとなっています。XGBoostや他のよく使用されるMLライブラリには、複数コアによる恩恵を得られるようにビルトインの並列化機構が含まれています。クラスターUIの高度な設定のSpark設定を調整することで、Sparkタスクごとに使用できるCPUコアの数を増やすことができます。
spark.task.cpus 4
クラスターで使用できるクラスターの総数をspark.task.cpus
で割った値が、並列で実行可能なモデルトレーニング処理の数を示します。例えば、クラスターのVMで合計32コアがある場合、そして、spark.task.cpus
が4に設定されると、8つのモデルを並列にトレーニングできることになります。トレーニングするモデルが8つ以上の場合、インスタンスタイプを変更することでクラスターのコア数を増やすことや、spark.task.cpus
を調整することや、より多くのインスタンスを追加することも可能です。そうでない場合、次の8つに移る前に8つのモデルが並列にトレーニングされます。
複数のモデルを単一のMLflowエクスペリメントランに記録
この特殊なユースケースにおいては、Adaptive Query Execution (AQE)は無効化しました。通常、AQEは有効化しておくべきですが、AQEは小規模なSparkタスクを大規模なタスクにまとめることがあります。もし、小規模なトレーニングデータセットに対してモデルのフィッティングを行うと、AQEはタスクを結合することで並列度を制限し、タスク内で複数のモデルのフィッテイングを逐次的に行うことになる可能性があります。我々のゴールは、タスクごとに別々のモデルをフィッティングすることであり、添付のソリューションアクセラレータを用いることで、この挙動を確認することができます。グループレベルのデータセットが特に小さく、クイックにトレーニングしたい大量のモデルが存在する場合には、タスク内で複数のモデルをトレーニングを行うことが望ましいかもしれません。この場合、大量のモデルがタスク内で逐次トレーニングされます。
アーティファクトの管理とモデル推論
異なる粒度のデータに対する複数バージョンの機械学習アルゴリズムのトレーニングは、単一のモデルのトレーニングと比較してワークフローの複雑化をもたらします。単一のモデルをトレーニングした際、モデルオブジェクトとその他のアーティファクトはMLflowエクスペリメントのランに記録することができます。記録されたMLflowモデルは、管理・アクセスできるようにモデルレジストリに登録することができます。
我々のマルチモデルアプローチを用いることで、MLflowエクスペリメントのランには1つではなく複数のモデルが含まれることになります。この場合、モデルレジストリには何を記録すべきでしょうか?さらに、推論するための新たなデータに対して、どのようにこれらのモデルを適用するのでしょうか?
それぞれのモデルトレーニングのエクスペリメントランに記録される単一のカスタムMLflow pyfuncモデルを作成することで、これらの問題を解決します。カスタムモデルは、MLflowを継承するPythonクラスであり、カスタム処理ロジックを適用することができるpredict
メソッドを実装しています。我々のケースでは、カスタムモデルには地域に対応するレコードに対する推論を行うために、地域のモデルを検索・ロードするロジックを含んでいます。
我々はこのモデルを「メタモデル」と呼びます。ステージ(ステージング、プロダクション、アーカイブ)を管理するためにメタモデルはモデルレジストリに登録され、Databricksの推論ジョブにモデルがインポートされます。モデルレジストリからメタモデルをロードすると、メタモデルのpredictメソッドを通じて、メタモデルのエクスペリメントランに関連づけられているすべての地域レベルのモデルにアクセスできるようになります。
モデルトレーニングのUDFと同様、同じgroupBy.applyInPandas
アプローチを用いて異なるデータグループにカスタムMLflow推論モデルを適用するためにPandas UDFを活用します。カスタムモデルには、どの地域のデータを受け取ったのかを判断するロジックが含まれており、当該地域に対応するトレーニング済みモデルをロードし、レコードをスコアリングし、予測結果を返却します。
異なるモデルをロードして適用するためにカスタムMLflowモデルを活用
モデルチューニング
我々はモデルのハイパーパラメーターチューニングにHyperoptを活用しており、このロジックは推論UDFに含まれています。Hyperoptは機械学習ランタイムに組み込まれており、探索空間で指定されているハイパーパラメータの取りうるすべての組み合わせをテストする従来のグリッドサーチと比べて、より洗練されたハイパーパラメーターチュニングの手段を提供します。Hyperoptはグリッドの点ではなく、広い空間を探索することができ、テストすべきハイパーパラメーターの値を選択する必要性を低減します。Hyperoptは基本的には、これまでのパラメーターの結果に基づいて有望な空間の領域にフォーカスするベイジアン技術を用いて、ハイパーパラメーターの組み合わせを検索します。Hyperoptのパラメーターチューニングの実行は「トライアル」と呼ばれます。
モデルトレーニングを通じて、XGBoostのトレーニングレベルと、Hyperoptのトライアルレベルの両方で早期停止(アーリーストッピング)が使用されます。それぞれのHyperoptのパラメーターの組み合わせに対して、パフォーマンスの改善が見られなくなるまでXGBoostのツリーのトレーニングを行い、別のパラメーターの組み合わせをテストします。パフォーマンスが改善しなくなるまで、Hyperoptにパラメーター空間の探索を継続させます。この時点で、ベストなパラメーターを用いた最終モデルをフィッティングし、エクスペリメントのランにこのモデルを記録します。
まとめると、モデルトレーニングのステップは以下のようになります。サンプルの実装は添付のDatabricksノートブックに含まれています。
- Hyperoptの探索空間を定義
- Hyperoptがテストすべきパラメーターのセットを選択
- 選択されたパラメーターを用いてXGBoostモデルをトレーニング。特定数のツリーをトレーニング後にパフォーマンスが改善されなくなるまで追加のトレーニングを行うXGBoostのアーリーストッピングを活用
- Hyperoptはパラメーターの組み合わせのテストを継続。特定数のトライアル後にパフォーマンスが改善されなくなったらテストを停止するHyperoptのアーリーストッピングを活用。
- Hyperoptによって選択されたベストモデルのパラメーターの値と、トレーニング・テストの検証統計情報を、csvフォーマットでMLflowアーティファクトとして記録。
- Hyperoptによって選択されたベストなモデルパラメーターを用いて全体データセットを用いて最終モデルをフィッティング。フィッティングしたモデルをMLflowに記録。
まとめ
Databricksレイクハウスプラトフォームは、大量の機械学習ワークフローに存在するDevOpsのオーバーヘッドを軽減してくれます。計算リソースを容易に配備することができ、数多くの一般的なユースケースに対応できるように事前設定済みです。また、計算リソースの選択肢は柔軟なものであり、データサイエンティストによるscikit-learnのようなライブラリを用いたPythonベースのモデルの開発においては、モデル開発のためにシングルノードのクラスターを配備することができます。そして、クラスターと本書で議論したテクニックを用いることで、トレーニングや推論をスケールアップすることができます。ディープラーニングモデルの開発においては、GPU有効化シングルノードクラスターを容易に配備することができ、これにはTensorflowやPytorchのような関連ライブラリは最初からインストールされています。
さらに、Databricksの機能は、ビジネスアナリストやデータエンジニア向けのものもあり、データサイエンスやMLエンジニアのペルソナの枠を超えるものです。Databricks SQLはSQLエディタに慣れ親しんでいる、ビジネスアナリストに馴染み深いユーザー体験を提供します。データエンジニアは、Delta Lakeにデータを取り込む複雑なデータパイプラインを構築するために、Scala、Python、SQL、Sparkを活用することができます。複数のアプリケーションにデータを移動することなしに、すべてのペルソナが同じプラットフォームを用いて直接Deltaテーブルを活用することができます。これによって、分析プロジェクトにおける技術的な複雑性やコストを削減しつつも、プロジェクトをスピードアップすることができます。
上述したワークフローをどのように実装するのかに関するチュートリアルを含む関連リポジトリをご覧ください。