0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

IoTデバイスにおける機械学習モデルの管理およびスケーラビリティの確保

Last updated at Posted at 2022-01-20

How to Manage and Scale IoT Learning Models Using MLflow - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

一般的なデータサイエンスインターネットオブシングス(IoT)のユースケースには、大量のIoTセンサー群から流入するリアルタイムデータに対する機械学習モデルのトレーニングが関わってきます。いくつかのユースケースにおいては、複雑の単一のモデルよりも複数の基本的な機械学習アルゴリズムの方が優れた性能を示すため、接続された個々のデバイスにそれぞれ異なるモデルが存在している場合があります。このようなケースは、サプライチェーンの最適化、予兆保全、電気自動車の充電、スマートホーム管理など数多くのユースケースで確認されています。ここでの問題は以下のようなものがあります。

  • IoTデータ全体が膨大で、単体のマシンに収まらない。
  • デバイスごとのデータは単体マシンに収まる。
  • 異なるモデルにはそれぞれのデバイスが必要となる。
  • データサイエンスチームは、sklearnやpandasのようなシングルノード向けのライブラリを用いて実装を行なっているので、シングルマシンでおこなったPOCを分散処理化する際の軋轢を軽減したい。

本記事では、これらIoTデバイスの問題をモデルのトレーニングとモデルのスコアリングという2つのスキーマでどのように解決するのかを説明します。

マルチIoTデバイスMLのソリューション

これは正当なビッグデータの問題です。気候センサー、自動車のようなIoTデバイスは膨大な量のデータポイントを生成します。単一マシンのソリューションはこの複雑性の問題に対してスケールせず、多くの場合プロダクション環境とうまく連携することができません。そして、データサイエンスチームは使っているデータフレームがシングルマシン向けのpandasオブジェクトなのか、Apache Sparkによって分散されているのかどうかを心配したいとは思いません。そして、もう一つ:再現性、モニタリング、デプロイメントのために、機械学習モデル、パフォーマンスを記録する必要があります。

この問題を解決するために2つのスキーマが存在します。

  • モデルトレーニング:入力として単一のデバイスのデータを受け取る関数を作成します。モデルをトレーニングし、機械学習ライフサイクルのオープンソースプラットフォームであるMLflowを用いて、結果のモデル、評価メトリクスを記録します。
  • モデルスコアリング:MLflowからトレーニング済みモデルを取り出し、適用し予測結果を返却する2つ目の関数を作成します。

これらの抽象化を行うことで、Sparkで分散処理を行う際にはこれらの関数をPandasのUDFに変換するだけで済みます。Pandas UDFを用いることで、Sparkジョブで任意のPythonコードを効率的に分散することができ、他の直列オペレーションを分散させることができます。これで、シングルノードのソリューションを驚くほど並列に処理を行うことができます。

IoTモデルのトレーニング

モデルトレーニングをより詳細にみていきましょう。いくつかのダミーデータを使用します。いくつかの接続デバイスのフリートがあり、それぞれにデバイスにサンプルデータ、特徴量、予測するラベルが含まれています。これは、IoTデバイスではよくあるケースであり、スケーラビリティを活用するために、特徴量生成のステップはSparkを用いて実行することができます。

Python
import pyspark.sql.functions as f

df = (spark.range(10000*1000)
    .select(f.col("id").alias("record_id"), (f.col("id")%10).alias("device_id"))
    .withColumn("feature_1", f.rand() * 1)
    .withColumn("feature_2", f.rand() * 2)
    .withColumn("feature_3", f.rand() * 3)
    .withColumn("label", (f.col("feature_1") + f.col("feature_2") + f.col("feature_3")) + f.rand())
)    

次に、トレーニング用関数が返却するスキーマを定義する必要があります。device IDとトレーニングで使用したレコード数、モデルのパス、評価メトリックを返却したいものとします。

Python
trainReturnSchema = t.StructType([
    t.StructField('device_id', t.IntegerType()), # unique device ID
    t.StructField('n_used', t.IntegerType()),    # number of records used in training
    t.StructField('model_path', t.StringType()), # path to the model for a given device
    t.StructField('mse', t.FloatType())          # metric for model performance
])  

入力として一つのデータグループのDataFrameを受け取り、モデルのメトリクスを返却するPandas UDFを定義します。

Python

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

@f.pandas_udf(trainReturnSchema, functionType=f.PandasUDFType.GROUPED_MAP)
def train_model(df_pandas):
    '''
    Trains an sklearn model on grouped instances
    '''
    # Pull metadata
    device_id = df_pandas['device_id'].iloc[0]
    n_used = df_pandas.shape[0]
    run_id = df_pandas['run_id'].iloc[0] # Pulls run ID to do a nested run
    
    # Train the model
    X = df_pandas[['feature_1', 'feature_2', 'feature_3']]
    y = df_pandas['label']
    rf = RandomForestRegressor()
    rf.fit(X, y)

    # Evaluate the model
    predictions = rf.predict(X)
    mse = mean_squared_error(y, predictions) # Note we could add a train/test split
    
    # Resume the top-level training
    with mlflow.start_run(run_id=run_id):
        # Create a nested run for the specific device
        with mlflow.start_run(run_name=str(device_id), nested=True) as run:
            mlflow.sklearn.log_model(rf, str(device_id))
            mlflow.log_metric("mse", mse)
            
            artifact_uri = f"runs:/{run.info.run_id}/{device_id}"
            # Create a return pandas DataFrame that matches the schema above
            returnDF = pd.DataFrame([[device_id, n_used, artifact_uri, mse]], 
                columns=["device_id", "n_used", "model_path", "mse"])

    return returnDF     

MLflowのネステッドランによるIoTデバイスモデルのロギング

MLflowトラッキングパッケージを用いることで、機械学習開発プロセスの異なる側面を記録することができます。今回のケースでは、デバイスごとにラン(あるいは、機械学習コードの一回の実行)を作成します。一つの親ランを用いてこれらのランをまとめます。

これによって、他のモデルよりも劣っているモデルを特定することができます。上で見たように、Pandas UDFの中でロギングのロジックを追加するだけです。このコードはクラスターのワーカーノードで実行されますが、ネストされたランを実行する前に、親のランを実行すると、これらのモデルを全てまとめて記録することができます。

それぞれのモデルのURIを取得するために、MLflowにクエリーを実行することができます。Pandas UDFからURIを返却することで、パイプライン全体を組み合わせることが若干容易になります。

並列トレーニング

次に、Grouped Map Pandas UDFを適用する必要があります。特定のデバイスのデータがSparkクラスターのノードに収まる範囲においては、トレーニングを分散することができます。最初に、Mlflowの親ランを作成し、groupby、applyを用いてPandas UDFを適用します。

Python
with mlflow.start_run(run_name="Training session for all devices") as run:
    run_id = run.info.run_uuid

    modelDirectoriesDF = (df
        .withColumn("run_id", f.lit(run_id)) # Add run_id
        .groupby("device_id")
        .apply(train_model)
    )

combinedDF = (df
.join(modelDirectoriesDF, on="device_id", how="left")
)  

これで大丈夫です!デバイスごとのモデルがトレーニングされ、記録されます。

IoTモデルのスコアリング

次はスコアリングです。ここでの最適化テクニックは、通信のオーバーヘッドを低減するために、デバイスごとのモデルを一度だけ取り込むようにするというものです。シングルマシンの文脈で行うようにモデルを適用し、レコードIDと予測結果を含むpandas DataFrameを返却します。

Python
applyReturnSchema = t.StructType([
    t.StructField('record_id', t.IntegerType()),
    t.StructField('prediction', t.FloatType())
])

@f.pandas_udf(applyReturnSchema, functionType=f.PandasUDFType.GROUPED_MAP)
def apply_model(df_pandas):
    '''
    Applies model to data for a particular device, represented as a pandas DataFrame
    '''
    model_path = df_pandas['model_path'].iloc[0]

    input_columns = ['feature_1', 'feature_2', 'feature_3']
    X = df_pandas[input_columns]

    model = mlflow.sklearn.load_model(model_path)
    prediction = model.predict(X)

    returnDF = pd.DataFrame({
        "record_id": df_pandas['record_id'],
        "prediction": prediction
    })
    return returnDF

predictionDF = combinedDF.groupby("device_id").apply(apply_model) 

それぞれのケースでGrouped Map Pandas UDFを使用していることに注意してください。最初のケースでは、入力としてグループを受け取り、デバイスごとに1行を返却(多対1マッピング)しています。このケースでは、入力としてグループを受け取り、一行ごとに一つの予測結果を返却(1対1マッピング)しています。両方のケースでGrouped Map Pandas UDFを使用することができます。

まとめ

このようにして、IoTデバイス群の個々のモデルをトレーニングすることができました。これは、通常数多くの基本的なモデルは単一の複雑なモデルの性能を上回るという考え方をサポートするものです。これは多くのケースで当てはまりますが、デバイスで利用できるデータが不足していたり、欠如している場合には個々のモデルの性能が出ない場合もあります。この場合、以下のような対策が考えられます。

  • トレーニングに使用したレコード数、評価メトリックを用いることで、性能が優れているモデルとそうでないモデルを明確にすることができます。この情報を用いることで、フリート全体でトレーニングされるモデル、デバイス固有のモデルを切り替えることができます。
  • デバイス固有のモデルの予測結果、フリート全体のモデルの予測結果、デバイスごとのレコード数、評価メトリクスを組み合わせるアンサンブルモデルをトレーニングすることができます。これにより、性能が出ない個別モデルを改善するような最終の予測結果を生み出すことができます。

IoTデバイス向けにMLflowを使ってみる

トライしてみたいですか?本記事で紹介されている内容のノートブック[AWS|Azure]が利用可能です。

AWS向けノートブックの日本語訳です。

MLflowに慣れていないのであれば、the MLflow quickstart with the latest MLflow releaseをご一読ください。プロダクションのユースケースに関しては、Managed MLflowを参照ください。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?