Spark MLとBreezeでロジスティック回帰

  • 2
    Like
  • 0
    Comment

概要

機械学習手法を使って訓練と予測を行う際に、訓練フェーズではある言語・ライブラリを使用し、予測フェーズでは別の言語・ライブラリを使いたいというケースがありました。

本記事では、「広告配信システムを開発していて、広告がクリックされるかどうかをロジスティック回帰で予測する」という内容を題材にして、訓練はSparkバッチ、予測はScala + Breeze 1 で行う場合にどのようなイメージで実装できるかを紹介します。

なお、実装やデータなど色々と簡易化しているのでそのままは参考にできないと思いますが、全体のイメージや要素要素などで役立つ部分があれば幸いです。

サンプルコードは https://github.com/mura-s/spark-breeze-sample に掲載しています。

環境

ライブラリとそのバージョンは以下を使用しています。

  • Spark 2.1.0
  • Scala 2.11.11
  • sbt 0.13.15
  • Breeze 0.13.1

ロジスティック回帰

詳しくは説明しませんが、 sparkのドキュメント によると、以下のように記載されています。

Logistic regression is a popular method to predict a categorical response. It is a special case of Generalized Linear models that predicts the probability of the outcomes

(訳) ロジスティック回帰は、カテゴリ応答を予測する一般的な方法です。 それは、結果の確率を予測する一般化線形モデルの特殊なケースです。

このロジスティック回帰のモデルの訓練をSparkバッチで定期的に行っておき、リクエストごとに訓練結果のモデルを使って広告のクリック確率を予測するという流れです。

データ

今回使用する訓練データは data/train_data.csv に、テストデータは data/test_data.csv に配置してあります。

訓練データの一部:

0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1
1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9
1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7

訓練データのスキーマは以下のようになっており、 uid 〜 n2 まで全てのフィールドを特徴量として使用することにします。

/**
  * 訓練データ
  */
case class TrainData(
  label: Double,      // 正解ラベル (0.0: クリックしていない, 1.0: クリックした)
  uid: String,        // cookie等のuid
  hour: Int,          // 広告接触した時間 (0 〜 23)
  advertiserId: Int,  // 広告主ID (1 〜 10)
  campaignId: Int,    // キャンペーンID (1 〜 50)
  adId: Int,          // 広告ID (1 〜 100)
  siteId: Int,        // サイトID (1 〜 100)
  c1: Int,            // anonymized categorical variable1 (1 〜 100)
  c2: Int,            // anonymized categorical variable2 (1 〜 100)
  n1: Double,         // anonymized numerical variable1
  n2: Double          // anonymized numerical variable2
)

また、テストデータのスキーマは訓練データからlabelを除いたものです。

コメント内に (0 〜 23) , (1 〜 10) などと書いてあるものはそのフィールドの取りうる値の範囲です。簡易化のため、決め打ちしてしまっています。

Spark MLで訓練

「訓練データのロード -> データの前処理 -> 訓練」という流れで処理を行って、訓練後のモデルをデータストアに保存していきます。

訓練データのロード

訓練データはcsv形式のデータなので、spark-csvを使って Dataset[TrainData] としてロードしています。

val trainDs = spark.read
  .format("com.databricks.spark.csv")
  .schema(trainSchema)
  .load("data/train_data.csv")
  .as[TrainData]

訓練データの前処理

訓練データの各特徴量をロジスティック回帰の訓練で扱えるように、前処理を行って、org.apache.spark.ml.linalg.SparseVector に変換していきます。

今回は、String型のデータは Feature Hashing 2 で変換し、Int型のcategorical variableは One Hot Encoding 3 で変換することにします。

前処理の共通化

Spark MLにも前処理用のクラスが用意されており、 Feature Hashingでは org.apache.spark.ml.feature.HashingTF を、 One Hot Encodingでは org.apache.spark.ml.feature.OneHotEncoder を使用できるかと思います。

ただ今回は、Sparkで前処理とScala + Breezeでの前処理で共通の処理を行うために、自分で実装しています。

// Feature Hashing 用の共通クラス
object FeatureHashingUtil {
  val hashSize: Int = Math.pow(2.0, 15.0).toInt

  def indexOf(value: String): Int = {
    def nonNegativeMod(v: Int, mod: Int): Int = {
      val rawMod = v % mod
      rawMod + (if (rawMod < 0) mod else 0)
    }

    val hash = MurmurHash3.stringHash(value)
    nonNegativeMod(hash, hashSize)
  }
}

// One Hot Encoding 用の共通クラス
object OneHotEncodeUtil {
  def indexOf(value: Int, numFeatures: Int): Option[Int] =
    if (value < numFeatures) Some(value) else None
}

Spark ML側ではDatasetの加工用に、これらのクラスを使ってUDFを作成しておきます。

val featureHashing = udf { value: String =>
  val index = FeatureHashingUtil.indexOf(value)
  Vectors.sparse(FeatureHashingUtil.hashSize, Array(index), Array(1.0))
}

val oneHotEncode = udf { (value: Int, numFeatures: Int) =>
  OneHotEncodeUtil.indexOf(value, numFeatures) match {
    case Some(index) => Vectors.sparse(numFeatures, Array(index), Array(1.0))
    case None        => Vectors.sparse(numFeatures, Array.empty[Int], Array.empty[Double])
  }
}

これらのUDFを使用すると、特定のindexの値のみ 1.0 を持った Vector (もしくは全てのindexの値が 0.0 のVector) に変換されます。

前処理の実施

作成したUDFを使用して、ロードした訓練データに対して前処理を行います。
(各特徴量のカテゴリの長さは決め打ちしてしまっています。)

val preprocessedTrainDs = trainDs
  .select(
    col("label")                                as "label",
    featureHashing(col("uid"))                  as "uid",
    oneHotEncode(col("hour"),         lit(23))  as "hour",
    oneHotEncode(col("advertiserId"), lit(10))  as "advertiserId",
    oneHotEncode(col("campaignId"),   lit(50))  as "campaignId",
    oneHotEncode(col("adId"),         lit(100)) as "adId",
    oneHotEncode(col("siteId"),       lit(100)) as "siteId",
    oneHotEncode(col("c1"),           lit(100)) as "c1",
    oneHotEncode(col("c2"),           lit(100)) as "c2",
    col("n1")                                   as "n1",
    col("n2")                                   as "n2"
  )
  .as[PreprocessedTrainData]

訓練

org.apache.spark.ml.Pipeline を使って、訓練のフローを組み立てます。

// 各特徴量のフィールドを、1つの "features" というフィールドとして結合
val assembler = new VectorAssembler()
  .setInputCols(Array(
    "uid", "hour", "advertiserId", "campaignId", "adId", "siteId", "c1", "c2", "n1", "n2"
  ))
  .setOutputCol("features")

// ロジスティック回帰用のクラス (ハイパーパラメータは決め打ち)
val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.001)
  .setStandardization(false)

// Pipeline作成
val pipeline = new Pipeline()
  .setStages(Array(assembler, lr))

このPipelineを使用して訓練を行い、訓練後のモデルを取得します。

val model = pipeline.fit(preprocessedTrainDs)

訓練後のモデルの保存

訓練後のモデルを Scala + Breeze で予測する際に使用できるように、データストアに保存します。
今回は簡易化のために、ローカルの train_model ディレクトリ以下にファイルとして保存します。
ロジスティック回帰の予測では、重み (weights) と バイアス (bias) を使用して予測値を計算するので、これらをそれぞれ別ファイルに保存します。

val lrModel = model.stages.last.asInstanceOf[LogisticRegressionModel]

// 重みをカンマ区切りの形式で保存
val weightsPW = new PrintWriter("train_model/weights")
weightsPW.write(lrModel.coefficients.toArray.mkString(","))
weightsPW.close()

// バイアスを保存
val biasPW = new PrintWriter("train_model/bias")
biasPW.write(lrModel.intercept.toString)
biasPW.close()

予測

Scala + Breezeで予測を行った結果が、Sparkで予測を行った結果と一致することを確認するために、Sparkでも予測を行っておきます。

データのロードと前処理までは訓練と同じことを行い、その後、予測と予測確率の出力を行います。

// テストデータのロードと前処理 (省略)
val preprocessedTestDs = ...

// 予測
val predictedDf = model.transform(preprocessedTestDs)

// 予測確率を出力
predictedDf.select("probability")
    .collect()
    .foreach { case Row(v: Vector) => println(v(1)) }

これを spark-submit で実行すると、以下のような出力 (予測確率) を得られます。

0.9005777957499999
0.0032342605681243463
0.0019528956442807198
0.9923334807097671
0.005650920021960264

Scala + Breezeで予測

Spark MLで作成した訓練後のモデルを使用して、予測を行い、予測確率を出力していきます。
処理の流れは、「訓練後のモデルとテストデータのロード -> データの前処理 -> 予測確率を計算」という流れになります。

なおBreezeの使い方は、Breezeのgithub wikiにある Linear Algebra Cheat Sheet などが参考になるかと思います。

訓練後のモデルとテストデータのロード

保存先のデータストアから、それぞれ以下の型としてロードします。

// 重み、バイアス、テストデータのロード
val weights: SparseVector[Double] = ...
val bias: Double = ...
val testDataList: List[TestData] = ...

テストデータの前処理の実装

テストデータに対して、Spark ML側と全く同じ前処理を行って、データをBreezeのSparseVectorに変換します。

Spark ML側の説明で記載した FeatureHashingUtil と OneHotEncodeUtil を使用して、 Feature Hashing と One Hot Encoding の処理を実装します。

def oneHotEncoding(value: Int, numFeatures: Int): SparseVector[Double] = {
  OneHotEncodeUtil.indexOf(value, numFeatures) match {
    case Some(index)  => SparseVector(numFeatures)((index, 1.0))
    case None         => SparseVector.zeros(numFeatures)
  }
}

def featureHashing(value: String): SparseVector[Double] = {
  val index = FeatureHashingUtil.indexOf(value)
  SparseVector(FeatureHashingUtil.hashSize)((index, 1.0))
}

予測

テストデータのリスト ( testDataList: List[TestData] ) 1件ずつに対して、前処理と予測を行い、予測確率を出力します。
(各特徴量のカテゴリの長さはSpark ML側と同じ長さで決め打ちです。)

testDataList.foreach { data =>
  // 前処理 + vertcatで1つのVectorに結合
  val x = SparseVector.vertcat(
    featureHashing(data.uid),
    oneHotEncoding(data.hour, 23),
    oneHotEncoding(data.advertiserId, 10),
    oneHotEncoding(data.campaignId, 50),
    oneHotEncoding(data.adId, 100),
    oneHotEncoding(data.siteId, 100),
    oneHotEncoding(data.c1, 100),
    oneHotEncoding(data.c2, 100),
    SparseVector(Array(data.n1)),
    SparseVector(Array(data.n2))
  )

  // ロジスティック回帰で予測確率を計算
  val probability = sigmoid(bias + weights.dot(x))

  println(probability)
}

実行すると、以下のような出力 (予測確率) を得られます。

0.9005777957499999
0.0032342605681243463
0.0019528956442807198
0.9923334807097671
0.005650920021960264

Spark MLでの予測結果と同じ値になっているので、予測確率が正しく出力されていることがわかります。

最後に

Spark MLでのロジスティック回帰の訓練と、その結果を使用したScala + Breezeでの予測についての実装を紹介しました。

概要でも書きましたが、今回紹介したサンプルコードは https://github.com/mura-s/spark-breeze-sample にあります。


  1. Scalaの数値演算用ライブラリです。行列演算などを行うためのクラスやメソッドが用意されています。 

  2. https://ja.wikipedia.org/wiki/Feature_Hashing 

  3. https://spark.apache.org/docs/latest/ml-features.html#onehotencoder , https://en.wikipedia.org/wiki/One-hot