Cognitive Servicesの1つに Anomaly Detector というサービスがあります。
これは時系列データの中から普段と異なる異常値を検出してくれるもので、機械学習の知識なく実装することが可能です。
今回はSynapse Analytics内のデータをAnomaly Detectorを使って異常値検出するということをやってみました。
公式手順 が準備されていますが、データ量が少ないため、もう少し大きなデータをDataLakeStorageから読み込んで利用してみます。
色々ハマったので書き残します。
Synapse Analytics作成
- 公式手順 に従いSynapse Analytics Workspaceを作成します。
- 作成できたらSynapse Studioを開きます。
Apache Sparkプール作成
-
ここまで設定したらPoolを作成します。
Synapse AnalyticsとCognitive Servicesの連携
データ準備
-
今回は同様の異常検知サービスである Amazon Lookout for Metricsのサンプルデータを拝借します。
https://github.com/harunobukameda/Amazon-Lookout-for-Metrics/blob/main/ecommerce.zip -
input.csvをDataLake Storageに配置します。
タイムスタンプ確認
- ここでかなりハマりました。Anomaly Detectorではタイムスタンプ列のフォーマットがISO8601準拠である必要があります。2022-01-01T00:00:00Z の形式です。
- 事前に確認し一致していなければ変換する必要があります。
データ読み込み
from pyspark.sql import SparkSession
from pyspark.sql.types import *
account_name = "[DataLake Storage Account Name]"
container_name = "[DataLake Container Name]"
relative_path = "."
adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)
df1 = spark.read.option('header', 'true') \
.option('delimiter', ',') \
.csv(adls_path + '/input.csv')
- 正しく読み込まれているか確認します。
display(df1.limit(10))
- データベース、テーブルを作成します。
%%pyspark
spark.sql("CREATE DATABASE IF NOT EXISTS [Database Name]")
df1.write.mode("overwrite").saveAsTable("[Database Name].[Table Name]")
Anomaly Detector
-
Synapse Studioより、[データ]→[レイクデータベース]→[対象DB]→[対象TABLE]を順に選択し、[Machine Learning]→[モデルを使用した予測]を選択します。
-
これで作成するとNotebookが開きます。
異常検知
- 開いたNotebookを以下のように編集し実行します。今回のデータでは、marketplaceをグループ化していますが、他にもplatform列を絞らないと時系列データが重複してしまうため、WHERE句を追加しています。
from mmlspark.cognitive import *
from notebookutils import mssparkutils
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
# Load the data into a Spark DataFrame
df = spark.sql("SELECT * FROM [Database Name].[Table Name] WHERE platform = 'pc_web'").withColumn("views", col("views").cast(DoubleType()))
anomalyDetector = (SimpleDetectAnomalies()
.setLinkedService("CognitiveService1")
.setOutputCol("output")
.setErrorCol("error")
.setGranularity("hourly")
.setTimestampCol("timestamp")
.setValueCol("views")
.setGroupbyCol("marketplace"))
results = anomalyDetector.transform(df)
# Show the results
display(results.select("timestamp", "platform", "marketplace", "views", "output.*", "error"))
以上