3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【初心者】Amazon SageMaker Random Cut Forest アルゴリズムを試してみた

Last updated at Posted at 2022-12-31

背景・目的

SageMaker向けのビルドインアルゴリズムのRandom Cut Forestを試してみようと思います。

まとめ

  • RCFは教師なしアルゴリズム
  • データセット内の異常なデータポイントや外れ値を検出できる。

概要

Random Cut Forest (RCF) アルゴリズム

データセット内の異常なデータポイントを検出する、教師なしのアルゴリズム。
異常は、時系列データにおける突然のスパイク、周期性の中断、分類できないデータポイントとして現れる場合がある。
各データポイントで、RCFは異常スコアを関連付ける。

  • 低スコスであれば、データポイントは通常とみなされ、高い値は、データ内に異常があると示される。
  • 異常の定義は、アプリケーションにより異なるが、一般的な方法としては3標準偏差を超えるスコアであれば異常とみなされる。

交通量解析や音量スパイクなど、異常検出アルゴは1次元時系列でターに広く応用される。これに対してRCFは任意の次元入力で機能するように設計されている。
SageMaker RCFは、機能数、データセットのサイズ、インスタンス数に関して適切にスケーリングされる。

RCF アルゴリズムの入出力インターフェイス

  • SageMaker RCFは、トレーニングデータとテストデータのコンテンツタイプは以下の通り。text/csvを使用する場合は、コンテンツをtext/csv;label_size=1として指定する必要がある。
    • application/x-recordio-protobuf
    • text/csv
  • 各行の最初の列は異常ラベルを表す。異常データポイントの場合は「1」、通常のデータポイントの場合は「0」
  • ファイルモードまたはパイプモードで使用すると、recordIO-wrapped-protobufまたはCSV形式のデータについてRCFモデルをトレーニングできる。
  • 必要なIAMポリシーは以下の通り
    • AmazonSageMakerFullAccess
    • AmazonEC2ContainerRegistryFullAccess

RCF アルゴリズムのインスタンスに関する推奨事項

  • トレーニングには、以下のインスタンスファミリーを推奨とのこと。
    • ml.m4
    • ml.c4
    • ml.c5
  • 推論には、最大限のパフォーマンスと使用時間当たりのコスト最小化のために、ml.c5.xl インスタンスタイプを推奨とのこと。
  • GPUを利用することはない。

実装

  • An Introduction to SageMaker Random Cut Forestsを元に試します。
  • Numenta Anomaly Benchmark (NAB) NYC Taxi のデータセットをでモデルをトレーニングします。
    • このデータセットは、ニューヨーク市のタクシー利用者を6ヶ月間記録している
  • このモデルにより、各データポイントの異常スコアを出力することで、異常なイベントを予測します。

Setup

  • インスタンスタイプは、ml.t3.mediumしたとのこと。

Select Amazon S3 Bucket

  1. サンプルコードを元に、以下を実行する。
    import boto3
    import botocore
    import sagemaker
    import sys
    
    
    bucket = (
        sagemaker.Session().default_bucket()
    )  # Feel free to change to another bucket you have access to
    prefix = "sagemaker/rcf-benchmarks"
    execution_role = sagemaker.get_execution_role()
    region = boto3.Session().region_name
    
    # S3 bucket where the original data is downloaded and stored.
    downloaded_data_bucket = f"sagemaker-sample-files"
    downloaded_data_prefix = "datasets/tabular/anomaly_benchmark_taxi"
    
    
    def check_bucket_permission(bucket):
        # check if the bucket exists
        permission = False
        try:
            boto3.Session().client("s3").head_bucket(Bucket=bucket)
        except botocore.exceptions.ParamValidationError as e:
            print(
                "Hey! You either forgot to specify your S3 bucket"
                " or you gave your bucket an invalid name!"
            )
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "403":
                print(f"Hey! You don't have permission to access the bucket, {bucket}.")
            elif e.response["Error"]["Code"] == "404":
                print(f"Hey! Your bucket, {bucket}, doesn't exist!")
            else:
                raise
        else:
            permission = True
        return permission
    
    
    if check_bucket_permission(bucket):
        print(f"Training input/output will be stored in: s3://{bucket}/{prefix}")
    if check_bucket_permission(downloaded_data_bucket):
        print(
            f"Downloaded training data will be read from s3://{downloaded_data_bucket}/{downloaded_data_prefix}"
        )
    
    
  2. 成功すると、以下のメッセージが表示されます。
    • エラーメッセージが表示される場合は、バケットの存在が実在しているか、またはIAMポリシーの確認が必要です。
    ​
    Training input/output will be stored in: s3://{バケット名}/sagemaker/rcf-benchmarks
    Downloaded training data will be read from s3://sagemaker-sample-files/datasets/tabular/anomaly_benchmark_taxi
    
    

Obtain and Inspect Example Data

  1. Numenta Anomaly Benchmark (NAB) NYC Taxi datasetから取得したデータセットが、sagemaker-sample-filesに格納されている。これをダウンロードします。

    %%time
    
    import pandas as pd
    
    data_filename = "NAB_nyc_taxi.csv"
    s3 = boto3.client("s3")
    s3.download_file(downloaded_data_bucket, f"{downloaded_data_prefix}/{data_filename}", data_filename)
    taxi_data = pd.read_csv(data_filename, delimiter=",")
    
    
  2. ダウンロードが成功すると以下のようにCPUタイムなどが表示されます。

    CPU times: user 187 ms, sys: 32.5 ms, total: 220 ms
    Wall time: 1.03 s
    
  3. モデルのトレーニング前に、データを検査します。
    モデルのトレーニング前には、データを検査することで以下が可能になるため重要とのこと。

    • 基本的なパターンや構造の確認
    • 前処理して除去できるノイズの発見
      headにより、先頭の5行が表示されます。
    • timestampとvalueのカラムが存在する。
    • timestampは、30分毎の年月日、時分秒まで格納されている
    • valueは乗車回数
    taxi_data.head()
    
  4. 以下のように表示されました。
    image.png

  5. 次に視覚化し、概要を確認します。

    %matplotlib inline
    
    import matplotlib
    import matplotlib.pyplot as plt
    
    matplotlib.rcParams["figure.dpi"] = 100
    
    taxi_data.plot()
    
    
  6. 以下のようにプロットされました。以下のことがわかりました。

    • 6000付近でスパイクしてる。
    • 定期的に減少し、その後直ぐに回復、短期間で右肩上がりで増える。
      image.png
  7. 6000付近のデータを確認します。

    taxi_data[5500:6500].plot()
    
  8. 以下のようにプロットされました。

    • 50データポイントの周期で乗車回数が一定
      image.png
  9. 一日のトレンドを確認してみます。

    taxi_data[5952:6000]
    taxi_data[5952:6000].plot
    
  10. 以下のように表示されました。
    image.png

    1日分の乗車回数のデータセット
    ``` timestamp value 5952 2014-11-02 00:00:00 25110 5953 2014-11-02 00:30:00 23109 5954 2014-11-02 01:00:00 39197 5955 2014-11-02 01:30:00 35212 5956 2014-11-02 02:00:00 13259 5957 2014-11-02 02:30:00 12250 5958 2014-11-02 03:00:00 10013 5959 2014-11-02 03:30:00 7898 5960 2014-11-02 04:00:00 6375 5961 2014-11-02 04:30:00 4532 5962 2014-11-02 05:00:00 5116 5963 2014-11-02 05:30:00 5232 5964 2014-11-02 06:00:00 4542 5965 2014-11-02 06:30:00 5298 5966 2014-11-02 07:00:00 5155 5967 2014-11-02 07:30:00 6029 5968 2014-11-02 08:00:00 6280 5969 2014-11-02 08:30:00 8771 5970 2014-11-02 09:00:00 10151 5971 2014-11-02 09:30:00 12501 5972 2014-11-02 10:00:00 13990 5973 2014-11-02 10:30:00 16534 5974 2014-11-02 11:00:00 17133 5975 2014-11-02 11:30:00 18775 5976 2014-11-02 12:00:00 18985 5977 2014-11-02 12:30:00 19911 5978 2014-11-02 13:00:00 19123 5979 2014-11-02 13:30:00 19524 5980 2014-11-02 14:00:00 19640 5981 2014-11-02 14:30:00 18364 5982 2014-11-02 15:00:00 17940 5983 2014-11-02 15:30:00 17949 5984 2014-11-02 16:00:00 17288 5985 2014-11-02 16:30:00 16326 5986 2014-11-02 17:00:00 17522 5987 2014-11-02 17:30:00 19243 5988 2014-11-02 18:00:00 20291 5989 2014-11-02 18:30:00 21649 5990 2014-11-02 19:00:00 22839 5991 2014-11-02 19:30:00 21772 5992 2014-11-02 20:00:00 20994 5993 2014-11-02 20:30:00 19774 5994 2014-11-02 21:00:00 18398 5995 2014-11-02 21:30:00 17764 5996 2014-11-02 22:00:00 17334 5997 2014-11-02 22:30:00 15431 5998 2014-11-02 23:00:00 12958 5999 2014-11-02 23:30:00 10224 ```

Training

SageMaker トレーニングジョブを構成して、タクシーデータでランダムカットフォレスト (RCF) アルゴリズムをトレーニングします。

Hyperparameters

SageMaker RCF トレーニングジョブに特有のものは、次のハイパーパラメータです。

  • num_samples_per_tree
    • 各ツリーに送信されるランダムにサンプリングされたデータポイントの数。 原則として、1/num_samples_per_tree は、データセット内の正常点に対する異常点の推定比率に近似する必要があります。
  • num_trees
    • フォレスト内に作成するツリーの数。 各ツリーは、異なるデータサンプルから個別のモデルを学習します。 完全なフォレスト モデルは、各構成ツリーから予測された平均異常スコアを使用します。
  • feature_dim
    • 各データポイントの次元。
  1. 以下のコマンドを実行してトレーニングします。

    from sagemaker import RandomCutForest
    
    session = sagemaker.Session()
    
    # specify general training job information
    rcf = RandomCutForest(
        role=execution_role,
        instance_count=1,
        instance_type="ml.m4.xlarge",
        data_location=f"s3://{bucket}/{prefix}/",
        output_path=f"s3://{bucket}/{prefix}/output",
        num_samples_per_tree=512,
        num_trees=50,
    )
    
    # automatically upload the training data to S3 and run the training job
    rcf.fit(rcf.record_set(taxi_data.value.to_numpy().reshape(-1, 1)))
    
  2. しばらくすると完了します。

    2022-12-31 02:24:03 Uploading - Uploading generated training model
    2022-12-31 02:24:03 Completed - Training job completed
    Training seconds: 147
    Billable seconds: 147
    
  3. 実行完了したジョブを確認します。

    print(f"Training job name: {rcf.latest_training_job.job_name}")
    

    image.png

  4. 完了は以下の画面からでも確認が可能です。

    • SageMakerのナビゲーションペイン > トレーニングジョブ
      image.png
    • 上記で確認したトレーニングジョブがありました。該当のジョブをクリックします。
      image.png
    • ジョブの詳細が表示されます。(Complete、実行時間を確認できました。)
      image.png
    • 「履歴の表示」をクリックすると各処理の内訳と実行時間(開始〜終了)が確認できます。
      image.png

Inference

deploy()関数を使用して推論エンドポイントを作成する。
推論が計算されるインスタンスタイプと起動するインスタンスの初期数を指定する。

  1. デプロイします。

    • インスタンス数は、1
    • インスタンスタイプはml.m4.xlarge
    rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge")
    

    image.png

  2. エンドポイント名を確認します。

    print(f"Endpoint name: {rcf_inference.endpoint}")
    

    image.png

  3. コンソール名からもエンドポイントを確認します。

    • SageMakerのナビゲーションペイン>エンドポイントをクリックします。
      image.png
    • 上記で確認したエンドポイントがありました。
      image.png

Data Serialization/Deserialization

さまざまな形式のデータを推論エンドポイントに渡すことができます。 この例では、CSV 形式のデータを渡す方法を示します。
(その他の使用可能な形式は、JSON 形式および RecordIO Protobuf です。 推論エンドポイントを構成するときに、SageMaker Python SDK ユーティリティ csv_serializer および json_deserializer を使用します。)

  1. CSV形式のシリアライザーとJSONのでシリアライザーを指定します。
    from sagemaker.serializers import CSVSerializer
    from sagemaker.deserializers import JSONDeserializer
    
    rcf_inference.serializer = CSVSerializer()
    rcf_inference.deserializer = JSONDeserializer()
    
    
  2. 異常検知のために、トレーニングデータセットをCSV形式で推論エンドポイントに渡す。下記で6データポイントのみ渡しています。
    taxi_data_numpy = taxi_data.value.to_numpy().reshape(-1, 1)
    print(taxi_data_numpy[:6])
    results = rcf_inference.predict(
        taxi_data_numpy[:6], initial_args={"ContentType": "text/csv", "Accept": "application/json"}
    )
    
    ===
    
    [[10844]
     [ 8127]
     [ 6210]
     [ 4656]
     [ 3820]
     [ 2873]]
    
    

Computing Anomaly Scores

  1. 全てのデータセット全体から異常スコアを計算してプロットします。

    results = rcf_inference.predict(taxi_data_numpy)
    scores = [datum["score"] for datum in results["scores"]]
    
    # add scores to taxi data frame and print first few values
    taxi_data["score"] = pd.Series(scores, index=taxi_data.index)
    taxi_data.head()
    
    ===
    
    timestamp	value	score
    0	2014-07-01 00:00:00	10844	0.922123
    1	2014-07-01 00:30:00	8127	0.990607
    2	2014-07-01 01:00:00	6210	0.930758
    3	2014-07-01 01:30:00	4656	0.843785
    4	2014-07-01 02:00:00	3820	0.885350
    
    
    fig, ax1 = plt.subplots()
    ax2 = ax1.twinx()
    
    #
    # *Try this out* - change `start` and `end` to zoom in on the
    # anomaly found earlier in this notebook
    #
    start, end = 0, len(taxi_data)
    # start, end = 5500, 6500
    taxi_data_subset = taxi_data[start:end]
    
    ax1.plot(taxi_data_subset["value"], color="C0", alpha=0.8)
    ax2.plot(taxi_data_subset["score"], color="C1")
    
    ax1.grid(which="major", axis="both")
    
    ax1.set_ylabel("Taxi Ridership", color="C0")
    ax2.set_ylabel("Anomaly Score", color="C1")
    
    ax1.tick_params("y", colors="C0")
    ax2.tick_params("y", colors="C1")
    
    ax1.set_ylim(0, 40000)
    ax2.set_ylim(min(scores), 1.4 * max(scores))
    fig.set_figwidth(10)
    
    
  2. 以下のようにプロットされました。
    image.png

  3. 平均スコアから 3 標準偏差 (約 99.9 パーセンタイル) を超えるスコアを持つデータ ポイントをプロットします。

    score_mean = taxi_data["score"].mean()
    score_std = taxi_data["score"].std()
    score_cutoff = score_mean + 3 * score_std
    
    anomalies = taxi_data_subset[taxi_data_subset["score"] > score_cutoff]
    anomalies
    

    image.png

  4. 異常スコアをプロットします。

    ax2.plot(anomalies.index, anomalies.score, "ko")
    fig
    

    image.png

Stop and Delete the Endpoint

  1. 最後にエンドポイントを削除します。
    sagemaker.Session().delete_endpoint(rcf_inference.endpoint)
    
  2. マネコンからも消えていました。
    image.png

Epilogue

RCF アルゴリズムは、たとえば、データが周期性を壊したり、グローバルな動作を異常に変更したりする時期を検出することもできます。
持っているデータの種類に応じて、アルゴリズムのパフォーマンスを改善する方法がいくつかあります。
たとえば、1 つの方法は、適切なトレーニングセットを使用することです。 特定のデータ セットが「正常な」動作の特徴であることがわかっている場合、そのデータ セットをトレーニングすると、「異常な」データをより正確に特徴付けることができます。
もう 1 つの改善点は、「シングリング」と呼ばれるウィンドウ処理手法を利用することです。 これは、上記で使用した NYC タクシー データセットなど、周期がわかっている定期的なデータを操作する場合に特に便利です。 アイデアは、データポイントの期間を特徴の長さの単一のデータポイントとして扱い、これらの特徴ベクトルに対して RCF アルゴリズムを実行することです。 つまり、元のデータがポイントで構成されている場合、変換を実行します。

import numpy as np


def shingle(data, shingle_size):
    num_data = len(data)
    shingled_data = np.zeros((num_data - shingle_size, shingle_size))

    for n in range(num_data - shingle_size):
        shingled_data[n] = data[n : (n + shingle_size)]
    return shingled_data


# single data with shingle size=48 (one day)
shingle_size = 48
prefix_shingled = "sagemaker/randomcutforest_shingled"
taxi_data_shingled = shingle(taxi_data.values[:, 1], shingle_size)
print(taxi_data_shingled)

session = sagemaker.Session()

# specify general training job information
rcf = RandomCutForest(
    role=execution_role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    data_location=f"s3://{bucket}/{prefix_shingled}/",
    output_path=f"s3://{bucket}/{prefix_shingled}/output",
    num_samples_per_tree=512,
    num_trees=50,
)

# automatically upload the training data to S3 and run the training job
rcf.fit(rcf.record_set(taxi_data_shingled))

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

rcf_inference = rcf.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge")

rcf_inference.serializer = CSVSerializer()
rcf_inference.deserializer = JSONDeserializer()
print(rcf_inference.content_type, rcf_inference.accept)
# Score the shingled datapoints
results = rcf_inference.predict(
    taxi_data_shingled, initial_args={"ContentType": "text/csv", "Accept": "application/json"}
)
scores = np.array([datum["score"] for datum in results["scores"]])

# compute the shingled score distribution and cutoff and determine anomalous scores
score_mean = scores.mean()
score_std = scores.std()
score_cutoff = score_mean + 3 * score_std

anomalies = scores[scores > score_cutoff]
anomaly_indices = np.arange(len(scores))[scores > score_cutoff]

print(anomalies)

fig, ax1 = plt.subplots()
ax2 = ax1.twinx()

#
# *Try this out* - change `start` and `end` to zoom in on the
# anomaly found earlier in this notebook
#
start, end = 0, len(taxi_data)
taxi_data_subset = taxi_data[start:end]

ax1.plot(taxi_data["value"], color="C0", alpha=0.8)
ax2.plot(scores, color="C1")
ax2.scatter(anomaly_indices, anomalies, color="k")

ax1.grid(which="major", axis="both")
ax1.set_ylabel("Taxi Ridership", color="C0")
ax2.set_ylabel("Anomaly Score", color="C1")
ax1.tick_params("y", colors="C0")
ax2.tick_params("y", colors="C1")
ax1.set_ylim(0, 40000)
ax2.set_ylim(min(scores), 1.4 * max(scores))
fig.set_figwidth(10)

image.png

最後のエンドポイントを削除します。

sagemaker.Session().delete_endpoint(rcf_inference.endpoint)

考察

ランダムカットフォレストを使うことにより簡単に、異常値を見つけることができました。

参考

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?