Synapse AnalyticsはSynapse Studioというあらゆるデータにアクセス可能な統合環境UIを持っており、ETL処理をSparkの上で実行することも可能です。オーケストレーション用のPipelineも作成することができます。NotebookをETL処理のためだけに利用することも想定されますが、データのロード~機械学習まで単一のNotebook上で実装可能です。
データの用意
Data HubからLinked ServiceとしてAzure Data Lake Storage Gen2を登録しておきます。これにより、コンテナの中をエクスプローラのように探索することが可能となります。
ADLSにアップロード済の.parquetファイルを使用します。Synapseは基本的に右クリックでデータにアクセス可能です。[New Notebook]を開くとデータをロードするための最初のコードが記載されたSpark Notebookが開きます。
Chartの方を選択するとグラフでデータの可視化ができます。簡単な分析ならここから行えそうですね。
早速ETLを行いたいと思います。今回は複雑なことはせずにデータの型を変換します。CSVで取得したデータをPARQUETに無理やり変換しているので、型情報が失われています。DB⇒PARQUETの場合はメタデータが保持されていると思います。
df = data_path.selectExpr(
"cast(PatientID as int) PatientID",
"cast(Pregnancies as int) Pregnancies",
"cast(PlasmaGlucose as int) PlasmaGlucose",
"cast(DiastolicBloodPressure as int)",
"cast(TricepsThickness as int) TricepsThickness",
"cast(SerumInsulin as int) SerumInsulin",
"cast(BMI as double) BMI",
"cast(DiabetesPedigree as double) DiabetesPedigree",
"cast(Age as int) Age",
"cast(Diabetic as int) label"
)
それぞれのカラムを適切な型にcastしています。以下でデータフレームの型が変換されていることを確認します。Diabetic
カラムはlabel
という名前にしておくと都合がいいのでここで変更します。
df
#
# output
# DataFrame[PatientID: int, Pregnancies: int, PlasmaGlucose: int, DiastolicBloodPressure: int, TricepsThickness: int, SerumInsulin: int, BMI: double, DiabetesPedigree: double, Age: int, label: int]
元々そこまで処理が必要ではないのでETLとしてはここまでにします。
Spark用機械学習フレームワークMLibを利用した機械学習
Sparkの処理能力を活用するため、MLibでの機械学習を行います。今回使用するデータは糖尿病かどうかの二値分類のタスクになります。MLibのモデルは、x1,x2,...をベクトル化したfeatures
という名前のカラムとy用のlabel
という名前のカラムを自動で認識して学習してくれます。
カラムをベクトル化する際にはVectorAssembler
を用いると容易に実装できます。
# x をベクトル化して、featuresカラムにする
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
featurized_DF = vecAssembler.transform(df)
featurized_DF
の中身を見るとfeatures
カラムが追加されていることが分かります。
featurized_DF
#
# DataFrame[PatientID: int, Pregnancies: int, PlasmaGlucose: int, DiastolicBloodPressure: int, TricepsThickness: int, SerumInsulin: int, BMI: double, DiabetesPedigree: double, Age: int, label: int, features: vector]
trainデータとtestデータに分けます。scikit-learn
のtrain_test_split
的な関数があります。
train_test_split
train, test = featurized_DF.randomSplit([0.7, 0.3], seed=12345)
学習(fit)を行います。
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(train)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
テストデータでモデルを検証します。モデルクラスのtransform()
がいわゆるpredict関数的なものになります。
# predict test data
predictions = lrModel.transform(test)
print(predictions)
predictions.select(['label','prediction','probability']).show()
Modelの検証
ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)
precision/recall/f1-score/support
y_true = predictions.select(['label']).collect()
y_pred = predictions.select(['prediction']).collect()
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))
混合行列
scikit-learn
のconfusion_matrix()
で算出することができます。
seaborn
でヒートマップを可視化しようと思ったのですがNotebookが仕様上%matplotlib inline
に対応していないらしく、泣く泣くdisplay()
関数を利用しています。
import pandas as pd
cm = confusion_matrix(y_true, y_pred)
display(pd.DataFrame(cm))