Synapse AnalyticsはSynapse Studioというあらゆるデータにアクセス可能な統合環境UIを持っており、ETL処理をSparkの上で実行することも可能です。オーケストレーション用のPipelineも作成することができます。NotebookをETL処理のためだけに利用することも想定されますが、データのロード~機械学習まで単一のNotebook上で実装可能です。
データの用意
Data HubからLinked ServiceとしてAzure Data Lake Storage Gen2を登録しておきます。これにより、コンテナの中をエクスプローラのように探索することが可能となります。

ADLSにアップロード済の.parquetファイルを使用します。Synapseは基本的に右クリックでデータにアクセス可能です。[New Notebook]を開くとデータをロードするための最初のコードが記載されたSpark Notebookが開きます。
Chartの方を選択するとグラフでデータの可視化ができます。簡単な分析ならここから行えそうですね。
早速ETLを行いたいと思います。今回は複雑なことはせずにデータの型を変換します。CSVで取得したデータをPARQUETに無理やり変換しているので、型情報が失われています。DB⇒PARQUETの場合はメタデータが保持されていると思います。
df = data_path.selectExpr(
"cast(PatientID as int) PatientID",
"cast(Pregnancies as int) Pregnancies",
"cast(PlasmaGlucose as int) PlasmaGlucose",
"cast(DiastolicBloodPressure as int)",
"cast(TricepsThickness as int) TricepsThickness",
"cast(SerumInsulin as int) SerumInsulin",
"cast(BMI as double) BMI",
"cast(DiabetesPedigree as double) DiabetesPedigree",
"cast(Age as int) Age",
"cast(Diabetic as int) label"
)
それぞれのカラムを適切な型にcastしています。以下でデータフレームの型が変換されていることを確認します。Diabeticカラムはlabelという名前にしておくと都合がいいのでここで変更します。
df
#
# output
# DataFrame[PatientID: int, Pregnancies: int, PlasmaGlucose: int, DiastolicBloodPressure: int, TricepsThickness: int, SerumInsulin: int, BMI: double, DiabetesPedigree: double, Age: int, label: int]
元々そこまで処理が必要ではないのでETLとしてはここまでにします。
Spark用機械学習フレームワークMLibを利用した機械学習
Sparkの処理能力を活用するため、MLibでの機械学習を行います。今回使用するデータは糖尿病かどうかの二値分類のタスクになります。MLibのモデルは、x1,x2,...をベクトル化したfeaturesという名前のカラムとy用のlabelという名前のカラムを自動で認識して学習してくれます。
カラムをベクトル化する際にはVectorAssemblerを用いると容易に実装できます。
# x をベクトル化して、featuresカラムにする
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
featurized_DF = vecAssembler.transform(df)
featurized_DFの中身を見るとfeaturesカラムが追加されていることが分かります。
featurized_DF
#
# DataFrame[PatientID: int, Pregnancies: int, PlasmaGlucose: int, DiastolicBloodPressure: int, TricepsThickness: int, SerumInsulin: int, BMI: double, DiabetesPedigree: double, Age: int, label: int, features: vector]
trainデータとtestデータに分けます。scikit-learnのtrain_test_split的な関数があります。
train_test_split
train, test = featurized_DF.randomSplit([0.7, 0.3], seed=12345)
学習(fit)を行います。
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(train)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
テストデータでモデルを検証します。モデルクラスのtransform()がいわゆるpredict関数的なものになります。
# predict test data
predictions = lrModel.transform(test)
print(predictions)
predictions.select(['label','prediction','probability']).show()
Modelの検証
ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)
precision/recall/f1-score/support
y_true = predictions.select(['label']).collect()
y_pred = predictions.select(['prediction']).collect()
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))
混合行列
scikit-learnのconfusion_matrix()で算出することができます。
seabornでヒートマップを可視化しようと思ったのですがNotebookが仕様上%matplotlib inlineに対応していないらしく、泣く泣くdisplay()関数を利用しています。
import pandas as pd
cm = confusion_matrix(y_true, y_pred)
display(pd.DataFrame(cm))
