こちらの続編です。ノートブックをウォークスルーします。
ノートブックはこちらのものを。自分の勉強がてら一部変更して実行しています。あと、わかりやすさのために特徴量の数は2にしています。
%pip install databricks-kakapo pyod --quiet
from hyperopt import fmin, tpe, hp, Trials, SparkTrials, STATUS_OK
from hyperopt.pyll.base import scope
from pyspark.sql.functions import struct, col, when, array
from pyspark.sql.types import DoubleType
from pyod.utils.data import generate_data
import mlflow
import kakapo
import uuid
import sys
ハイパーパラメーター探索の結果を追跡するためのMLflowエクスペリメントを作成します。
user = spark.sql("select current_user()").take(1)[0][0]
mlflow.set_experiment(f"/Users/{user}/rare_events")
<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/2646050860752536', creation_time=1720592744550, experiment_id='2646050860752536', last_update_time=1720593860261, lifecycle_stage='active', name='/Users/takaaki.yayoi@databricks.com/rare_events', tags={'mlflow.experiment.sourceName': '/Users/takaaki.yayoi@databricks.com/rare_events',
'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
'mlflow.ownerEmail': 'takaaki.yayoi@databricks.com',
'mlflow.ownerId': '7459477216523290'}>
1. 合成データの生成
2つの特徴量と特定の割合の"外れ値"を持つシンプルな合成データを作成するために、PyODの組み込みメソッドgenerate_data
を使用し、トレーニングセット、テストセットにデータを分割します。
contamination = 0.1 # 外れ値の割合
n_train = 2000 # トレーニングポイントの数
n_test = 500 # テスト用ポイントの数
# サンプルデータの生成
X_train, X_test, y_train, y_test = \
generate_data(n_train=n_train,
n_test=n_test,
n_features=2,
contamination=contamination,
random_state=42)
合成データの確認
import pandas as pd
# nparrayからpandasのDataFrameに変換
dataset = pd.DataFrame({'Column1': X_train[:, 0], 'Column2': X_train[:, 1], 'anomaly': y_train})
display(dataset)
2. Kakapoのデフォルト設定を用いたモデルユニバースとパラメーター探索空間の定義
モデルスペースとハイパーパラメーター探索空間にKakapoのデフォルト設定を用いて、複数の外れ値検知モデルをトレーニングし、ハイパーパラメーターの探索を実行します。
# デフォルトのモデル空間をロード
model_space = kakapo.get_default_model_space()
print("Default model space: {}".format(model_space))
Default model space: {'ecod': <class 'pyod.models.ecod.ECOD'>, 'abod': <class 'pyod.models.abod.ABOD'>, 'iforest': <class 'pyod.models.iforest.IForest'>, 'copod': <class 'pyod.models.copod.COPOD'>, 'inne': <class 'pyod.models.inne.INNE'>}
# デフォルトのハイパーパラメーター探索空間をロード
search_space = kakapo.get_default_search_space()
print("Default search space: {}".format(search_space))
Default search space: [{'type': 'iforest', 'n_estimators': <hyperopt.pyll.base.Apply object at 0x7fd91727b110>, 'max_features': <hyperopt.pyll.base.Apply object at 0x7fd917a78e90>}, {'type': 'inne', 'n_estimators': <hyperopt.pyll.base.Apply object at 0x7fd9172ceb10>, 'max_samples': <hyperopt.pyll.base.Apply object at 0x7fd9149aafd0>}, {'type': 'abod', 'n_neighbors': <hyperopt.pyll.base.Apply object at 0x7fd9172c0c10>}, {'type': 'ecod'}, {'type': 'copod'}]
# 探索空間をhyperoptにロード
space = hp.choice('model_type', search_space)
# 外れ値モデルを評価するために正解データがあるか無いかを制御
GROUND_TRUTH_OD_EXISTS = True
# MLFlowエクスペリメントを保存する際のユニークなランID
uid = uuid.uuid4().hex
# hyperoptを用いたモデルトレーニング & ハイパーパラメーターチューニングの並列実行
with mlflow.start_run(run_name=uid):
best_params = fmin(
trials=SparkTrials(parallelism=10),
fn = lambda params: kakapo.train_outlier_detection(params, model_space, X_train, X_test, y_test, GROUND_TRUTH_OD_EXISTS),
space=space,
algo=tpe.suggest,
max_evals=50
)
テストデータに対する推論
# ベストなパフォーマンスモデルとベストなモデルを選択するために用いるメトリックをピックアップするために、最新のランあるいはカスタムの親のランを使用
metric = "loss"
parentRunId = "\"" + mlflow.search_runs(filter_string="tags.model_type != ''").iloc[0]["tags.mlflow.parentRunId"] +"\""
# 現在のエクスペリメントの全ての子供のランを取得
runs = mlflow.search_runs(filter_string=f"tags.mlflow.parentRunId = {parentRunId}", order_by=[f"metrics.{metric} ASC"])
runs = runs.where(runs['status'] == 'FINISHED')
# ベストなランのIDを取得
best_run_id = runs.loc[0,'run_id']
print(best_run_id)
# オリジナルのデータセットからSparkデータフレームを作成
X_test_spark_df = spark.createDataFrame(X_test.tolist(), schema = ["col1", "col2"])
X_test_spark_df.display()
logged_model = f'runs:/{best_run_id}/model'
# Spark UDFとしてモデルをロード。モデルがdoubleの値を返却しない場合にはresult_typeを上書き
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='double')
# Sparkデータフレームに対する予測の実行
X_test_spark_df = X_test_spark_df.withColumn('predictions', loaded_model(struct(*map(col, X_test_spark_df.columns))))
# 計算された予測値のカラムを持つデータフレームを表示
X_test_spark_df.display()
(参考)トレーニングデータに対する推論
念の為にトレーニングデータにもモデルを適用してみます。
# オリジナルのデータセットからSparkデータフレームを作成
X_train_spark_df = spark.createDataFrame(X_train.tolist(), schema = ["col1", "col2"])
# Sparkデータフレームに対する予測の実行
X_train_spark_df = X_train_spark_df.withColumn('predictions', loaded_model(struct(*map(col, X_train_spark_df.columns))))
# 計算された予測値のカラムを持つデータフレームを表示
X_train_spark_df.display()
次はアルゴリズムと探索空間を拡張して、教師なしで学習します。
3. Kakapoのデフォルト探索空間設定の拡張
Kakapoのデフォルト設定を拡張し、モデル空間に新たなモデルを追加し、探索空間に関連するハイパーパラメーターのオプションを追加します。
# 我々の空間に追加したい新規のモデルのインポート
from pyod.models.cof import COF
# デフォルトのモデル空間を補強
model_space = kakapo.enrich_default_model_space({"cof": COF})
print("Enriched model space: {}".format(model_space))
Enriched model space: {'ecod': <class 'pyod.models.ecod.ECOD'>, 'abod': <class 'pyod.models.abod.ABOD'>, 'iforest': <class 'pyod.models.iforest.IForest'>, 'copod': <class 'pyod.models.copod.COPOD'>, 'inne': <class 'pyod.models.inne.INNE'>, 'cof': <class 'pyod.models.cof.COF'>}
additional_params = [
{
'type': 'cof',
'n_neighbors': scope.int(hp.quniform('cof.n_neighbors', 5, 20, 5))
}
]
# デフォルトのハイパーパラメーター探索空間を補強
search_space = kakapo.enrich_default_search_space(additional_params)
print("Enriched search space: {}".format(search_space))
Enriched search space: [{'type': 'iforest', 'n_estimators': <hyperopt.pyll.base.Apply object at 0x7fd9170eaf10>, 'max_features': <hyperopt.pyll.base.Apply object at 0x7fd9170ebe50>}, {'type': 'inne', 'n_estimators': <hyperopt.pyll.base.Apply object at 0x7fd917094690>, 'max_samples': <hyperopt.pyll.base.Apply object at 0x7fd9170e82d0>}, {'type': 'abod', 'n_neighbors': <hyperopt.pyll.base.Apply object at 0x7fd9170fb150>}, {'type': 'ecod'}, {'type': 'copod'}, {'type': 'cof', 'n_neighbors': <hyperopt.pyll.base.Apply object at 0x7fd9170f2a50>}]
# Hyperoptに補強した探索空間をロード
space = hp.choice('model_type', search_space)
# 外れ値モデルを評価するために正解データがあるか無いかを制御
GROUND_TRUTH_OD_EXISTS = False
# MLFlowエクスペリメントを保存する際のユニークなランID
uid = uuid.uuid4().hex
# hyperoptを用いたモデルトレーニング & ハイパーパラメーターチューニングの並列実行
with mlflow.start_run(run_name=uid):
best_params = fmin(
trials=SparkTrials(parallelism=10),
fn = lambda params: kakapo.train_outlier_detection(params, model_space, X_train, X_test, None, GROUND_TRUTH_OD_EXISTS),
space=space,
algo=tpe.suggest,
max_evals=50
)
4. 推論にモデルを使用
トレーニングした異常検知モデルの一つをロードし、データセットに対する予測を実行します。
mlflow.search_runs()
にdisplay
関数使えたんですね。
# MLflowモデルのランを探索
display(mlflow.search_runs())
すでに、教師あり学習のランが記録されているので、今回は明示的に親のランのIDを指定します。
# ベストなパフォーマンスモデルとベストなモデルを選択するために用いるメトリックをピックアップするために、最新のランあるいはカスタムの親のランを使用
metric = "loss"
#parentRunId = "\"" + mlflow.search_runs(filter_string="tags.model_type != ''").iloc[0]["tags.mlflow.parentRunId"] +"\""
# 明示的に親のランIDを指定
parentRunId = "\"0e537644adc24d8d93b1f5b17020e113\""
# 現在のエクスペリメントの全ての子供のランを取得
runs = mlflow.search_runs(filter_string=f"tags.mlflow.parentRunId = {parentRunId}", order_by=[f"metrics.{metric} ASC"])
runs = runs.where(runs['status'] == 'FINISHED')
# ベストなランのIDを取得
best_run_id = runs.loc[0,'run_id']
print(best_run_id)
# オリジナルのデータセットからSparkデータフレームを作成
X_test_spark_df = spark.createDataFrame(X_test.tolist(), schema = ["col1", "col2"])
X_test_spark_df.display()
logged_model = f'runs:/{best_run_id}/model'
# Spark UDFとしてモデルをロード。モデルがdoubleの値を返却しない場合にはresult_typeを上書き
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='double')
# Sparkデータフレームに対する予測の実行
X_test_spark_df = X_test_spark_df.withColumn('predictions', loaded_model(struct(*map(col, X_test_spark_df.columns))))
# 計算された予測値のカラムを持つデータフレームを表示
X_test_spark_df.display()
教師なしでもなかなか頑張ってますが、教師ありには敵わないですかね。
ただ、こちらでも述べているように、必ずしもラベルデータが入手できるわけではなく、その際に手も足も出ないということがないだけでも意味があると思います。ぜひご活用ください!
ラベルなしのデータ資産のサポートを通じて、大規模なデータドメインにおける大きなペインポイントに対応しています。数百のデータセットを含むかもしれないデータドメインで、ラベル付きデータを提供することは困難です。領域全体での外れ値を分析する手段を持つことは、説明されたアプローチの非常に大きなメリットとなっています。
なお、エクスペリメントには全てのトレーニング結果が記録されています。