2024年1月26日 Oracle AI Brown Bag Seminar#5 「AIでサクッと異常を検出 - OCI Anomaly Detection -」で実施したデモをベースに、AI Service の OCI Anomaly Detectionを用いて異常検出システムを構築してみましょう。
参考までにスライド資料と動画を記載しておきます。
はじめに
異常検出システムを構築することで、センサーデータや売上データなどから異変や故障をといった通常と異なる挙動を示すものを自動で検出することができます。状態を人が確認する作業を無くしたり、不慮の事故を減らすことにもつながります。故障が減り生産性が向上も期待できます。異常検出を自動システム化することで様々なメリットを受けることができます。
以前、別の記事で異常検出システムを実現するAnomaly Detectionを使ったサンプルアーキテクチャを説明しました。
今回は、記事の中の同期検出のサンプルアーキテクチャを実際に構築し、異常検出を行ってみたいと思います。
OCI Anomaly Detectionとは
OCI Anomaly Detectionは、多変量または単変量の時系列データに対し、リアルタイムおよびバッチの異常処理をするAI Serviceです。異常検出のモデル作成やトレーニングが簡単にできるフルマネージドサービスです。
異常検出のアーキテクチャ
通常、異常検出はセンサーデータ、ログデータやSNSなどのデータソースからのデータ取得をトリガーに検出モデルを使って異常を検出します。今回は建物のセンサーデータを使用した、以下のようなシステムを構築していきます。
構築する流れは、以下の通りです。
異常検出のサンプルアーキテクチャに使われているサービスについて、それぞれの役割を簡単に説明します。
Service | 説明 |
---|---|
Sensors(Virtual Machine) | センサーデータをStreamingに送信します。仮想マシンでPythonコードを実行して疑似センサーデータを生成して送信します。 |
Streaming | メッセージブローカーとしてStreamingを使用します。センサーデータの送信処理と異常検出処理を非同期で実行させることができます。 |
Service Connector Hub | Streamingに送られたデータを取り出し、Functionsを起動して、データをObject Storageに格納します。また失敗時に再試行します。 |
Object Storage | センサーデータを格納します。 |
Functions | Service Connector Hubから起動され、Anomaly DetectionのAPIをCallし、異常を検出します。 |
Anomaly Detection | 異常検出のモデル作成とモデルを使った異常検出のAPIを提供します。 |
Notifications | 異常検出の結果をSlackに通知します。 |
異常検知のサンプルデータ
次に、異常検出システムで使用する処理済みのセンサーデータを説明します。含まれるデータ列の種類は、タイムスタンプ、建物の温度、湿度データの3種類です。
データセットは以下の3つです。
-
処理済のトレーニングCSVデータ:タイムスタンプ列を含む11個の信号、7299個の観測値
-
処理済のテストJSONデータ (正常データ) :タイムスタンプ列を含む11個の信号、5個の正常な観測値
-
処理済のテストJSONデータ (異常データ) :タイムスタンプ列を含む11個の信号、4個の正常な観測値、1個の異常な観測値
実装手順
では、ここから異常検出システムを構築していきます。前提として、OCI CLI, FnProject, dockerなどの環境設定は完了しているものとします。Pythonのライブラリは必要に応じてinstallしてください。
0. 事前準備
使用するコードの準備
コードはこちらのGitHubリポジトリのものを使用します。
$ git clone https://github.com/sh-sho/bbs_anomaly_detection.git
$ tree -L 1 bbs_anomaly_detection/
bbs_anomaly_detection/
├── README.md
├── con_func
├── data
└── send_data
Directory | Description |
---|---|
con_func | Functionsにデプロイするコードが格納されています。 |
data | モデルの作成時に使用したトレーニングデータと、異常検出に使う検出データが格納されています。 |
send_data | Sensorの代わりとして仮想マシンから検出データを送信するためのコードが格納されています。 |
データストアにデータを格納
今回はObject Storageをデータストアとして使用します。
ストレージ > オブジェクトストレージとアーカイブ・ストレージ >バケットを選択します。
「バケットの作成」をクリックし、バケット名を記入してObject Storageのバケットを作成します。
作成したバケットを選択し、「アップロード」からトレーニング用のCSVファイルをアップロードします。
これで、データストアにトレーニング用のデータを格納できました。
1. プロジェクトの作成
アナリティクスAI > AIサービス > 異常検出をクリックします。
次に、プロジェクト > プロジェクトの作成 をクリックします。
2. データアセットの登録
データ・アセット > データアセットの作成 をクリックします。コンパートメント、名前と説明を記入します。タイプは「Object Storage」を選択し、0. 事前準備で格納したバケットのトレーニングデータを選択します。「作成」を押してデータアセットを登録します。
3. 検出モデルの作成
作成したプロジェクトを選択します。
モデル > モデルの作成とトレーニング からモデルを構築します。
図のように、既存のデータ・アセットから登録したデータアセットを選択します。
次に「モデルのトレーニング」で、各パラメータを選択します。
各パラメータの機能は以下の通りです。
-
ターゲット誤警報確率 (FAP): 誤警報が発生する可能性です。モデルのパフォーマンスを示します。
-
トレーイング分割率: モデルのトレーニングに使用されるトレーニングデータ全体における割合です。残りのデータがモデルのパフォーマンス (FAPなど) を評価するのに使用されます。
-
Algorithm Hint: アルゴリズムにOne Class SVMかMSETのどちらを使用するかを指定します。
-
Window size: 1~100の値でWindow sizeを指定します。
最後に、「モデルの作成とトレーニング」をクリックしてモデルを構築します。
作成したモデルのOCIDは、この後使用するのでメモしておきます。
4. 異常検出
異常検出のアーキテクチャを作成します。
通知 Notifications
まず、異常検出の通知を送信するNotificationsから作成します。
開発者サービス > アプリケーション統合 > 通知を選択します。
トピック > トピックの作成 から名前と説明を入力してトピックを作成します。
作成したトピックを選択し、サブスクリプションの作成をします。
プロトコルはSlackを選択し、WebhookのURLを貼ります。
SlackのWebhookのURLはこちらを参考にして取得してください。
最後に、この後使用するトピックのOCIDをメモしておきます。
サーバーレス実行環境 Functions
Anomaly DetectionのAPIをたたく、サーバーレス実行環境のFunctionsを作成します。
こちらを参考にFucntionsを実装してください。
開発者サービス > ファンクション > アプリケーション を選択します。
次に、fn Projectでファンクションを作成していきます。
まず、fn init
コマンドでファンクションのひな型をcn_func
という名前で作成します。
$ mkdir bbs_app
$ cd bbs_app
$ fn init --runtime python con_func
Creating function at: ./con_func
Function boilerplate generated.
func.yaml created.
$ ls
con_func
次に、Functionsの内容をGitHubのコードに書き換えます。作成したひな形のcon_func/func.py
をbbs_anomaly_detection/con_func /con_func/func.py
のコードと置き換えます。
$ cd con_func/
$ ls
func.py func.yaml requirements.txt
$ cp -f ~/bbs_anomaly_detection/ con_func/con_func/func.py \
~/bbs_app/con_func/
同様に、con_func/requirements.txt
も、bbs_anomaly_detection/con_func /con_func/requirements.txt
と置き換えます。
$ cp -f ~/bbs_anomaly_detection/con_func/con_func/requirements.txt \
~/bbs_app/con_func/
$ ls
func.py func.yaml requirements.txt
次に、func.yaml
に環境変数を書き足します。以下のように、MODEL_OCID
とTOPIC_OCID
を指定します。
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256
# 以下を追加する
config:
MODEL_OCID: ocid1.aianomalydetectionmodel.oc1.xxx.amaaaaaaxxxxxx
TOPIC_OCID: ocid1.onstopic.oc1.xxx.amaaaaaaxxxxxxxxx
変更後、Functionsをデプロイします。
$ cd ..
$ pwd
/home/ubuntu/bbs_app
$ fn deploy --app bbs_app con_func
Deploying function at: ./con_func
Deploying con_func to app: bbs_app
...
Successfully created function: con_func with...
無事デプロイができれば、Functionsの設定は完了です。
データ収集 Streaming & Service Connector Hub
次に、センサーデータの収集と連携のために、StreamingとService Connector Hubを構築します。
まず、Streamingから作成します。アナリティクスとAI > メッセージング > ストリーミング > ストリームの作成 を選択します。
ストリームの詳細を入力して作成します。すべてデフォルトの値を設定しています。
作成したストリーミングを選択し、この後使用するOCIDとメッセージ・エンドポイントをメモしておきます。
次に、Service Connector Hubを作成します。アナリティクスとAI > メッセージング > コネクタ・ハブ > サービス・コネクタの作成 を選択します。
ソース、タスク、ターゲットを以下のように設定します。
- ソース: Streaming
- タスク: Functions
- ターゲット: Object Storage
ソースには先ほど作成したStreamingの情報を入力します。
タスクには先ほど作成したFunctionsの情報を入力します。
ターゲットには先ほど作成したObject Storageを指定します。ここで指定するObject Storageはセンサーデータを格納するためのものなので、別のObject Storageを指定しても良いです。
以上で、StreamingとService Connector Hubの構築は完了です。
センサーデータの送信 Virtual Machine
最後に、Virtual Machineから疑似的なセンサーデータをStreamingに送信します。
Virtual Machineを立てて、先ほどのGitHubのリポジトリをcloneします。
先に、送信するデータを確認します。
$ cd bbs_anomaly_detection/data/
$ tree -L 2
.
├── error_data
│ ├── error_data01.json
│ ├── error_data02.json
│ ├── error_data03.json
│ ├── error_data04.json
│ └── error_data05.json
├── normal_data
│ ├── normal_data01.json
│ ├── normal_data02.json
│ ├── normal_data03.json
│ ├── normal_data04.json
│ └── normal_data05.json
└── training_data
└── training_data.csv
送信するデータはnormal_data
が5セット、error_data
が5セットあります。
normal_data
には、10個の5つの建物の温度のセンサーデータと5つの建物の圧力のセンサーデータが入っています。
$ cat normal_data/normal_data01.json
[
{
"values": [
0.4738691915899007,
-0.2103424394727376,
0.024398945908493404,
1.178795358083708,
-0.0027861332237756747,
-0.3361819458453297,
0.46908916038037074,
-0.8186905010094537,
-0.4766573216296686,
0.21636488871770235
]
}
]
error_data
も同様の値が入っています。5つ目のデータセットの最後の圧力の値に異常値を設定しています。
$ cat error_data/error_data05.json
[
{
"values": [
-1.5429359828621247,
-0.16674988024549042,
-0.43808546982885677,
-2.2666591015849966,
1.002076046036585,
-0.583356777094332,
3.4045540252865725,
-1.674770619920698,
-4.497768611265446,
100000000000000
]
}
]
このセンサーデータにタイムスタンプを付けて、Virtual Machineから送信します
必要な環境変数を設定します。.env
ファイルをbbs_anomaly_detection
ディレクトリ配下に作成します。
$ cd ~/bbs_anomaly_detection/
$ touch .env
$ ls -a
. .. .env .git .gitignore README.md con_func data send_data
.env
ファイル内にstream_ocid
とmessage_endpoint
を設定します。
stream_ocid = "ocid1.stream.oc1.xxx.amaaaaaaxxxxxxxxx"
message_endpoint ="https://cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com"
send_data
のディレクトリに移動します。send_normal_data.py
が正常なデータを送信するPythonコードで、send_error_data.py
が異常なデータを送信するPythonコードです。
$ cd ~/bbs_anomaly_detection/send_data/
$ ls
send_error_data.py send_normal_data.py
Virtual Machine上でコードを実行します。コードを実行するとoffsetの値と送信したデータの内容が返ってきます。
まずは正常なデータを送信した場合です。
$ python3 send_normal_data.py
Publishing 1 messages to the stream ocid1.stream.oc1.ap-tokyo-1......
...
Published message to partition 0 , offset 4
[{'values': [0.4738691915899007, -0.2103424394727376, 0.024398945908493404, 1.178795358083708, -0.0027861332237756747, -0.3361819458453297, 0.46908916038037074, -0.8186905010094537, -0.4766573216296686, 0.21636488871770235], 'timestamp': '2024-01-29T14:38:00Z'}]
問題なく構成されていると以下のような値がSlackに返ってきます。Slackへの通知にはAnomaly Detectionから返ってきた異常検出結果が含まれています。
正常時はdetection_results
の値が空白なJSONデータがAnomaly Detectionから返ってきます。本番で稼働させる際は、正常時はSlackでの通知は必要ありませんが、今回は比較しやすいように通知を行っています。
次に、異常値を含んだセンサーデータを送信します。
$ python3 send_error_data.py
Publishing 1 messages to the stream ocid1.stream.oc1.ap-tokyo-1...
...
Published message to partition 0 , offset 9
[{'values': [-0.015539492872891303, -0.091931079449716, -0.21391934608376093, -1.531492800753115, -0.2427943533533093, -1.6779543077755958, -0.5892422126416113, 0.3210492038392434, -0.4265569448060912, 0.041516327170202046], 'timestamp': '2024-01-29T15:02:57Z'}]
異常検出された場合は、以下のような値がSlackが返ってきます。
異常時はAnomaly Detectionで検出した結果が含まれています。Anomaly DetectionからのResponseには以下の情報が含まれています。
Key | Description |
---|---|
anomaly_value | 実際のセンサー値 |
anomaly_score | 異常の発生確率や連続性に基づいてOracleが算出した異常スコア |
estimated_value | センサーの推定値 |
imputed_value | 欠損値補完が行われた場合の補完値 |
signal_name | 異常があったシグナル名 |
row_index | timestampがない場合に、異常ポイントがどこにあるかを示すインデックス番号 |
timestamp | タイムスタンプ |
anomaly_value
やsignal_name
の値によって、異常時の操作を変更させることもできます。
まとめ
今回はAnomaly Detectionを使って、建物のセンサーデータの異常検出モデルの作成を行いました。またOCIのサービスを使い異常検出システムも構築し、異常検出を行いました。
参考
Oracle Cloud Infrastructureドキュメント 異常検知
Oracle Cloud Infrastructure Anomaly Detectionを使って簡単に異常検出をしてみる