2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksでMLlibを使い始める - 2値分類のサンプル

Last updated at Posted at 2022-05-05

こちらではMLlibを用いた機械学習チュートリアルのサンプルノートブックの流れを説明します。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

翻訳済みのノートブックはこちらです。

このチュートリアルはApache Spark MLlibを使い始められるように設計されています。ここでは2値分類問題を取り扱います。デモグラフィックデータに基づいて、ある個人の収入が$50,000より高いかどうかを予測できるでしょうか?UCI Machine Learning Repositoryからのデータセット使用しており、このデータはDatabricksランタイム上で提供されています。このノートブックでは、データの前処理、機械学習パイプライン、いくつかの機械学習アルゴリズムなどMLlibで利用できる幾つかの機能をデモンストレーションします。

このノートブックには以下のステップが含まれています:

  1. データセットのロード
  2. 特徴量の処理
  3. モデルの定義
  4. パイプラインの構築
  5. モデルの評価
  6. ハイパーパラメーターチューニング
  7. 予測及びモデルパフォーマンスの評価

要件

Databricksランタイム7.0以降、あるいはDatabricks機械学習ランタイム7.0以降が必要です。

Step 1. データセットのロード

データの最初の数行を表示するためにDatabricksユーティリティを使用します。

%fs head --maxBytes=1024 databricks-datasets/adult/adult.data

Screen Shot 2022-05-05 at 16.19.59.png

データセットにはカラム名が含まれていないので、カラム名とデータ型を割り当てるためにスキーマを作成します。

Python
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)

トレーニングセットとテストセットにランダムに分割し、再現性確保のためのシードを設定します。

前処理を行う前にデータを分割するのがベストです。これによって、モデルを評価する際にテストデータセットが新規のデータをより近い形でシミュレーションすることが可能となります。

Python
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # トレーニングデータに複数回アクセスするのでキャッシュします
print(testDF.count())

Screen Shot 2022-05-05 at 16.20.58.png

データを確認してみましょう。

Python
display(trainDF)

Screen Shot 2022-05-05 at 16.21.30.png

hours_per_weekの数の分布はどのようになっているでしょうか?

Python
display(trainDF.select("hours_per_week").summary())

Screen Shot 2022-05-05 at 16.22.07.png

educationのステータスはどうなっているでしょうか?

Python
display(trainDF
        .groupBy("education")
        .count()
        .sort("count", ascending=False))

Screen Shot 2022-05-05 at 16.22.39.png

バックグラウンド: トランスフォーマー、エスティメーター、パイプライン

このノートブックで説明されるMLlibの機械学習においては3つの重要なコンセプトがあります。トランスフォーマーエスティメーターパイプラインです。

  • トランスフォーマー(Transformer): データフレームを入力とし新たなデータフレームを返却します。トランスフォーマーはデータからパラメーターを学習せず、モデルトレーニングのためのデータを準備する、あるいはトレーニングしたMLlibモデルを用いた予測を生成するために、シンプルにルールベースの変換処理を適用します。トランスフォーマーは.transform()メソッドで呼び出すことができます。

  • エスティメーター(Estimator): .fit()メソッド経由でデータフレームからパラメーターを学習(あるいは"フィット")し、トランスフォーマーであるモデルを返却します。

  • パイプライン(Pipeline): 複数のステップを容易に実行できるように単一のワークフローにまとめます。機械学習んモデルの生成においては、通常異なる数多くのステップのセットアップが含まれ、それらに対してイテレーションを行います。パイプラインは、このプロセスの自動化に役立ちます。

詳細はこちら:

Step 2. 特徴量の前処理

このノートブックのゴールはデータセットに含まれる特徴量(教育レベル、既婚・未婚、職業など)からincomeのレベルを予測するモデルを構築することです。MLlibで必要となるフォーマットになるように、特徴量を操作、前処理することが最初のステップとなります。

カテゴリー変数を数値に変換

線形回帰、ロジスティック回帰のようないくつかの機械学習アルゴリズムでは数値の特徴量が求められます。Adultデータセットには学歴、職業、既婚・未婚のようなカテゴリー変数が含まれています。

以下のコードブロックでは、カテゴリー変数を0あるいは1のみの値を取る一連の数値変数に変換するために、StringIndexerOneHotEncoderの使い方を説明しています。

  • StringIndexerは、文字列のカラムをラベルインデックスのカラムに変換します。例えば、 "red"、"blue"、"green"という値を0、1、2に変換します。
  • OneHotEncoderは、カテゴリーのインデックスを2値のベクトルにマッピングします。それぞれの行で最大1つの"1"が存在し、それがその行のカテゴリー変数のインデックスに対応します。

Sparkにおけるワンホットエンコーディングは2ステップのプロセスになります。最初にStringIndexerを使用して、次にOneHotEncoderを使用します。以下のコードブロックではStringIndexerとOneHotEncoderを定義していますが、まだデータには適用していません。

詳細はこちら:

Python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]

# 以下の2行はエスティメーターです。後ほどデータセットを変換するために適用する関数を返却します
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]) 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 

# ラベルのカラム("income")も文字列です。2つの値を取り得ます。"<=50K" と ">50K" です
# StringIndexerを用いて数値に変換します
labelToIndex = StringIndexer(inputCol="income", outputCol="label")

このノートブックでは、特徴量エンジニアリングとモデリングのステップすべてをまとめるパイプラインを構築します。しかし、上のコードブロックで作成したstringIndexerを適用することで、エスティメーターとトランスフォーマーがどの容易動作するのかを詳しくみてみましょう。

データセットを変換するのに使用するStringIndexerModelを返却してもらうために.fit()メソッドを呼び出します。

StringIndexerModel.transform()メソッドは、新たなカラムが追加された新規のデータフレームを返却します。必要であれば、右側にスクロールして新規のカラムを見てみてください。

詳細はこちら:

Python
stringIndexerModel = stringIndexer.fit(trainDF)
display(stringIndexerModel.transform(trainDF))

Screen Shot 2022-05-05 at 16.24.31.png

すべての特徴量カラムを1つの特徴量ベクトルに結合

多くのMLlibアルゴリズムは入力として1つの特徴量カラムを要求します。このカラムのそれぞれの行には、予測で用いる一連の特徴量に対応するデータポイントのベクトルが含まれます。

MLlibはカラムのリストから1つのベクトルを作成するために、VectorAssemblerトランスフォーマーを提供します。

以下のコードブロックではVectorAssemblerの使い方を説明しています。

詳細はこちら:

Python
from pyspark.ml.feature import VectorAssembler

# これにはデータセットの数値カラムと、ワンホットエンコーディングされた2値ベクトルカラムの両方が含まれています
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

Step 3. モデルの定義

このノートブックでは、ロジスティック回帰モデルを使用します。

Python
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)

Step 4. パイプラインの構築

Pipelineはトランスフォーマーとエスティメーターの順序ありリストです。データセットに適用される変換処理を自動化し、再現性を確保するためにパイプラインを定義することができます。このステップではパイプラインを定義し、テストデータセットに適用します。

StringIndexerで見たのと同じように、Pipelineはエスティメーターです。pipeline.fit()メソッドはトランスフォーマーであるPipelineModelを返却します。

詳細はこちら:

Python
from pyspark.ml import Pipeline

# 以前のステップで作成したステージに基づいてパイプラインを定義
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])

# パイプラインモデルの定義
pipelineModel = pipeline.fit(trainDF)

# テストデータセットにパイプラインモデルを適用
predDF = pipelineModel.transform(testDF)

モデルの予測結果を表示します。featuresカラムはワンホットエンコーディングされる場合にはよく起こり得る、多くのゼロから構成されるsparse vectorとなります。

Python
display(predDF.select("features", "label", "prediction", "probability"))

Screen Shot 2022-05-05 at 16.26.33.png

Step 5. モデルの評価

displayコマンドにはビルトインのROC曲線オプションがあります。

Python
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

Screen Shot 2022-05-05 at 16.27.16.png

モデルを評価するためには、ROC曲線のAUC(曲線の下部の面積)を評価するためにBinaryClassificationEvaluatorを用い、精度を評価するためにMulticlassClassificationEvaluatorを使用します。

詳細はこちら:

Python
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")

Screen Shot 2022-05-05 at 16.27.55.png

Step 6. ハイパーパラメーターチューニング

MLlibはハイパーパラメーターチューニングと交差検証を行うための手段を提供しています。

  • ハイパーパラメーターチューニングに対しては、ParamGridBuilderを用いることで、一連のモデルハイパーパラメーターに対するグリッドサーチを定義することができます。
  • 交差検証については、CrossValidatorを用いることで、エスティメーター(入力データセットに適用するパイプライン)、エバリュエーター、ハイパーパラメーターのグリッド空間、交差検証に用いるフォールド数を指定することができます。

詳細はこちら:

モデルをチューニングするために ParamGridBuilderCrossValidator を使用します。このサンプルでは、3つの値のregParamと3つの値のelasticNetParamを用いるので、 CrossValidatorで検証する 3 x 3 = 9 のハイパーパラメーターの組み合わせが存在します。

Python
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

MLlibでCrossValidatorを呼び出す際、DatabrikcsはMLflowを用いて自動ですべてのラン(トレーニングの実行)をトラッキングします。それぞれのモデルを日隠すためにMLflowのUI(AWS|Azure|GCP)を活用することもできます。

このサンプルでは、エスティメーターとして作成したパイプラインを使用します。

Python
# 3-foldのCrossValidatorの作成
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism = 4)

# 交差検証の実行。このステップは数分要し、交差検証の結果得られたベストなモデルを返却します
cvModel = cv.fit(trainDF)

以下のようにプロットでモデルを比較することができます。

Screen Shot 2022-05-05 at 16.31.06.png

Step 7. 予測の実行及びモデルパフォーマンスの評価

テストデータセットに対する予測を行うために、交差検証で特定されたベストなモデルを使用し、ROC曲線のAUCを用いてモデルのパフォーマンスを評価します。

Python
# テストデータセットに対する予測を行うために交差検証で特定されたベストなモデルを使用
cvPredDF = cvModel.transform(testDF)

# ROC曲線のAUCと精度に基づいてモデルのパフォーマンスを評価
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")

ハイパーパラメーターチューニングを行うことで、AUCも精度も改善しています。

Screen Shot 2022-05-05 at 16.32.13.png

Databricksでは、SQLコマンドを用いて、年齢、職業ごとの予測結果を表示することができます。予測結果のデータセットに対する一時ビューを作成します。

Python
cvPredDF.createOrReplaceTempView("finalPredictions")

一時ビューに対してクエリーを実行します。

SQL
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation

Screen Shot 2022-05-05 at 16.34.22.png

SQL
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age

クエリーの結果をグラフで表示することも可能です。
Screen Shot 2022-05-05 at 16.34.50.png

Databricks 無料トライアル

Databricks 無料トライアル

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?