How to Manage and Scale IoT Learning Models Using MLflow - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
一般的なデータサイエンスインターネットオブシングス(IoT)のユースケースには、大量のIoTセンサー群から流入するリアルタイムデータに対する機械学習モデルのトレーニングが関わってきます。いくつかのユースケースにおいては、複雑の単一のモデルよりも複数の基本的な機械学習アルゴリズムの方が優れた性能を示すため、接続された個々のデバイスにそれぞれ異なるモデルが存在している場合があります。このようなケースは、サプライチェーンの最適化、予兆保全、電気自動車の充電、スマートホーム管理など数多くのユースケースで確認されています。ここでの問題は以下のようなものがあります。
- IoTデータ全体が膨大で、単体のマシンに収まらない。
- デバイスごとのデータは単体マシンに収まる。
- 異なるモデルにはそれぞれのデバイスが必要となる。
- データサイエンスチームは、sklearnやpandasのようなシングルノード向けのライブラリを用いて実装を行なっているので、シングルマシンでおこなったPOCを分散処理化する際の軋轢を軽減したい。
本記事では、これらIoTデバイスの問題をモデルのトレーニングとモデルのスコアリングという2つのスキーマでどのように解決するのかを説明します。
マルチIoTデバイスMLのソリューション
これは正当なビッグデータの問題です。気候センサー、自動車のようなIoTデバイスは膨大な量のデータポイントを生成します。単一マシンのソリューションはこの複雑性の問題に対してスケールせず、多くの場合プロダクション環境とうまく連携することができません。そして、データサイエンスチームは使っているデータフレームがシングルマシン向けのpandasオブジェクトなのか、Apache Sparkによって分散されているのかどうかを心配したいとは思いません。そして、もう一つ:再現性、モニタリング、デプロイメントのために、機械学習モデル、パフォーマンスを記録する必要があります。
この問題を解決するために2つのスキーマが存在します。
- モデルトレーニング:入力として単一のデバイスのデータを受け取る関数を作成します。モデルをトレーニングし、機械学習ライフサイクルのオープンソースプラットフォームであるMLflowを用いて、結果のモデル、評価メトリクスを記録します。
- モデルスコアリング:MLflowからトレーニング済みモデルを取り出し、適用し予測結果を返却する2つ目の関数を作成します。
これらの抽象化を行うことで、Sparkで分散処理を行う際にはこれらの関数をPandasのUDFに変換するだけで済みます。Pandas UDFを用いることで、Sparkジョブで任意のPythonコードを効率的に分散することができ、他の直列オペレーションを分散させることができます。これで、シングルノードのソリューションを驚くほど並列に処理を行うことができます。
IoTモデルのトレーニング
モデルトレーニングをより詳細にみていきましょう。いくつかのダミーデータを使用します。いくつかの接続デバイスのフリートがあり、それぞれにデバイスにサンプルデータ、特徴量、予測するラベルが含まれています。これは、IoTデバイスではよくあるケースであり、スケーラビリティを活用するために、特徴量生成のステップはSparkを用いて実行することができます。
import pyspark.sql.functions as f
df = (spark.range(10000*1000)
.select(f.col("id").alias("record_id"), (f.col("id")%10).alias("device_id"))
.withColumn("feature_1", f.rand() * 1)
.withColumn("feature_2", f.rand() * 2)
.withColumn("feature_3", f.rand() * 3)
.withColumn("label", (f.col("feature_1") + f.col("feature_2") + f.col("feature_3")) + f.rand())
)
次に、トレーニング用関数が返却するスキーマを定義する必要があります。device IDとトレーニングで使用したレコード数、モデルのパス、評価メトリックを返却したいものとします。
trainReturnSchema = t.StructType([
t.StructField('device_id', t.IntegerType()), # unique device ID
t.StructField('n_used', t.IntegerType()), # number of records used in training
t.StructField('model_path', t.StringType()), # path to the model for a given device
t.StructField('mse', t.FloatType()) # metric for model performance
])
入力として一つのデータグループのDataFrameを受け取り、モデルのメトリクスを返却するPandas UDFを定義します。
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
@f.pandas_udf(trainReturnSchema, functionType=f.PandasUDFType.GROUPED_MAP)
def train_model(df_pandas):
'''
Trains an sklearn model on grouped instances
'''
# Pull metadata
device_id = df_pandas['device_id'].iloc[0]
n_used = df_pandas.shape[0]
run_id = df_pandas['run_id'].iloc[0] # Pulls run ID to do a nested run
# Train the model
X = df_pandas[['feature_1', 'feature_2', 'feature_3']]
y = df_pandas['label']
rf = RandomForestRegressor()
rf.fit(X, y)
# Evaluate the model
predictions = rf.predict(X)
mse = mean_squared_error(y, predictions) # Note we could add a train/test split
# Resume the top-level training
with mlflow.start_run(run_id=run_id):
# Create a nested run for the specific device
with mlflow.start_run(run_name=str(device_id), nested=True) as run:
mlflow.sklearn.log_model(rf, str(device_id))
mlflow.log_metric("mse", mse)
artifact_uri = f"runs:/{run.info.run_id}/{device_id}"
# Create a return pandas DataFrame that matches the schema above
returnDF = pd.DataFrame([[device_id, n_used, artifact_uri, mse]],
columns=["device_id", "n_used", "model_path", "mse"])
return returnDF
MLflowのネステッドランによるIoTデバイスモデルのロギング
MLflowトラッキングパッケージを用いることで、機械学習開発プロセスの異なる側面を記録することができます。今回のケースでは、デバイスごとにラン(あるいは、機械学習コードの一回の実行)を作成します。一つの親ランを用いてこれらのランをまとめます。
これによって、他のモデルよりも劣っているモデルを特定することができます。上で見たように、Pandas UDFの中でロギングのロジックを追加するだけです。このコードはクラスターのワーカーノードで実行されますが、ネストされたランを実行する前に、親のランを実行すると、これらのモデルを全てまとめて記録することができます。
それぞれのモデルのURIを取得するために、MLflowにクエリーを実行することができます。Pandas UDFからURIを返却することで、パイプライン全体を組み合わせることが若干容易になります。
並列トレーニング
次に、Grouped Map Pandas UDFを適用する必要があります。特定のデバイスのデータがSparkクラスターのノードに収まる範囲においては、トレーニングを分散することができます。最初に、Mlflowの親ランを作成し、groupby、applyを用いてPandas UDFを適用します。
with mlflow.start_run(run_name="Training session for all devices") as run:
run_id = run.info.run_uuid
modelDirectoriesDF = (df
.withColumn("run_id", f.lit(run_id)) # Add run_id
.groupby("device_id")
.apply(train_model)
)
combinedDF = (df
.join(modelDirectoriesDF, on="device_id", how="left")
)
これで大丈夫です!デバイスごとのモデルがトレーニングされ、記録されます。
IoTモデルのスコアリング
次はスコアリングです。ここでの最適化テクニックは、通信のオーバーヘッドを低減するために、デバイスごとのモデルを一度だけ取り込むようにするというものです。シングルマシンの文脈で行うようにモデルを適用し、レコードIDと予測結果を含むpandas DataFrameを返却します。
applyReturnSchema = t.StructType([
t.StructField('record_id', t.IntegerType()),
t.StructField('prediction', t.FloatType())
])
@f.pandas_udf(applyReturnSchema, functionType=f.PandasUDFType.GROUPED_MAP)
def apply_model(df_pandas):
'''
Applies model to data for a particular device, represented as a pandas DataFrame
'''
model_path = df_pandas['model_path'].iloc[0]
input_columns = ['feature_1', 'feature_2', 'feature_3']
X = df_pandas[input_columns]
model = mlflow.sklearn.load_model(model_path)
prediction = model.predict(X)
returnDF = pd.DataFrame({
"record_id": df_pandas['record_id'],
"prediction": prediction
})
return returnDF
predictionDF = combinedDF.groupby("device_id").apply(apply_model)
それぞれのケースでGrouped Map Pandas UDFを使用していることに注意してください。最初のケースでは、入力としてグループを受け取り、デバイスごとに1行を返却(多対1マッピング)しています。このケースでは、入力としてグループを受け取り、一行ごとに一つの予測結果を返却(1対1マッピング)しています。両方のケースでGrouped Map Pandas UDFを使用することができます。
まとめ
このようにして、IoTデバイス群の個々のモデルをトレーニングすることができました。これは、通常数多くの基本的なモデルは単一の複雑なモデルの性能を上回るという考え方をサポートするものです。これは多くのケースで当てはまりますが、デバイスで利用できるデータが不足していたり、欠如している場合には個々のモデルの性能が出ない場合もあります。この場合、以下のような対策が考えられます。
- トレーニングに使用したレコード数、評価メトリックを用いることで、性能が優れているモデルとそうでないモデルを明確にすることができます。この情報を用いることで、フリート全体でトレーニングされるモデル、デバイス固有のモデルを切り替えることができます。
- デバイス固有のモデルの予測結果、フリート全体のモデルの予測結果、デバイスごとのレコード数、評価メトリクスを組み合わせるアンサンブルモデルをトレーニングすることができます。これにより、性能が出ない個別モデルを改善するような最終の予測結果を生み出すことができます。
IoTデバイス向けにMLflowを使ってみる
トライしてみたいですか?本記事で紹介されている内容のノートブック[AWS|Azure]が利用可能です。
AWS向けノートブックの日本語訳です。
MLflowに慣れていないのであれば、the MLflow quickstart with the latest MLflow releaseをご一読ください。プロダクションのユースケースに関しては、Managed MLflowを参照ください。