0
0

More than 3 years have passed since last update.

Azure Synapse AnalyticsのSpark NotebookでETL~MLまで一貫して実行する

Posted at

Synapse AnalyticsはSynapse Studioというあらゆるデータにアクセス可能な統合環境UIを持っており、ETL処理をSparkの上で実行することも可能です。オーケストレーション用のPipelineも作成することができます。NotebookをETL処理のためだけに利用することも想定されますが、データのロード~機械学習まで単一のNotebook上で実装可能です。

データの用意

Data HubからLinked ServiceとしてAzure Data Lake Storage Gen2を登録しておきます。これにより、コンテナの中をエクスプローラのように探索することが可能となります。
image.png

ADLSにアップロード済の.parquetファイルを使用します。Synapseは基本的に右クリックでデータにアクセス可能です。[New Notebook]を開くとデータをロードするための最初のコードが記載されたSpark Notebookが開きます。

image.png

Chartの方を選択するとグラフでデータの可視化ができます。簡単な分析ならここから行えそうですね。

早速ETLを行いたいと思います。今回は複雑なことはせずにデータの型を変換します。CSVで取得したデータをPARQUETに無理やり変換しているので、型情報が失われています。DB⇒PARQUETの場合はメタデータが保持されていると思います。

df = data_path.selectExpr(
    "cast(PatientID as int) PatientID",
    "cast(Pregnancies as int) Pregnancies",
    "cast(PlasmaGlucose as int) PlasmaGlucose",
    "cast(DiastolicBloodPressure as int)",
    "cast(TricepsThickness as int) TricepsThickness",
    "cast(SerumInsulin as int) SerumInsulin",
    "cast(BMI as double) BMI",
    "cast(DiabetesPedigree as double) DiabetesPedigree",
    "cast(Age as int) Age",
    "cast(Diabetic as int) label"
)

それぞれのカラムを適切な型にcastしています。以下でデータフレームの型が変換されていることを確認します。Diabeticカラムはlabelという名前にしておくと都合がいいのでここで変更します。

df
#
# output
# DataFrame[PatientID: int, Pregnancies: int, PlasmaGlucose: int, DiastolicBloodPressure: int, TricepsThickness: int, SerumInsulin: int, BMI: double, DiabetesPedigree: double, Age: int, label: int]

元々そこまで処理が必要ではないのでETLとしてはここまでにします。

Spark用機械学習フレームワークMLibを利用した機械学習

Sparkの処理能力を活用するため、MLibでの機械学習を行います。今回使用するデータは糖尿病かどうかの二値分類のタスクになります。MLibのモデルは、x1,x2,...をベクトル化したfeaturesという名前のカラムとy用のlabelという名前のカラムを自動で認識して学習してくれます。

カラムをベクトル化する際にはVectorAssemblerを用いると容易に実装できます。

# x をベクトル化して、featuresカラムにする                                                                    
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
featurized_DF = vecAssembler.transform(df)

featurized_DFの中身を見るとfeaturesカラムが追加されていることが分かります。

featurized_DF
#
# DataFrame[PatientID: int, Pregnancies: int, PlasmaGlucose: int, DiastolicBloodPressure: int, TricepsThickness: int, SerumInsulin: int, BMI: double, DiabetesPedigree: double, Age: int, label: int, features: vector]

trainデータとtestデータに分けます。scikit-learntrain_test_split的な関数があります。

train_test_split
train, test = featurized_DF.randomSplit([0.7, 0.3], seed=12345)

学習(fit)を行います。

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(train)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

テストデータでモデルを検証します。モデルクラスのtransform()がいわゆるpredict関数的なものになります。

# predict test data 
predictions = lrModel.transform(test)
print(predictions)
predictions.select(['label','prediction','probability']).show()

Modelの検証

ROC

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

evaluator.evaluate(predictions)

precision/recall/f1-score/support

y_true = predictions.select(['label']).collect()
y_pred = predictions.select(['prediction']).collect()

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

混合行列

scikit-learnconfusion_matrix()で算出することができます。
seabornでヒートマップを可視化しようと思ったのですがNotebookが仕様上%matplotlib inlineに対応していないらしく、泣く泣くdisplay()関数を利用しています。

import pandas as pd
cm = confusion_matrix(y_true, y_pred)
display(pd.DataFrame(cm))
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0