Scala
初心者
機械学習
データ分析
関数型プログラミング

Scalaでデータ分析にチャレンジする: データ処理のパイプラインを作る

記事を書こうと思った理由

データ分析と言えば、pythonやRなどの言語が主流です。理由としては、実装が簡単、ライブラリが豊富というのが主な理由であると思います。
実際にpythonやRで機械学習のライブラリ多少は叩いてみたこともありますが、初心者が軽く触ってもなんとなく結果がでるのはとてもおもしろかったです。ですが、自分が慣れていないということもあり、pythonで実装を行うと

  • 動的型付けなので、実行してみるまでうまくいくかわからない
  • 文字をうまくパースできなかった時に、エラーからうまくリカバリできない
  • ウェブスクレイピングなど、非同期的な処理がやりづらい
  • テスト、コードの再利用、可読性がいまいち

など、諸々大変なところが多かったです。もちろん、「いやいや、そんなのpythonでも余裕だよ!」とpythonに慣れている方は思うと思いますが、Scalaで普段開発している僕には割ときつい部分がありました。

そこで、データ分析の勉強をかねて、クリーニング=>モデルの学習=>検証それぞれのフェィズで、Scalaで上記の困ったところを解決するための自分なりのTIPSを随時まとめていきます。
スクリーンショット 2017-11-02 15.36.08.png

Scalaのバージョンは2.12を使います。
まだScalaを初めて一年ちょいの弱者+特にデータ分析も専門家というわけではないので、
違うところなどがあれば突っ込んでいただけると嬉しいです。

もし、下のコードをささっと試したい方は、Scastieでお試しください

この記事のゴール

  • Scalaを使ってデータ処理の流れを構築する

読んでくれそうな人

  • Scalaは初心者~中級者だが、Scalaを使ってデータ分析の勉強をしたい人

書かないこと

  • 圏論とかの理論(そのあたりの理論はよくわからないので、基本、理論ではなく、実利についてまとめていきます)
  • 機械学習とかの理論(そのあたりの理論は 略)

何はともあれ、まずはデータ処理の流れ(パイプライン)を作りたい

データの分析をするにあたって、まず、やりたいことととしては、

処理1=>処理2=>処理3...

のように処理をつなぐパイプラインを構築していきたいです。
具体例を書くなら、

データをサンプリングする=>サンプルの規格化をする=>規格化した中から抽出する

みたいな感じですね。
Scalaは静的型付けなので、型でしばれるはずです。イメージとしては、

[サンプルデータ型]=処理=>[規格化したデータ型]=処理=>[抽出したデータ型]

みたいな感じにしたいです。このように処理をつなげていくことで、好きなようにデータ処理の流れを作れるはずです。
そのため。データの流れを構築するような基盤となる
traitをまず作りたいです。
いろいろ調べてたら、こちらの本がとても参考になりました。

以下のようにパイプラインとなるメソッド|>を持つTransform classを構築します。

import scala.util.Try

abstract class Transform[T, A](config: Config) {
  // T:インプットする特徴量のデータ,A:アウトプットされるデータ
  self =>
  def |> : PartialFunction[T, Try[A]] // データ処理を行うメソッド

  def map[B](f: A => B): Transform[T, B] = new Transform[T, B](config) {
    override def |> = new PartialFunction[T, Try[B]] {
      override def isDefinedAt(t: T) =
        self.|>.isDefinedAt(t)

      override def apply(t: T): Try[B] = self.|>(t).map(f)
    }
  }

  def flatMap[B](f: A => Transform[T, B]): Transform[T, B] = new Transform[T, B](config) {
    override def |> = new PartialFunction[T, Try[B]] {
      override def isDefinedAt(t: T) =
        self.|>.isDefinedAt(t)

      override def apply(t: T): Try[B] = self.|>(t).flatMap(f(_).|>(t))
    }
  }

  def andThen[B](tr: Transform[A, B]): Transform[T, B] = new Transform[T, B](config) {
    override def |> = new PartialFunction[T, Try[B]] {
      override def isDefinedAt(t: T) =
        self.|>.isDefinedAt(t) && tr.|>.isDefinedAt(self.|>(t).get)

      override def apply(t: T) = tr.|>(self.|>(t).get)
    }
  }
}

Configはデータ分析で使用する設定の情報を持つ型として、

trait Config

case class ConfigInt(iParam: Int) extends Config

case class ConfigDouble(fParam: Double) extends Config

case class ConfigArrayDouble(fParams: Array[Double]) extends Config

case object ConfigNone extends Config

のように定義しておきます。

この型にラップすることで、データ処理の設定に関する情報を渡します。(例えばサンプル数は200がいい!とかですね。)

def |>T型(処理する前の型)と、B型(処理した後の型)をTryで包んだ型Try[B]
から作られる部分型PartialFunction[T, Try[B]]を返すメソッドです。部分関数に関してはこちら
今回は使いませんが、mapflatMap|>により実装します。
このメソッドとTrymapflatMapにより、

  for {
  u <- sampler |> t   // サンプル作成
  v <- normalizer |> u // サンプルデータの規格化
  w <- aggregator |> v // 規格化したデータから一部を抽出
} yield w

のようにデータのパイプラインを構築できはずです。
ここではsmaplerなどが具体的な処理を持つインスタンスですが、テストやコードの再利用性を考えると、依存性を注入(DI)して、
定義した方が良さそうです。
そこでMinimalCakePatternを使用して、
DIし、ワークフローを定義するWorkFlowトレイトを定義します。

trait UsesSampling[T, A] {
  val sampler: Transform[T, A]
}

trait UsesNormalization[T, A] {
  val normalizer: Transform[T, A]
}

trait UsesAggregation[T, A] {
  val aggregator: Transform[T, A]
}
trait Workflow[T, U, V, W] extends UsesSampling[T, U] with UsesNormalization[U, V] with UsesAggregation[V, W] {

  def ||>(t: T): Try[W] =
    for {
      u <- sampler |> t   // サンプル作成
      v <- normalizer |> u // サンプルデータの規格化
      w <- aggregator |> v // 規格化したデータから一部を抽出
    } yield w
}

T=>U=>Vと処理が進んでいきます。
これで大まかな流れはできました!

あとは具体的な処理をするロジックを定義するだけです!
そのためにTransformを継承した具体的な処理を責務としてクラスを作っていきます。
ちょっとややこしいところもあるので、大枠を知りたい人は無視してください。

以下このようなデータ処理を行いたいとします。

- 連続的な値の順列を作成
- その順列を[0,1]の間に収まるように規格化
- さらにその中から最大の値を抽出する

それぞれ実装していきます。

trait MixInSampling {
  val sampler: Transform[DblF, DblArray] = new Transform[DblF, DblArray](ConfigInt(samples)) {
    override def |> : PartialFunction[DblF, Try[DblArray]] = {
      case f: DblF => Try(Array.tabulate(samples)(n => f(1.0 * n / samples))) // 0,0.01,0.02,,,1.00までの値をfで変換した要素をもつベクトルを生成
    }
  }
}
// 最大値の最小値で規格化するロジックを持つclass
case class MinMax(values:DblArray){

  val range = (0.0, 0.0)

  // valuesのなかで最小な値をと最大な値を抽出する
  protected val minMax = values.foldLeft(range) { (mM, x) => {
    val min = mM._1
    val max = mM._2
    (if (x < min) x else min, if (x > max) x else max)
  }
  }

  val min = minMax._1
  val max = minMax._2

  //[lox,high]の間で正規化する yi-low=(xi-x_low/(x_hig-x_low)(high-low))
  def normalize(low: Double = 0.0, high: Double = 1.0): DblArray = {
    val ratio = (high - low) / (max - min)
    values.map(x => (x - min) * ratio.toDouble + low)
  }
}

// 上で定義したMinMaxにより、値を[0,1]の間に規格化する
trait MixInNormalizer {
  val normalizer: Transform[DblArray, DblArray] = new Transform[DblArray, DblArray](ConfigDouble(normRatio)) {
    override def |> = {
      case x: DblArray if (x.nonEmpty) => Try(MinMax[Double](x).normalize())
    }
  }
}
// 規格化した値の中から、一番大きな値のindexを抽出するロジックを定義
trait MixInAggregator {
  val aggregator = new Transform[DblArray, Int](ConfigInt(splits)) {
    override def |> : PartialFunction[DblArray, Try[Int]] = {
      case x: DblArray if x.nonEmpty => Try(x.indices.find(x(_) == 1.0).getOrElse(-1)) // 規格化したデータのうち最大の値(=1.0)の値のindexを取得する
    }
  }
}

では、最後にこれらのロジックを注入して、WorkFlowをインスタンス化して実行します。

  val workflow= new Workflow[DblF,DblArray,DblArray,Int] with MixInSampling with MixInNormalizer with MixInAggregator

  val g=(x:Double)=>Math.log(x+1.0)+Random.nextDouble // サンプルデータをこの関数により変換する

  val out=workflow ||> g

  println(out)

結果

Success(96)

無事実行できました!!!

結果は、96番目の値が最大値でした

MInimalCakePatternにより、ロジックの変更、注入、分解が非常に簡単になっているので、例えば、「最後のAggregationの処理をフローから除いて、サンプルのロジックを少し変えたいなあ。」
と言う場合は、以下のように少し変えるだけで簡単にできます

// 例えば、サンプルのロジックを入れ替え、aggregationを外したい場合
// 入れ替えることも、使い回す事も、拡張する事も自由自在
// ワークフローの流れを構築
trait WorkflowWithoutAgr[T, U, V] extends UsesSampling[T, U] with UsesNormalization[U, V]{

  def ||>(t: T): Try[V] =
    for {
      u <- sampler |> t
      v <- normalizer |> u
    } yield v
}
trait MixInSampling2 {
  val sampler: Transform[DblF, DblArray] = new Transform[DblF, DblArray](ConfigInt(samples)) {
    override def |> : PartialFunction[DblF, Try[DblArray]] = {
      case f: DblF => Try(Array.tabulate(samples)(n => f(Math.exp(1.0 * n / samples)))) // サンプルをexpにかけるように少し変更
    }
  }
  val workflowWithoutAgr=new WorkflowWithoutAgr[DblF,DblArray,DblArray] with MixInSampling2 with MixInNormalizer

  val out2 = workflowWithoutAgr ||> g

ことができました!めでたしめでたし!!

次は、データのパースや整形について、簡単なTIPSをまとめたいです。

まとめ

  • Transformクラスを定義することで、データのパイプラインを構築し、可読性と汎用性が高いコードになるようにした
  • 実際の処理で使うロジックはMinimalCakePatternで注入することで、コードの再利用性を高くした

リファレンス