0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Azure Synapse Analytics + Anomaly Detectorで異常値検出をやってみる

Last updated at Posted at 2022-02-10

Cognitive Servicesの1つに Anomaly Detector というサービスがあります。
これは時系列データの中から普段と異なる異常値を検出してくれるもので、機械学習の知識なく実装することが可能です。
今回はSynapse Analytics内のデータをAnomaly Detectorを使って異常値検出するということをやってみました。
公式手順 が準備されていますが、データ量が少ないため、もう少し大きなデータをDataLakeStorageから読み込んで利用してみます。
色々ハマったので書き残します。

Synapse Analytics作成

  • 公式手順 に従いSynapse Analytics Workspaceを作成します。
  • 作成できたらSynapse Studioを開きます。

Apache Sparkプール作成

  • Synapse Studioで、[管理]→[Apache Sparkプール]→[新規]の順に選択します。
    image.png

  • [基本]タブを入力します。値は任意です。
    image.png

  • [追加設定]では自動一時停止設定を入れています。設定は任意です。
    image.png

  • [タグ]も任意です。
    image.png

  • ここまで設定したらPoolを作成します。

Synapse AnalyticsとCognitive Servicesの連携

データ準備

タイムスタンプ確認

  • ここでかなりハマりました。Anomaly Detectorではタイムスタンプ列のフォーマットがISO8601準拠である必要があります。2022-01-01T00:00:00Z の形式です。
  • 事前に確認し一致していなければ変換する必要があります。

データ読み込み

  • Synapse Studioで、[開発]→[ノートブック]→[新しいノートブック]の順に選択します。
    image.png

  • Jupyter Notebookで以下のように入力し、DataFrameを作成します。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
account_name = "[DataLake Storage Account Name]"
container_name = "[DataLake Container Name]"
relative_path = "."
adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)

df1 = spark.read.option('header', 'true') \
                .option('delimiter', ',') \
                .csv(adls_path + '/input.csv')
  • 正しく読み込まれているか確認します。
display(df1.limit(10))
  • データベース、テーブルを作成します。
%%pyspark
spark.sql("CREATE DATABASE IF NOT EXISTS [Database Name]")
df1.write.mode("overwrite").saveAsTable("[Database Name].[Table Name]")

Anomaly Detector

  • Synapse Studioより、[データ]→[レイクデータベース]→[対象DB]→[対象TABLE]を順に選択し、[Machine Learning]→[モデルを使用した予測]を選択します。
    image.png

  • [Anomaly Detector]を選択します。
    image.png

  • Azure Cognitive Servicesのリンクサービスには、事前に設定したリンクサービスを選択します。
    image.png

  • 細分性は時系列データのサンプリングレートを選択します。今回のデータは1時間毎の数値なのでhourlyを選択します。
    image.png

  • タイムスタンプ列にはタイムスタンプが入った列を指定します。今回はtimestamp列を選択します。
    image.png

  • 時系列値列には評価するValueを選択します。今回はWebの閲覧数を表すviews列を選択します。
    image.png

  • 列のグループ化には時系列ごとにグルーピングする列を指定します。今回はmarketplaceを指定しています。
    image.png

  • これで作成するとNotebookが開きます。

異常検知

  • 開いたNotebookを以下のように編集し実行します。今回のデータでは、marketplaceをグループ化していますが、他にもplatform列を絞らないと時系列データが重複してしまうため、WHERE句を追加しています。
from mmlspark.cognitive import *
from notebookutils import mssparkutils
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Load the data into a Spark DataFrame
df = spark.sql("SELECT * FROM [Database Name].[Table Name] WHERE platform = 'pc_web'").withColumn("views", col("views").cast(DoubleType()))

anomalyDetector = (SimpleDetectAnomalies()
    .setLinkedService("CognitiveService1")
    .setOutputCol("output")
    .setErrorCol("error")
    .setGranularity("hourly")
    .setTimestampCol("timestamp")
    .setValueCol("views")
    .setGroupbyCol("marketplace"))

results = anomalyDetector.transform(df)

# Show the results
display(results.select("timestamp", "platform", "marketplace", "views", "output.*", "error"))
  • 以下のように結果出力されればOKです。isAnomalyがTRUEの行が異常値と検出されています。
    image.png

以上

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?