Near Real-Time Anomaly Detection | Databricks Blogの翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
なぜ異常検知が重要なのか?
小売、金融、サイバーセキュリティ、その他の業界のいずれであっても、異常な挙動が起きたらすぐに特定することは間違いなく最優先事項と言えます。このような能力の欠如は、収益の損失、規制違反による罰金、顧客プライバシーの侵害、サイバーセキュリティにおけるセキュリティ漏えいによる信頼の毀損を意味します。このため、通常と異なるクレジットカードのトランザクションの検知、疑わしい行動をしているユーザーの特定、Webサービスに対するリクエストの不自然なパターンの検知は、素晴らしい一日と完璧な災害の分かれ目となります。
異常検知における課題
異常検知はいくつかの課題を突きつけます。はじめに、'異常'とはどのようなものかというデータサイエンスの質問です。幸運なことに、機械学習はデータにおける異常なパターンと通常のパターンを区別する方法を学習するパワフルなツールとなっています。異常検知の場合、すべての異常を知ることが不可能なので、ラベル付けのリソースがあったとしても、機械学習モデルをトレーニングする際にデータセットにラベル付けすることができません。このため、異常検知では、ラベルのないデータからパターンを学習する教師なし学習を活用すべきです。
異常検知に対する完璧な教師なし機械学習モデルを見つけ出したとしても、様々な形態で現実の問題が始まるだけです。ソースシステムからデータが到着するや否やそれぞれのレコードが取り込まれ、変換され、最終的にはモデルでスコアリングされるように、このモデルをプロダクションに投入するベストな方法はどのような者でしょうか?また、ニアリアルタイムで行うべきでしょうか、5分から10分のような短い周期で行うべきでしょうか?これには、洗練されたETLパイプラインの構築、異常なレコードを適切に特定できる教師なし機械学習モデルとの連携が関係します。また、このエンドツーエンドのパイプラインは、取り込みからモデル推論におけるデータ品質を保証しつつも常に稼働し続けるプロダクションレベルであるべきであり、背後のインフラストラクチャも維持されなくてはなりません。
Databricksレイクハウスプラットフォームによる課題の解決
Databricksを用いることで、このプロセスは複雑な者ではなくなります。すべてSQLを用いてニアリアルタイムの異常検知パイプラインを構築でき、機械学習モデルのトレーニングでのみPythonを使うことができます。このようなデータ取り込み、変換処理、モデル推論はすべてSQLで行うことができます。
特に、この記事では異常レコードの検知に完璧にフィットするisolation forestアルゴリズムのトレーニングを説明し、Delta Live Tables (DLT)を用いて作成されたストリーミングデータパイプラインにトレーニングしたモデルを連携します。DLTはデータエンジニアリングプロセスを自動化するETLフレームワークです。DLTでは、信頼性のあるデータパイプラインを作成するためにシンプルな宣言型アプローチをとっており、データのバッチやストリーミングに対する大規模なインフラストラクチャを完全に管理します。結果として、ニアリアルタイムの異常検知システムが得られます。特に、この記事で用いるデータは、Kaggleから得られたクレジットカードトランザクションをシミュレートする合成データのサンプルとなるので、検知される異常は不正なトランザクションとなります。
この記事で説明するMLとDelta Live Tablesベースの異常検知ソリューションのアーキテクチャ
Databricks機械学習ランタイムでは、デフォルトでscikit-learnのisolation forestアルゴリズム実装を利用でき、異常検知モデルをトレーニングする際にトラッキング、記録するためにMLflowフレームワークを活用します。Delta Live Tablesを用いて、SQLでETLパイプラインのすべてを開発します。
ラベルなしデータにおける異常検知のためのIsolation Forests
Isolation forestsは、ランダムフォレストと似ているツリーベースのアンサンブルアルゴリズムです。このアルゴリズムは、与えられた観測結果のセットにおける正常値の分離は、外れ値(異常な観測結果)の分離よりも困難であるということを前提としています。ハイレベルでは、通常のクレジットカードトランザクションである、正常なポイントは決定木の奥深くに存在しており分離が困難ですが、逆は簡単です。このアルゴリズムは、ラベルのない観測結果のセットに対してトレーニングすることができ、これまで観測されていない異常レコードを予測するために活用することができます。
正常値を分離するよりも外れ値の分離の方が簡単です
モデルのトレーニングとトラッキングでDatabricksがどのように役立つのか?
Databricksで機械学習関係のことを行う際には、機械学習 (ML)ランタイムを持つクラスターが必須です。MLランタイムでは、データサイエンスや機械学習関連のタスクで一般的に使用される数多くのオープンソースライブラリがデフォルトで利用可能です。Scikit-learnはそのようなライブラリの一つであり、isolation forestアルゴリズムの素晴らしい実装を提供しています。
このモデルがどのように定義されるのかを以下に示します。
from sklearn.ensemble import IsolationForest
isolation_forest = IsolationForest(n_jobs=-1, warm_start=True, random_state=42)
このランタイムは、他のものとともに機械学習エクスペリメントのトラッキング、モデルのステージ管理、デプロイメントのためのMLflow、ノートブック環境との密連携を可能にします。
MLクラスターに紐づけられたノートブック環境で行われるすべてのモデルトレーニングやハイパーパラメーターの最適化は、デフォルトで有効化されているMLflowのオートロギングによって自動で記録されます。
モデルが記録されると、様々な方法でMLflowにあるモデルを登録、デプロイできるようになります。特に、Apache Spark™による分散ストリーミングやバッチ推論のためのベクトル化されたユーザー定義関数(UDF)としてこのモデルをデプロイするためには、MLflowは以下のようにユーザーインタフェースでUDFの作成、登録のためのコードを生成します。
MLflowは、モデル推論のためのApache Spark UDFの作成、登録のコードを生成します
これに加え、MLflowのREST APIによって、以下のように関数にまとめることができる数行のコードで、既存のプロダクションモデルをアーカイブし、新たにトレーニングしたモデルをプロダクションに投入することができます。
def train_model(mlFlowClient, loaded_model, model_name, run_name)->str:
"""
Trains, logs, registers and promotes the model to production. Returns the URI of the model in prod
"""
with mlflow.start_run(run_name=run_name) as run:
# 0. Fit the model
loaded_model.fit(X_train)
# 1. Get predictions
y_train_predict = loaded_model.predict(X_train)
# 2. Create model signature
signature = infer_signature(X_train, y_train_predict)
runID = run.info.run_id
# 3. Log the model alongside the model signature
mlflow.sklearn.log_model(loaded_model, model_name, signature=signature, registered_model_name= model_name)
# 4. Get the latest version of the model
model_version = mlFlowClient.get_latest_versions(model_name,stages=['None'])[0].version
# 5. Transition the latest version of the model to production and archive the existing versions
client.transition_model_version_stage(name= model_name, version = model_version, stage='Production', archive_existing_versions= True)
return mlFlowClient.get_latest_versions(model_name, stages=["Production"])[0].source
プロダクションのシナリオでは、モデルによって一度に単一のレコードのみをスコアリングして欲しいと考えるかもしれません。Databricksでは、このような"確実に一回のみ"の挙動を保証するために、Auto Loaderを活用することができます。Auto Loaderは、PythonあるいはSQLを用いて、Delta Live Tables、構造化ストリーミングアプリケーションで使用することができます。
検討すべき別の重要な要素は、異常の発生頻度、環境によるものか挙動によるものかは時間とともに変化するということです。このため、新たなデータが到着すると、モデルの再トレーニングが必要になります。
モデルのトレーニングロジックを含むこのノートブックは、ジョブが実行される都度、最新のモデルを効果的に再トレーニングし、プロダクションに投入するDatabricksワークフローのジョブのスケジュールとしてプロダクション化することができます。
Delta Live Tablesによるニアリアルタイム異常検知の実現
このような機械学習の側面は、問題の一部のみを表現しているに過ぎません。さらに、より困難なのは、データ取り込み、変換処理、モデルの推論を組み合わせる、プロダクションレベルのニアリアルタイムのデータパイプラインの構築です。このプロセスは、複雑で時間を浪費し、エラーが混入しやすいものとなります。
これを行うための常時稼働かつエラー対応機能を持つインフラストラクチャの構築、メンテナンスには、データエンジニアリングだけではなくより多くのソフトウェアエンジニアリングのノウハウが関係します。また、パイプライン全体を通じてデータ品質を保証しなくてはなりません。特定のアプリケーションによっては、さらなる複雑性が追加されることでしょう。
ここでは、Delta Live Tables(DLT)が登場します。
DLTの用語では、ノートブックライブラリとは、DLTパイプラインの一部あるいは全てのコードを含むノートブックです。DLTパイプラインには関連づけられる一つ以上のノートブックがあり、それぞれのノートブックではSQLあるいはPython構文を使うことができます。最初のノートブックライブラリには、MLflowモデルレジストリからモデルを取得し、取り込まれたレコードがパイプラインの後段で特徴量化されたら、モデル推論関数を使えるようにUDFとして登録するために、Pythonで実装されたロジックが含まれています。
有用なティップ: DLT Pythonノートブックでは、最初のセルで%pip
マジックコマンドで新規パッケージをインストールする必要があります。
2つ目のDLTノートブックライブラリは、PythonあるいはSQL構文で構成可能です。DLTの多芸さを示すために、データ取り込み、変換処理、モデル推論を実行するためにSQLを使っています。このノートブックには、パイプラインを構成する実際のデータ変換処理ロジックが含まれています。
データ取り込みは、オブジェクトストレージにインクリメンタルにストリーミングされるデータをロードできるAuto Loaderを用いて行なっています。これは、メダリオンアーキテクチャにおけるブロンズ(生データ)テーブルに読み込まれてます。また、以下の構文にあるように、オブジェクトストレージから連続的にデータが取り込まれるstreaming live tableがあることに注意してください。Auto Loaderは、データが取り込まれるとスキーマを検知するように設定されています。また、Auto Loaderでは、数多くの現実世界の異常検知シナリオに適用できる、スキーマの進化にも対応できます。
CREATE OR REFRESH STREAMING LIVE TABLE transaction_readings_raw
COMMENT "The raw transaction readings, ingested from landing directory"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT * FROM cloud_files("/FileStore/tables/transaction_landing_dir", "json", map("cloudFiles.inferColumnTypes", "true"))
また、DLTではデータ品質の制約を定義することができ、開発者やアナリストにいかなるエラーに対策するための能力を提供します。あるレコードが指定された制約を満たさない場合、DLTはレコードを保持、削除、あるいはパイプライン全体を停止することができます。以下の例では、トランザクションの時刻や金額がないレコードを削除するように、変換処理の一つで制約が定義されています。
CREATE OR REFRESH STREAMING LIVE TABLE transaction_readings_cleaned(
CONSTRAINT valid_transaction_reading EXPECT (AMOUNT IS NOT NULL AND TIME IS NOT NULL) ON VIOLATION DROP ROW
)
TBLPROPERTIES ("quality" = "silver")
COMMENT "Drop all rows with nulls for Time and store these records in a silver delta table"
AS SELECT * FROM STREAM(live.transaction_readings_raw)
また、Delta Live Tablesではユーザー定義関数(UDF)をサポートしています。UDFはSQLを用いたストリーミングDLTパイプラインにおけるモデル推論を可能にします。以下の例では、前のステップで登録済みのトレーニングしたisolation forestモデルをカプセル化するApache Spark™のベクトル化UDFを使用しています。
CREATE OR REFRESH STREAMING LIVE TABLE predictions
COMMENT "Use the isolation forest vectorized udf registered in the previous step to predict anomalous transaction readings"
TBLPROPERTIES ("quality" = "gold")
AS SELECT cust_id, detect_anomaly(<enter by="" column="" commas="" names="" separated="">) as
anomalous from STREAM(live.transaction_readings_cleaned)
SQLを好むSQLアナリストやデータエンジニアは、すべてSQLデータパイプラインで、データサイエンティストがscikit-learn、xgboost、その他の機械学習ライブラリを用いてPythonでトレーニングした機械学習モデルを使うことできるのです!
これらのノートブックは、DLTパイプライン(以下のConfiguration Detailsセクションに詳細が表示されます)を作成するために使用されます。リソース、テーブルのセットアップ、依存関係の特定(そして、DLTがユーザーから抽象化するその他の複雑なオペレーション)のための少々の時間のあとで、DLTパイプラインはUIにレンダリングされ、トレーニングした機械学習モデルを用いてニアリアルタイムで継続処理され、異常なレコードが検知されます。
DLTユーザーインタフェースに表示されるエンドツーエンドのDelta Live Tablesパイプライン
このパイプラインの実行中に、ダッシュボードのリフレッシュ機能を用いて、異常なレコードが特定され、継続的に更新される可視化を行うためにDatabricks SQLを活用することができます。このようなダッシュボードは、以下のように'Predictions'テーブルに対して実行されるクエリーに基づく可視化を用いて構築されます。
推定された異常レコードをインタラクティブに表示するために構築されたDatabricks SQLのダッシュボード
まとめると、この記事では、異常検知のためのisolation forestアルゴリズムをトレーニングするために、Databricks機械学習とワークフローで利用できる機能の詳細と、この機能をニアリアルタイムで実行するための能力を有するDelta Live Tablesパイプラインの定義プロセスを説明しました。Delta Live Tablesは、エンドユーザーからプロセスの複雑性を抽象化し、自動化します。
この記事では、Delta Live Tablesの全ての機能のほんの一部に触れたに過ぎません。キーとなるDatabricksの機能に関しては読みやすいドキュメントがこちらで提供されています: https://docs.databricks.com/ja/delta-live-tables/index.html
ベストプラクティス
Delta Live TablesパイプラインはDatabricksワークフローのユーザーインタフェースで作成できます
ニアリアルタイムで異常検知を実行するには、DLTパイプラインは連続モードで実行される必要があります。公式なクイックスタート( https://docs.databricks.com/ja/delta-live-tables/tutorial-pipelines.html )で説明されているこのプロセスは、この記事のレポジトリにある前述のPython、SQLノートブックを用いて作成するために参照できます。他の設定は必要に応じて行なってください。
ソースシステムからレコードがバッチで収集されるなど、断続的なパイプライン実行が許容できる場合には、10分周期のトリガーモードでパイプラインを実行することができます。このトリガーパイプラインを指定したスケジュールを組み、それぞれの実行ではインクリメンタルな方法でパイプラインを通じてデータが処理されます。
そして、このパイプラインでは(処理のボトルネックなしに、パイプラインを通じて渡される様々な負荷のレコードに対応できるように)クラスターのオートスケーリングを有効化したら保存して、パイプラインを起動します。あるいは、これら全ての設定をJSONフォーマットで記述し、同じ入力フォームに入力することができます。
Delta Live Tablesは、クラスターの設定、背後のテーブルの最適化、エンドユーザーのためのその他の多数の重要な詳細を特定します。パイプラインを実行する際には、繰り返しの開発に貢献する開発モード、あるいはプロダクション向けのプロダクションモードを選択することができます。後者では、DLTは自動でリトライやクラスターの再起動を行います。
上述したすべてのことは、Delta Live TablesのREST APIでも行うことが可能であることを強調しておきます。これは例えば、この記事の前半で述べたようにスケジューリングされたジョブを通じてisolation forestを再トレーニングするようなケースで、ダウンタイムなしに、連続モードで実行中のDLTパイプラインをオンザフライで編集することができるようなプロダクションのシナリオで特に有用です。
この例におけるDelta Live Tablesパイプラインの設定。作成されたDeltaテーブルを保存するにはターゲットデータベース名を入力します
Databricksであなた自身のものを構築しましょう
このソリューションを再現するためのノートブックとステップバイステップの手順は以下のレポジトリに全て含まれています: https://github.com/sathishgang-db/anomaly_detection_using_databricks
モデルトレーニングのタスクでは、Databricks機械学習ランタイムのクラスターを使うようにしてください。ここで提供される例は若干シンプルなものですが、同じ原則はより複雑な変換処理にも適用でき、そのようなパイプラインを構築する際に本質的に内在する複雑性を削減するためにDelta Live Tablesが開発されました。あなたのユースケースにこのアイデアを導入していただけると幸いです。
これに加えて:
DLTの機能の素晴らしいデモとウォークスルー
Databricksにおける包括的なエンドツーエンドの機械学習ワークフロー