背景・目的
SageMaker向けのビルドインアルゴリズムのRandom Cut Forestを試してみようと思います。
まとめ
- RCFは教師なしアルゴリズム
- データセット内の異常なデータポイントや外れ値を検出できる。
概要
Random Cut Forest (RCF) アルゴリズム
データセット内の異常なデータポイントを検出する、教師なしのアルゴリズム。
異常は、時系列データにおける突然のスパイク、周期性の中断、分類できないデータポイントとして現れる場合がある。
各データポイントで、RCFは異常スコアを関連付ける。
- 低スコスであれば、データポイントは通常とみなされ、高い値は、データ内に異常があると示される。
- 異常の定義は、アプリケーションにより異なるが、一般的な方法としては3標準偏差を超えるスコアであれば異常とみなされる。
交通量解析や音量スパイクなど、異常検出アルゴは1次元時系列でターに広く応用される。これに対してRCFは任意の次元入力で機能するように設計されている。
SageMaker RCFは、機能数、データセットのサイズ、インスタンス数に関して適切にスケーリングされる。
RCF アルゴリズムの入出力インターフェイス
- SageMaker RCFは、トレーニングデータとテストデータのコンテンツタイプは以下の通り。text/csvを使用する場合は、コンテンツをtext/csv;label_size=1として指定する必要がある。
- application/x-recordio-protobuf
- text/csv
- 各行の最初の列は異常ラベルを表す。異常データポイントの場合は「1」、通常のデータポイントの場合は「0」
- ファイルモードまたはパイプモードで使用すると、recordIO-wrapped-protobufまたはCSV形式のデータについてRCFモデルをトレーニングできる。
- 必要なIAMポリシーは以下の通り
- AmazonSageMakerFullAccess
- AmazonEC2ContainerRegistryFullAccess
RCF アルゴリズムのインスタンスに関する推奨事項
- トレーニングには、以下のインスタンスファミリーを推奨とのこと。
- ml.m4
- ml.c4
- ml.c5
- 推論には、最大限のパフォーマンスと使用時間当たりのコスト最小化のために、ml.c5.xl インスタンスタイプを推奨とのこと。
- GPUを利用することはない。
実装
- An Introduction to SageMaker Random Cut Forestsを元に試します。
- Numenta Anomaly Benchmark (NAB) NYC Taxi のデータセットをでモデルをトレーニングします。
- このデータセットは、ニューヨーク市のタクシー利用者を6ヶ月間記録している
- このモデルにより、各データポイントの異常スコアを出力することで、異常なイベントを予測します。
Setup
- インスタンスタイプは、ml.t3.mediumしたとのこと。
Select Amazon S3 Bucket
- サンプルコードを元に、以下を実行する。
import boto3 import botocore import sagemaker import sys bucket = ( sagemaker.Session().default_bucket() ) # Feel free to change to another bucket you have access to prefix = "sagemaker/rcf-benchmarks" execution_role = sagemaker.get_execution_role() region = boto3.Session().region_name # S3 bucket where the original data is downloaded and stored. downloaded_data_bucket = f"sagemaker-sample-files" downloaded_data_prefix = "datasets/tabular/anomaly_benchmark_taxi" def check_bucket_permission(bucket): # check if the bucket exists permission = False try: boto3.Session().client("s3").head_bucket(Bucket=bucket) except botocore.exceptions.ParamValidationError as e: print( "Hey! You either forgot to specify your S3 bucket" " or you gave your bucket an invalid name!" ) except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "403": print(f"Hey! You don't have permission to access the bucket, {bucket}.") elif e.response["Error"]["Code"] == "404": print(f"Hey! Your bucket, {bucket}, doesn't exist!") else: raise else: permission = True return permission if check_bucket_permission(bucket): print(f"Training input/output will be stored in: s3://{bucket}/{prefix}") if check_bucket_permission(downloaded_data_bucket): print( f"Downloaded training data will be read from s3://{downloaded_data_bucket}/{downloaded_data_prefix}" )
- 成功すると、以下のメッセージが表示されます。
- エラーメッセージが表示される場合は、バケットの存在が実在しているか、またはIAMポリシーの確認が必要です。
Training input/output will be stored in: s3://{バケット名}/sagemaker/rcf-benchmarks Downloaded training data will be read from s3://sagemaker-sample-files/datasets/tabular/anomaly_benchmark_taxi
Obtain and Inspect Example Data
-
Numenta Anomaly Benchmark (NAB) NYC Taxi datasetから取得したデータセットが、sagemaker-sample-filesに格納されている。これをダウンロードします。
%%time import pandas as pd data_filename = "NAB_nyc_taxi.csv" s3 = boto3.client("s3") s3.download_file(downloaded_data_bucket, f"{downloaded_data_prefix}/{data_filename}", data_filename) taxi_data = pd.read_csv(data_filename, delimiter=",")
-
ダウンロードが成功すると以下のようにCPUタイムなどが表示されます。
CPU times: user 187 ms, sys: 32.5 ms, total: 220 ms Wall time: 1.03 s
-
モデルのトレーニング前に、データを検査します。
モデルのトレーニング前には、データを検査することで以下が可能になるため重要とのこと。- 基本的なパターンや構造の確認
- 前処理して除去できるノイズの発見
headにより、先頭の5行が表示されます。 - timestampとvalueのカラムが存在する。
- timestampは、30分毎の年月日、時分秒まで格納されている
- valueは乗車回数
taxi_data.head()
-
次に視覚化し、概要を確認します。
%matplotlib inline import matplotlib import matplotlib.pyplot as plt matplotlib.rcParams["figure.dpi"] = 100 taxi_data.plot()
-
以下のようにプロットされました。以下のことがわかりました。
-
6000付近のデータを確認します。
taxi_data[5500:6500].plot()
-
以下のようにプロットされました。
-
一日のトレンドを確認してみます。
taxi_data[5952:6000] taxi_data[5952:6000].plot
-
1日分の乗車回数のデータセット
``` timestamp value 5952 2014-11-02 00:00:00 25110 5953 2014-11-02 00:30:00 23109 5954 2014-11-02 01:00:00 39197 5955 2014-11-02 01:30:00 35212 5956 2014-11-02 02:00:00 13259 5957 2014-11-02 02:30:00 12250 5958 2014-11-02 03:00:00 10013 5959 2014-11-02 03:30:00 7898 5960 2014-11-02 04:00:00 6375 5961 2014-11-02 04:30:00 4532 5962 2014-11-02 05:00:00 5116 5963 2014-11-02 05:30:00 5232 5964 2014-11-02 06:00:00 4542 5965 2014-11-02 06:30:00 5298 5966 2014-11-02 07:00:00 5155 5967 2014-11-02 07:30:00 6029 5968 2014-11-02 08:00:00 6280 5969 2014-11-02 08:30:00 8771 5970 2014-11-02 09:00:00 10151 5971 2014-11-02 09:30:00 12501 5972 2014-11-02 10:00:00 13990 5973 2014-11-02 10:30:00 16534 5974 2014-11-02 11:00:00 17133 5975 2014-11-02 11:30:00 18775 5976 2014-11-02 12:00:00 18985 5977 2014-11-02 12:30:00 19911 5978 2014-11-02 13:00:00 19123 5979 2014-11-02 13:30:00 19524 5980 2014-11-02 14:00:00 19640 5981 2014-11-02 14:30:00 18364 5982 2014-11-02 15:00:00 17940 5983 2014-11-02 15:30:00 17949 5984 2014-11-02 16:00:00 17288 5985 2014-11-02 16:30:00 16326 5986 2014-11-02 17:00:00 17522 5987 2014-11-02 17:30:00 19243 5988 2014-11-02 18:00:00 20291 5989 2014-11-02 18:30:00 21649 5990 2014-11-02 19:00:00 22839 5991 2014-11-02 19:30:00 21772 5992 2014-11-02 20:00:00 20994 5993 2014-11-02 20:30:00 19774 5994 2014-11-02 21:00:00 18398 5995 2014-11-02 21:30:00 17764 5996 2014-11-02 22:00:00 17334 5997 2014-11-02 22:30:00 15431 5998 2014-11-02 23:00:00 12958 5999 2014-11-02 23:30:00 10224 ```
Training
SageMaker トレーニングジョブを構成して、タクシーデータでランダムカットフォレスト (RCF) アルゴリズムをトレーニングします。
Hyperparameters
SageMaker RCF トレーニングジョブに特有のものは、次のハイパーパラメータです。
- num_samples_per_tree
- 各ツリーに送信されるランダムにサンプリングされたデータポイントの数。 原則として、1/num_samples_per_tree は、データセット内の正常点に対する異常点の推定比率に近似する必要があります。
- num_trees
- フォレスト内に作成するツリーの数。 各ツリーは、異なるデータサンプルから個別のモデルを学習します。 完全なフォレスト モデルは、各構成ツリーから予測された平均異常スコアを使用します。
- feature_dim
- 各データポイントの次元。
-
以下のコマンドを実行してトレーニングします。
from sagemaker import RandomCutForest session = sagemaker.Session() # specify general training job information rcf = RandomCutForest( role=execution_role, instance_count=1, instance_type="ml.m4.xlarge", data_location=f"s3://{bucket}/{prefix}/", output_path=f"s3://{bucket}/{prefix}/output", num_samples_per_tree=512, num_trees=50, ) # automatically upload the training data to S3 and run the training job rcf.fit(rcf.record_set(taxi_data.value.to_numpy().reshape(-1, 1)))
-
しばらくすると完了します。
2022-12-31 02:24:03 Uploading - Uploading generated training model 2022-12-31 02:24:03 Completed - Training job completed Training seconds: 147 Billable seconds: 147
-
実行完了したジョブを確認します。
print(f"Training job name: {rcf.latest_training_job.job_name}")
-
完了は以下の画面からでも確認が可能です。
Inference
deploy()関数を使用して推論エンドポイントを作成する。
推論が計算されるインスタンスタイプと起動するインスタンスの初期数を指定する。
-
デプロイします。
- インスタンス数は、1
- インスタンスタイプはml.m4.xlarge
rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge")
-
エンドポイント名を確認します。
print(f"Endpoint name: {rcf_inference.endpoint}")
-
コンソール名からもエンドポイントを確認します。
Data Serialization/Deserialization
さまざまな形式のデータを推論エンドポイントに渡すことができます。 この例では、CSV 形式のデータを渡す方法を示します。
(その他の使用可能な形式は、JSON 形式および RecordIO Protobuf です。 推論エンドポイントを構成するときに、SageMaker Python SDK ユーティリティ csv_serializer および json_deserializer を使用します。)
- CSV形式のシリアライザーとJSONのでシリアライザーを指定します。
from sagemaker.serializers import CSVSerializer from sagemaker.deserializers import JSONDeserializer rcf_inference.serializer = CSVSerializer() rcf_inference.deserializer = JSONDeserializer()
- 異常検知のために、トレーニングデータセットをCSV形式で推論エンドポイントに渡す。下記で6データポイントのみ渡しています。
taxi_data_numpy = taxi_data.value.to_numpy().reshape(-1, 1) print(taxi_data_numpy[:6]) results = rcf_inference.predict( taxi_data_numpy[:6], initial_args={"ContentType": "text/csv", "Accept": "application/json"} ) === [[10844] [ 8127] [ 6210] [ 4656] [ 3820] [ 2873]]
Computing Anomaly Scores
-
全てのデータセット全体から異常スコアを計算してプロットします。
results = rcf_inference.predict(taxi_data_numpy) scores = [datum["score"] for datum in results["scores"]] # add scores to taxi data frame and print first few values taxi_data["score"] = pd.Series(scores, index=taxi_data.index) taxi_data.head() === timestamp value score 0 2014-07-01 00:00:00 10844 0.922123 1 2014-07-01 00:30:00 8127 0.990607 2 2014-07-01 01:00:00 6210 0.930758 3 2014-07-01 01:30:00 4656 0.843785 4 2014-07-01 02:00:00 3820 0.885350
fig, ax1 = plt.subplots() ax2 = ax1.twinx() # # *Try this out* - change `start` and `end` to zoom in on the # anomaly found earlier in this notebook # start, end = 0, len(taxi_data) # start, end = 5500, 6500 taxi_data_subset = taxi_data[start:end] ax1.plot(taxi_data_subset["value"], color="C0", alpha=0.8) ax2.plot(taxi_data_subset["score"], color="C1") ax1.grid(which="major", axis="both") ax1.set_ylabel("Taxi Ridership", color="C0") ax2.set_ylabel("Anomaly Score", color="C1") ax1.tick_params("y", colors="C0") ax2.tick_params("y", colors="C1") ax1.set_ylim(0, 40000) ax2.set_ylim(min(scores), 1.4 * max(scores)) fig.set_figwidth(10)
-
平均スコアから 3 標準偏差 (約 99.9 パーセンタイル) を超えるスコアを持つデータ ポイントをプロットします。
score_mean = taxi_data["score"].mean() score_std = taxi_data["score"].std() score_cutoff = score_mean + 3 * score_std anomalies = taxi_data_subset[taxi_data_subset["score"] > score_cutoff] anomalies
-
異常スコアをプロットします。
ax2.plot(anomalies.index, anomalies.score, "ko") fig
Stop and Delete the Endpoint
Epilogue
RCF アルゴリズムは、たとえば、データが周期性を壊したり、グローバルな動作を異常に変更したりする時期を検出することもできます。
持っているデータの種類に応じて、アルゴリズムのパフォーマンスを改善する方法がいくつかあります。
たとえば、1 つの方法は、適切なトレーニングセットを使用することです。 特定のデータ セットが「正常な」動作の特徴であることがわかっている場合、そのデータ セットをトレーニングすると、「異常な」データをより正確に特徴付けることができます。
もう 1 つの改善点は、「シングリング」と呼ばれるウィンドウ処理手法を利用することです。 これは、上記で使用した NYC タクシー データセットなど、周期がわかっている定期的なデータを操作する場合に特に便利です。 アイデアは、データポイントの期間を特徴の長さの単一のデータポイントとして扱い、これらの特徴ベクトルに対して RCF アルゴリズムを実行することです。 つまり、元のデータがポイントで構成されている場合、変換を実行します。
import numpy as np
def shingle(data, shingle_size):
num_data = len(data)
shingled_data = np.zeros((num_data - shingle_size, shingle_size))
for n in range(num_data - shingle_size):
shingled_data[n] = data[n : (n + shingle_size)]
return shingled_data
# single data with shingle size=48 (one day)
shingle_size = 48
prefix_shingled = "sagemaker/randomcutforest_shingled"
taxi_data_shingled = shingle(taxi_data.values[:, 1], shingle_size)
print(taxi_data_shingled)
session = sagemaker.Session()
# specify general training job information
rcf = RandomCutForest(
role=execution_role,
instance_count=1,
instance_type="ml.m4.xlarge",
data_location=f"s3://{bucket}/{prefix_shingled}/",
output_path=f"s3://{bucket}/{prefix_shingled}/output",
num_samples_per_tree=512,
num_trees=50,
)
# automatically upload the training data to S3 and run the training job
rcf.fit(rcf.record_set(taxi_data_shingled))
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge")
rcf_inference.serializer = CSVSerializer()
rcf_inference.deserializer = JSONDeserializer()
print(rcf_inference.content_type, rcf_inference.accept)
# Score the shingled datapoints
results = rcf_inference.predict(
taxi_data_shingled, initial_args={"ContentType": "text/csv", "Accept": "application/json"}
)
scores = np.array([datum["score"] for datum in results["scores"]])
# compute the shingled score distribution and cutoff and determine anomalous scores
score_mean = scores.mean()
score_std = scores.std()
score_cutoff = score_mean + 3 * score_std
anomalies = scores[scores > score_cutoff]
anomaly_indices = np.arange(len(scores))[scores > score_cutoff]
print(anomalies)
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
#
# *Try this out* - change `start` and `end` to zoom in on the
# anomaly found earlier in this notebook
#
start, end = 0, len(taxi_data)
taxi_data_subset = taxi_data[start:end]
ax1.plot(taxi_data["value"], color="C0", alpha=0.8)
ax2.plot(scores, color="C1")
ax2.scatter(anomaly_indices, anomalies, color="k")
ax1.grid(which="major", axis="both")
ax1.set_ylabel("Taxi Ridership", color="C0")
ax2.set_ylabel("Anomaly Score", color="C1")
ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")
ax1.set_ylim(0, 40000)
ax2.set_ylim(min(scores), 1.4 * max(scores))
fig.set_figwidth(10)
最後のエンドポイントを削除します。
sagemaker.Session().delete_endpoint(rcf_inference.endpoint)
考察
ランダムカットフォレストを使うことにより簡単に、異常値を見つけることができました。
参考