0
0

More than 1 year has passed since last update.

Delta Live TablesとDatabricksマシンラーニングによるニアリアルタイム不正検知

Posted at

Near Real-Time Anomaly Detection with Delta Live Tables and Databricks Machine Learning - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

なぜ不正検知が重要なのか?

小売、金融、サイバーセキュリティなどいかなる業界においても、不正な挙動が発生したらすぐに検知することは間違いなく優先度が高いと言えます。この様な能力の欠如は、利益の損失、規制機関からの罰金、顧客プライバシーの侵害、サイバーセキュリティの場合にはセキュリティ侵害によって信頼が損なわれます。このため、このような比較的不自然な大量のクレジットカードのトランザクションの発見、怪しいユーザーの行動の特定、Webサービスにおける不自然なリクエストボリュームのパターンの識別によって、日々の業務を素晴らしいものにするのか、あるいはひどい災害になるのかを二分します。

不正検知における課題

不正検知はいくつかの課題を突きつけます。1つ目は、「不正」とはどの様なものかというデータサイエンスの課題です。幸運なことに、機械学習がデータから正常なものから不正なパターンをどの様に区別するのかを学習するパワフルなツールを備えています。不正検知の場合、全ての不正がどの様なものであるのかを知ることは不可能なので、このためのリソースが利用できる場合でも機械学習モデルのトレーニングのためのデータセットをラベル付けすることは不可能です。このため、不正を検知するには、ラベル無しのデータからパターンを学習する教師無し学習が用いられてきました。

完璧な教師なし不正検知機械学習モデルが分かったとしても、多くの場合、現実的な問題がスタートします。ソースシステムからデータが到着するとすぐに新たな観測事項が投入され、変換処理を経てモデルによって最終的なスコアが計算される様なプロダクション環境にモデルを投入するベストな方法とはどのようなものでしょうか?そして、5-10分のように短い周期あるいはニアリアルタイムで処理を行うにはどうしたらいいのでしょうか?これには、洗練されたETLパイプラインの構築、不正レコードを適切に識別することができる教師無し機械学習モデルのインテグレーションが含まれます。また、このエンドツーエンドのパイプラインは、プロダクションレベルである必要があり、データ取り込みからモデル推論に至るデータ品質を保証し、背後のインフラストラクチャを維持管理しなくてはなりません。

Databricksレイクハウスプラットフォームによる課題解決

Databricksを用いることで、このプロセスはシンプルなものとなります。SQLで全てのニアリアルタイム不正検知パイプラインを構築し、機械学習モデルをトレーニングするためにPythonのみを活用することができます。データ取り込み、変換処理、モデル推論すべてのをSQLで行うこともできます。

特に、この記事では不正レコードの検知に適しているアイソレーションフォレストアルゴリズムのトレーニング、トレーニング済みモデルのDelta Live Tables(DLT)で作成したストリーミングデータパイプラインへのインテグレーションを説明します。DLTはデータエンジニアリングプロセスを自動化するETLフレームワークです。DLTでは高信頼データパイプラインを作成するためのシンプルな宣言型アプローチを採用しており、バッチ、ストリーミングデータにおける大規模インフラストラクチャを完全に管理します。成果物はニアリアルタイムの不正検知システムとなります。特に、この記事で使用するデータは、Kaggleのクレジットカードトランザクションをシミュレートする目的のもと生成した合成データのサンプルであり、検知された異常値は不正トランザクションとなります。

本記事で説明される不正検知ソリューションをベースとしたMLとDelta Live Tablesのアーキテクチャ

scikit-learnのアイソレーションフォレストアルゴリズム実装は、Databricks機械学習ランタイムでデフォルトで利用でき、不正検知モデルがトレーニングされた際に追跡、記録酢するためにMLflowフレームワークを使用します。ETLパイプラインはDelta Live Tablesを用いてすべてSQLで開発されます。

ラベル無しデータにおける不正検知のためのアイソレーションフォレスト

アイソレーションフォレストは、ランダムフォレストと同様なツリーベースのアンサンブルアルゴリズムです。このアルゴリズムでは、指定された観測セットにおいて正常値は外れ値(不正な観測)よりも分離が困難であることを前提としています。ハイレベルにおいては、不正でないポイント、すなわち、正常なクレジットカードトランザクションが分離が困難な決定木の深いところに存在しており、外れ値に関してはその逆が成り立つことを意味します。このアルゴリズムはラベル無しのデータに対してトレーニングを行い、これまでに観測していないデータに対して不正レコードを予測するために活用することができます。

外れ値の分離は正常値の分離より簡単です

モデルトレーニングとトラッキングにおいてDatabricksはどの様に役に立つのか?

Databricksで機械学習に関わることを行う際には、機械学習(ML)ランタイムのクラスターが必須となります。MLランタイムでは、データサイエンスや機械学習に関連するタスクで一般的に使用される数多くのオープンソースライブラリがデフォルトで利用できます。scikit-learnはこれらのライブラリの一つであり、アイソレーションフォレストアルゴリズムの素晴らしい実装を提供しています。

どの様にモデルを定義するのかを以下に示します。

Python
from sklearn.ensemble import IsolationForest
isolation_forest = IsolationForest(n_jobs=-1, warm_start=True, random_state=42)

このランタイムによって、機械学習エクスペリメントのトラッキング、モデルのステージング、デプロイメントのためのノートブック環境とMLflowの密なインテグレーションが実現されます。

MLクラスターにアタッチされたノートブック環境で行われる全てのモデルトレーニンやハイパーパラメーターの最適化処理は、デフォルトで有効化されているMLflowのオートロギングによって自動で記録されます。

モデルが記録されると、さまざまな方法でMLflowのモデルを登録、デプロイすることが可能になります。特に、Apache Spark™による分散ストリーム、バッチ推論におけるベクトル化ユーザー定義関数(UDF)としてこのモデルをデプロイするために、以下の画像のように、MLflowはUDFを作成、登録するためのコードをユーザーインタフェース(UI)上に生成します。

MLflowはモデル推論のためのApache Spark UDFを作成、登録するためのコードを生成します

これに加え、MLflowのREST APIを用いることで、以下の様に関数にきちんとまとめられる数行のコードで、プロダクション状態の既存モデルをアーカイブし、新たにトレーニングしたモデルをプロダクションに移行することができます。

Python
def train_model(mlFlowClient, loaded_model, model_name, run_name)->str:
  """
  Trains, logs, registers and promotes the model to production. Returns the URI of the model in prod
  """
  with mlflow.start_run(run_name=run_name) as run:

    # 0. Fit the model 
    loaded_model.fit(X_train)

    # 1. Get predictions 
    y_train_predict = loaded_model.predict(X_train)

    # 2. Create model signature 
    signature = infer_signature(X_train, y_train_predict)
    runID = run.info.run_id

    # 3. Log the model alongside the model signature 
    mlflow.sklearn.log_model(loaded_model, model_name, signature=signature, registered_model_name= model_name)

    # 4. Get the latest version of the model 
    model_version = mlFlowClient.get_latest_versions(model_name,stages=['None'])[0].version

    # 5. Transition the latest version of the model to production and archive the existing versions
    client.transition_model_version_stage(name= model_name, version = model_version, stage='Production', archive_existing_versions= True)


    return mlFlowClient.get_latest_versions(model_name, stages=["Production"])[0].source

プロダクションのシナリオにおいては、モデルに一度に1レコードのみのスコアリングを行わせたいと思うかもしれません。Databricksでは、「確実に一度のみ」の挙動を保証するためにAuto Loaderを使うことができます。Auto Loaderは、PythonやSQLを用いて、Delta Live Tablesや構造化ストリーミングアプリケーションと連携します。

検討すべき別の重要な要素は、環境的なものであれ行動的なものであれ、不正が発生する特性が時間と共に変化するということです。このため、新たなデータが到着するたびにモデルを再トレーニングする必要があります。

モデルトレーニングのロジックを持つノートブックは、ジョブが実行されるたびに最新モデルを効率的に再トレーニングし、プロダクションに投入するDatabricksワークフローのスケジュールされたジョブとして、プロダクションに移行することができます。

Delta Live Tablesによるニアリアルタイム不正検知の実現

ここでの機械学習の側面は、課題の一部を表現しているに過ぎません。おそらく、最も困難なのはデータ取り込み、変換処理、モデル推論を組み合わせるプロダクションレベルのニアリアルタイムデータパイプラインの構築です。このプロセスは、複雑で時間を浪費し、エラーに脆弱なものになることがあります。

このための常時稼働かつエラーハンドリングを伴うインフラストラクチャの構築と運用は、データエンジニアリングよりもソフトウェアエンジニアリングのノウハウが必要となります。また、パイプライン全体でデータ品質が保証されなくてはなりません。特定のアプリケーションによっては、さらに複雑性の次元が増加することになります。

ここでDelta Live Tables(DLT)が登場します。

DLTの用語では、ノートブックライブラリとはDLTパイプラインの全体あるいは部分的なコードを含むノートブックです。DLTパイプラインは1つ以上のノートブックが関連づけられ、それぞれのノートブックはSQLあるいはPython構文を使うことができます。最初のノートブックライブラリには、MLflowモデルレジストリからモデルを取得し、取り込まれたレコードがパイプラインの後段で特徴量生成されたらモデル推論関数を使用できる様にUDFを登録するためにPythonで実装されたロジックが含まれています。

ティップ
DLT Pythonノートブックでは、最初のセルで%pipマジックコマンドを用いてパッケージをインストールする必要があります。

2つ目のDLTライブラリノートブックは、PythonあるいはSQL構文で構成することができます。DLTの汎用性を示すために、データ取り込み、変換処理、モデル推論を実行するためにSQLを使用しました。このノートブックには、パイプラインを構成する実際のデータ変換ロジックが含まれています。

取り込みはAuto Loaderで行われ、オブジェクトストレージにインクリメンタルにデータがロードされます。これはメダリオンアーキテクチャのブロンズ(生データ)テーブルに読み込まれます。また、以下に示す構文では、ストリーミングライブテーブルがオブジェクトストレージから連続的にデータが取り込んでいることに注意してください。Auto Loaderはデータが取り込まれるとスキーマを検知する様に設定されています。また、Auto Loaderは多くの現実世界における不正検知シナリオで適用されるスキーマの変化に対応することができます。

SQL
CREATE OR REFRESH STREAMING LIVE TABLE transaction_readings_raw
COMMENT "The raw transaction readings, ingested from landing directory"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT * FROM cloud_files("/FileStore/tables/transaction_landing_dir", "json", map("cloudFiles.inferColumnTypes", "true"))

また、DLTでは、データ品質制約を定義し、開発者やアナリストにいかなるエラーを修正する能力を提供します。特定のレコードが指定された制約に合致しない場合、DLTはレコードを保持するか、削除するか、パイプライン全体を停止します。以下の例では、トランザクションの時刻や金額が存在しない場合にはレコードを削除するように変換処理の一つで制約が定義されています。

SQL
CREATE OR REFRESH STREAMING LIVE TABLE transaction_readings_cleaned(
  CONSTRAINT valid_transaction_reading EXPECT (AMOUNT IS NOT NULL AND TIME IS NOT NULL) ON VIOLATION DROP ROW
)
TBLPROPERTIES ("quality" = "silver")

COMMENT "Drop all rows with nulls for Time and store these records in a silver delta table"
AS SELECT * FROM STREAM(live.transaction_readings_raw)

また、Delta Live Tablesはユーザー定義関数(UDF)をサポートしています。SQLを用いたストリーミングDLTパイプラインでモデル推論を行うためにUDFを使うことができます。以下のサンプルでは、前のステップで登録したトレーニング済みアイソレーションフォレストモデルをカプセル化するApache Spark™のベクトル化UDFを使用しています。

SQL
CREATE OR REFRESH STREAMING LIVE TABLE predictions
COMMENT "Use the isolation forest vectorized udf registered in the previous step to predict anomalous transaction readings"
TBLPROPERTIES ("quality" = "gold")
AS SELECT cust_id, detect_anomaly(<enter by="" column="" commas="" names="" separated="">) as 
anomalous from STREAM(live.transaction_readings_cleaned)
</enter>

これは、SQLを好むSQLアナリストやデータエンジニアが、データサイエンティストがscikit-learnやxgboostなどの機械学習ライブラリを用いてPythonでトレーニングした機械学習モデルを、推論のためにSQLのパイプラインで活用できるのでエキサイティングなことです!

これらのノートブックはDLTパイプラインを作成するために使用します(以下の設定詳細のセクションで詳細を説明しています)。リソース、テーブルのセットアップ、依存関係の特定の後にDLTパイプラインがUIに表示され、連続的にデータが処理され、トレーニングした機械学習モデルを用いてニアリアルタイムで不正レコードが検知されます。

DLTユーザーインタフェースに表示されるエンドツーエンドのDelta Live Tablesパイプライン

パイプラインが実行されている間、検知された不正レコードを可視化するためにDatabricks SQLを活用することができ、Databricks SQLダッシュボードのリフレッシュ機能によって不正レコードを連続的に特定、アップデートすることができます。以下の様に、Predictionsテーブルに対して実行されるクエリーに基づいて可視化されたダッシュボードを参照することができます。

予測された不正レコードをインタラクティブに表示するためのDatabricks SQLダッシュボード

まとめると、この記事では不正検知のためのアイソレーションフォレストアルゴリズムのトレーニングに使用したDatabricksマシンラーニングとワークフローの機能の詳細を説明し、この機能をニアリアルタイムで実行することができるDetla Live Tablesパイプラインを定義するプロセスを説明しました。Delta Live Tablesはエンドユーザーのプロセスを抽象化し、自動化します。

この記事では、Delta Live Tablesのすべての機能の表面をなぞっただけです。このキーとなるDatabricksの機能のドキュメントに関しては以下を参照ください。

ベストプラクティス


DatabricksワークフローのユーザーインタフェースからDelta Live Tablesパイプラインを作成できます

ニアリアルタイムで不正検知を行うには、DLTパイプラインを連続モードで実行する必要があります。このプロセスは公式のクイックスタートで説明されており、このブログ記事のリポジトリからアクセスできる説明済みのPython、SQLノートブックの作成に進むことができます。必要であれば他の設定を行うこともできます。

ソースシステムから取得されるレコードの異常検知がバッチで行われ、断続的なパイプラインが許容できる場合には、パイプラインを10分といった周期のトリガーモードで実行することができます。そして、このトリガーパイプラインを実行するためにスケジュールを指定すると、データはパイプラインでインクリメンタルに処理されます。

そして、(ボトルネックを処理することなしにパイプラインに渡されるレコードのの負荷変動に対応するために)オートスケーリングが有効化されたパイプライン設定を保存し、パイプラインをスタートします。あるいは、これらすべての設定をJSONで記述し、同じ入力フォームに入力します。

Delta Live Tablesはクラスター設定、背後のテーブル最適化処理、エンドユーザーにとって重要な数多くの詳細を特定します。稼働中のパイプラインに対して、繰り返しが発生する開発に適したDevelopmentモード、あるいはプロダクションに特化したProductionモードを選択することができます。後者のモードでは、DLTは自動でリトライやクラスターの再起動を行いまうs。

上で説明したことのすべてはDelta Live Tables REST APIを通じて行うことができることを強調しておきます。これは、この記事の前半で述べた様にスケジュールジョブを通じて毎回アイソレーションフォレストを再トレーニングするなど、ダウンタイムなしに連続モードの実行中パイプラインをその場で編集するというプロダクションシナリオでは有用となります。

この例におけるDelta Live Tablesの設定。作成されたDeltaテーブルを格納するデータベース名を入力します。

Databricksで構築を始めてみる

このソリューションを再現するステップバイステップの手順とノートブックはすべて以下のリポジトリに格納されています。

モデルトレーニングのタスクではDatabricks機械学習ランタイムのクラスターを使う様にしてください。ここで示したサンプルは比較的シンプルですが、より複雑な変換処理においても同じ原則は適用され、Delta Live Tablesはその様なパイプラインを構築する際に本質的に含まれる不雑性を削減します。この記事で説明されているアイデアをみなさまのユースケースで活用いただけたらと思います。

DLTの機能に関する素晴らしいデモとウォークスルー

Databricksにおけるエンドツーエンドの機械学習ワークフロー

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0