こちらではMLlibを用いた機械学習チュートリアルのサンプルノートブックの流れを説明します。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
翻訳済みのノートブックはこちらです。
このチュートリアルはApache Spark MLlibを使い始められるように設計されています。ここでは2値分類問題を取り扱います。デモグラフィックデータに基づいて、ある個人の収入が$50,000より高いかどうかを予測できるでしょうか?UCI Machine Learning Repositoryからのデータセット使用しており、このデータはDatabricksランタイム上で提供されています。このノートブックでは、データの前処理、機械学習パイプライン、いくつかの機械学習アルゴリズムなどMLlibで利用できる幾つかの機能をデモンストレーションします。
このノートブックには以下のステップが含まれています:
- データセットのロード
- 特徴量の処理
- モデルの定義
- パイプラインの構築
- モデルの評価
- ハイパーパラメーターチューニング
- 予測及びモデルパフォーマンスの評価
要件
Databricksランタイム7.0以降、あるいはDatabricks機械学習ランタイム7.0以降が必要です。
Step 1. データセットのロード
データの最初の数行を表示するためにDatabricksユーティリティを使用します。
%fs head --maxBytes=1024 databricks-datasets/adult/adult.data
データセットにはカラム名が含まれていないので、カラム名とデータ型を割り当てるためにスキーマを作成します。
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""
dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)
トレーニングセットとテストセットにランダムに分割し、再現性確保のためのシードを設定します。
前処理を行う前にデータを分割するのがベストです。これによって、モデルを評価する際にテストデータセットが新規のデータをより近い形でシミュレーションすることが可能となります。
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # トレーニングデータに複数回アクセスするのでキャッシュします
print(testDF.count())
データを確認してみましょう。
display(trainDF)
hours_per_week
の数の分布はどのようになっているでしょうか?
display(trainDF.select("hours_per_week").summary())
education
のステータスはどうなっているでしょうか?
display(trainDF
.groupBy("education")
.count()
.sort("count", ascending=False))
バックグラウンド: トランスフォーマー、エスティメーター、パイプライン
このノートブックで説明されるMLlibの機械学習においては3つの重要なコンセプトがあります。トランスフォーマー、エスティメーター、パイプラインです。
-
トランスフォーマー(Transformer): データフレームを入力とし新たなデータフレームを返却します。トランスフォーマーはデータからパラメーターを学習せず、モデルトレーニングのためのデータを準備する、あるいはトレーニングしたMLlibモデルを用いた予測を生成するために、シンプルにルールベースの変換処理を適用します。トランスフォーマーは
.transform()
メソッドで呼び出すことができます。 -
エスティメーター(Estimator):
.fit()
メソッド経由でデータフレームからパラメーターを学習(あるいは"フィット")し、トランスフォーマーであるモデルを返却します。 -
パイプライン(Pipeline): 複数のステップを容易に実行できるように単一のワークフローにまとめます。機械学習んモデルの生成においては、通常異なる数多くのステップのセットアップが含まれ、それらに対してイテレーションを行います。パイプラインは、このプロセスの自動化に役立ちます。
詳細はこちら:
Step 2. 特徴量の前処理
このノートブックのゴールはデータセットに含まれる特徴量(教育レベル、既婚・未婚、職業など)からincome
のレベルを予測するモデルを構築することです。MLlibで必要となるフォーマットになるように、特徴量を操作、前処理することが最初のステップとなります。
カテゴリー変数を数値に変換
線形回帰、ロジスティック回帰のようないくつかの機械学習アルゴリズムでは数値の特徴量が求められます。Adultデータセットには学歴、職業、既婚・未婚のようなカテゴリー変数が含まれています。
以下のコードブロックでは、カテゴリー変数を0あるいは1のみの値を取る一連の数値変数に変換するために、StringIndexer
とOneHotEncoder
の使い方を説明しています。
-
StringIndexer
は、文字列のカラムをラベルインデックスのカラムに変換します。例えば、 "red"、"blue"、"green"という値を0、1、2に変換します。 -
OneHotEncoder
は、カテゴリーのインデックスを2値のベクトルにマッピングします。それぞれの行で最大1つの"1"が存在し、それがその行のカテゴリー変数のインデックスに対応します。
Sparkにおけるワンホットエンコーディングは2ステップのプロセスになります。最初にStringIndexerを使用して、次にOneHotEncoderを使用します。以下のコードブロックではStringIndexerとOneHotEncoderを定義していますが、まだデータには適用していません。
詳細はこちら:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]
# 以下の2行はエスティメーターです。後ほどデータセットを変換するために適用する関数を返却します
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols])
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])
# ラベルのカラム("income")も文字列です。2つの値を取り得ます。"<=50K" と ">50K" です
# StringIndexerを用いて数値に変換します
labelToIndex = StringIndexer(inputCol="income", outputCol="label")
このノートブックでは、特徴量エンジニアリングとモデリングのステップすべてをまとめるパイプラインを構築します。しかし、上のコードブロックで作成したstringIndexer
を適用することで、エスティメーターとトランスフォーマーがどの容易動作するのかを詳しくみてみましょう。
データセットを変換するのに使用するStringIndexerModel
を返却してもらうために.fit()
メソッドを呼び出します。
StringIndexerModel
の.transform()
メソッドは、新たなカラムが追加された新規のデータフレームを返却します。必要であれば、右側にスクロールして新規のカラムを見てみてください。
詳細はこちら:
stringIndexerModel = stringIndexer.fit(trainDF)
display(stringIndexerModel.transform(trainDF))
すべての特徴量カラムを1つの特徴量ベクトルに結合
多くのMLlibアルゴリズムは入力として1つの特徴量カラムを要求します。このカラムのそれぞれの行には、予測で用いる一連の特徴量に対応するデータポイントのベクトルが含まれます。
MLlibはカラムのリストから1つのベクトルを作成するために、VectorAssembler
トランスフォーマーを提供します。
以下のコードブロックではVectorAssemblerの使い方を説明しています。
詳細はこちら:
from pyspark.ml.feature import VectorAssembler
# これにはデータセットの数値カラムと、ワンホットエンコーディングされた2値ベクトルカラムの両方が含まれています
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
Step 3. モデルの定義
このノートブックでは、ロジスティック回帰モデルを使用します。
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)
Step 4. パイプラインの構築
Pipeline
はトランスフォーマーとエスティメーターの順序ありリストです。データセットに適用される変換処理を自動化し、再現性を確保するためにパイプラインを定義することができます。このステップではパイプラインを定義し、テストデータセットに適用します。
StringIndexer
で見たのと同じように、Pipeline
はエスティメーターです。pipeline.fit()
メソッドはトランスフォーマーであるPipelineModel
を返却します。
詳細はこちら:
from pyspark.ml import Pipeline
# 以前のステップで作成したステージに基づいてパイプラインを定義
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])
# パイプラインモデルの定義
pipelineModel = pipeline.fit(trainDF)
# テストデータセットにパイプラインモデルを適用
predDF = pipelineModel.transform(testDF)
モデルの予測結果を表示します。features
カラムはワンホットエンコーディングされる場合にはよく起こり得る、多くのゼロから構成されるsparse vectorとなります。
display(predDF.select("features", "label", "prediction", "probability"))
Step 5. モデルの評価
display
コマンドにはビルトインのROC曲線オプションがあります。
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")
モデルを評価するためには、ROC曲線のAUC(曲線の下部の面積)を評価するためにBinaryClassificationEvaluator
を用い、精度を評価するためにMulticlassClassificationEvaluator
を使用します。
詳細はこちら:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")
Step 6. ハイパーパラメーターチューニング
MLlibはハイパーパラメーターチューニングと交差検証を行うための手段を提供しています。
- ハイパーパラメーターチューニングに対しては、
ParamGridBuilder
を用いることで、一連のモデルハイパーパラメーターに対するグリッドサーチを定義することができます。 - 交差検証については、
CrossValidator
を用いることで、エスティメーター(入力データセットに適用するパイプライン)、エバリュエーター、ハイパーパラメーターのグリッド空間、交差検証に用いるフォールド数を指定することができます。
詳細はこちら:
モデルをチューニングするために ParamGridBuilder
と CrossValidator
を使用します。このサンプルでは、3つの値のregParam
と3つの値のelasticNetParam
を用いるので、 CrossValidator
で検証する 3 x 3 = 9 のハイパーパラメーターの組み合わせが存在します。
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
.addGrid(lr.regParam, [0.01, 0.5, 2.0])
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
.build())
MLlibでCrossValidator
を呼び出す際、DatabrikcsはMLflowを用いて自動ですべてのラン(トレーニングの実行)をトラッキングします。それぞれのモデルを日隠すためにMLflowのUI(AWS|Azure|GCP)を活用することもできます。
このサンプルでは、エスティメーターとして作成したパイプラインを使用します。
# 3-foldのCrossValidatorの作成
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism = 4)
# 交差検証の実行。このステップは数分要し、交差検証の結果得られたベストなモデルを返却します
cvModel = cv.fit(trainDF)
以下のようにプロットでモデルを比較することができます。
Step 7. 予測の実行及びモデルパフォーマンスの評価
テストデータセットに対する予測を行うために、交差検証で特定されたベストなモデルを使用し、ROC曲線のAUCを用いてモデルのパフォーマンスを評価します。
# テストデータセットに対する予測を行うために交差検証で特定されたベストなモデルを使用
cvPredDF = cvModel.transform(testDF)
# ROC曲線のAUCと精度に基づいてモデルのパフォーマンスを評価
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")
ハイパーパラメーターチューニングを行うことで、AUCも精度も改善しています。
Databricksでは、SQLコマンドを用いて、年齢、職業ごとの予測結果を表示することができます。予測結果のデータセットに対する一時ビューを作成します。
cvPredDF.createOrReplaceTempView("finalPredictions")
一時ビューに対してクエリーを実行します。
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age