2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Frameless 〜 Scala の型安全な Spark ライブラリ

Last updated at Posted at 2019-08-11

Shapeless を使ったタイプフルな Spark ライブラリ Frameless について。

はじめに

Cats エコシステムのリストを見ると、Frameless なるものが含まれていて、"Expressive types for Spark" とある。個人的に最近 Spark を勉強しているところでもあったので、ちょっと調べてみた。

Frameless 概要

Spark のためのタイプフルな Scala ライブラリで次のような特徴がある。

  1. Shapeless を活用して型安全性に Dataset をラップした TypedDataset が提供される。
  • カラム指定が型安全。文字列型の列名による指定と違って、コンパイル時にカラムの存在がチェックされる。
  • エンコーダーの有無や組み込み関数の型の整合性もコンパイル時チェックされる。
  • キャストや射影((A, B) => (A) とか)なども型安全。
  1. Cats (Cats Effect, Cats MTL)との連携が、frameless-cats モジュールで提供される。
  2. TypedDataset 版の Spark ML API が、frameless-ml モジュールで提供される。

この記事では、特に 1 と 2 について書く。

セットアップ

テンプレートが提供されているので、以下のようにして Frameless プロジェクトの雛形が生成できる。

$ sbt new imarios/frameless.g8
name [project name]: frameless-trial01
organization [com.nothing]:
package [com.nothing]:
version [0.1]:

ここではプロジェクト名だけ frameless-trial01 と指定した。生成されたものを見てみると以下のようなファイルがある。

$ cd frameless-trial01
$ tree
.
├── build.sbt
├── project
│   └── build.properties
└── src
    └── main
        └── scala
            └── com
                └── nothing
                    └── frameless
                        └── MainFramelesstrial01.scala
8 directories, 3 files

build.sbt の中身を見てみるとバージョンが古かったりするのので、ここを参照して以下のように書き換えおく。

  • SparkVersion = "2.4.0"
  • FramelessVersion = "0.8.0"
  • scalaVersion := "2.12.8"
  • あとで使うので frameless-cats も追加しておく
  • scalacOptions や resolver も適当に追記する

こんな感じになる

main になるクラスも生成されているが、まずは REPL から試してみる。$ sbt console と叩くと、build.sbt ファイルにある初期化コードをひとしきり実行した後、普通にプロンプトが表示されるので、まず次のようなテストデータを用意する1

case class UserArtistData(userId: Int, artistId: Int, playCount: Long)
// defined class UserArtistData

val data = List(
  // userId | artistId | playCount
   ( 1      , 100      ,      10L ),
   ( 2      , 200      ,       1L ),
   ( 2      , 300      ,      12L )
).map(UserArtistData.tupled)
// data: List[UserArtistData] = List(UserArtistData(1,100,10), UserArtistData(2,200,1), UserArtistData(2,300,12))

List から Frameless の TypedDataset を生成するには以下のようにする。

val t = TypedDataset.create(data)
// t: frameless.TypedDataset[UserArtistData] = [userId: int, artistId: int ... 1 more field]

ためしに集計してみる。ユーザーごとの再生回数を合計するコードは下のようになる。

val agg = t.groupBy(t('userId)).agg(sum(t('playCount)))
// agg: frameless.TypedDataset[(Int, Long)] = [_1: int, _2: bigint]

ここで例えば、'userId'user としていたりするとコンパイルエラーになる。'Typed' たるゆえんで、リファクタリングするときにはかなり役に立つと思う。

表示を試してみると、、、

val job = agg.show()
// job: frameless.Job[Unit] = frameless.Job$$anon$4@2038d772

Job[Unit] が返されるが、まだ実行はされず結果は表示されない。プログラムの記述と実行の分離の観点ではまだ記述の段階ということになる。Job の代わりに Cats Effect の IO なども使えるが後述する。

以下のように実行できる。

job.run

+---+---+
| _1| _2|
+---+---+
|  1| 10|
|  2| 13|
+---+---+

以上、簡単な例で Frameless のタイプフルな面と、Job/SparkDelay を用いた記述と実行の分離を見てみた。次にそれぞれ個別に見てみる。

TypedDataset

まず Frameless のタイプフルな面を担う TypedDataset について。

Dataset と TypedDataset

比較

公式ドキュメントの比較を参考にして、Spark 謹製の DatasetFramelessTypedDataset を比べてみる。

まず引き続き REPL から、Dataset を使って以下のように二通りにフィルタリングする。

val ds = spark.createDataset(data)
ds.write.parquet("/tmp/foo") // 一旦書き出して読み直す
val ds = spark.read.parquet("/tmp/foo").as[UserArtistData]

val f1 = ds.filter($"playCount" >= 10).select($"playCount".as[Long])
val f2 = ds.filter(_.playCount >= 10).map(_.playCount)

f1f2 も共に Dataset[Long] 型になるが、

  • f1 は、手作業での明示的な型指定が必要だったり、$"playCount" が UserArtistData のフィールド名に一致しなくても実行時までエラーにならない。
  • f2 は Scala ライクに書けて、カラムの不整合もコンパイル時に検出できたりするが、うまく最適化されない場合があるという(f1f2 をそれぞれ explain() すると ReadSchema あたりに違いが出てくる)。

これを下のように TypedDataset でやってみると、、、

import frameless.syntax._
val t = ds.typed // これで TypedDataset[UserArtistData]になる
val f3 = t.filter(t('playCount) >= 10).select(t('playCount))
f3.explain()

t'playcount に該当するカラムがなければコンパイル時のエラーとなるし as[Long] もいらない。また explain()で表示される実行計画も、f1と同様に最適化されたものになっているのがわかる。

変換

  • DatasetTypedDataset の変換は、上のコードのように frameless.syntax を使って ds.typedとしても良いし、TypedDataset.create(ds) でも同じことになる。
  • TypedDatasetDataset の変換は、単に t.dataset とするだけで得られる。

DataFrame と TypedDataset

下のような簡単な DataFrame 操作を考えてみる。

import org.apache.spark.sql.functions.sum
val df = ds.toDF
df.groupBy("artistId").agg(sum("playCount").as("sum")).orderBy("sum").show()

+--------+---+
|artistId|sum|
+--------+---+
|     200|  1|
|     100| 10|
|     300| 12|
+--------+---+

これを TypedDataset で書くと、、、

import frameless.functions.aggregate.{sum => fsum}
val t = ds.typed
val aggregated = t.groupBy(t('artistId)).agg(fsum(t('playCount)))
aggregated.orderBy(aggregated('_2).asc).show().run

+---+---+
| _1| _2|
+---+---+
|200|  1|
|100| 10|
|300| 12|
+---+---+

スキーマが変わるたびに、中間の TypedDataset を一旦変数に出す必要が生じるので、DataFrame のような fluent interface なスタイルでは書けなくなる。クエリによっては途中経過の変数がやたらと増えるし、TypedDataset[T] の T がタプルなので、カラム指定が'_2'みたいなマジックナンバーだらけになったりする。

メソッド/関数のシグネーチャとしてだけ TypedDataset を使って、中では適当に割り切って Dataframe を使うとか、何らかの工夫や妥協が必要かもしれない2

変換については、TypedDataset ⇒ DataFrametoDF ですんなりできるが、DataFrame ⇒ TypedDataset は、途中で unsafe な操作を介するので、実行時例外が発生する場合がある。

Cats との連携

次に、Frameless のエフェクトの扱い、プログラムの記述と実行の分離について見てみる。

上のコードの t.show() では、Job[Unit]が返されていて、これを実行してコンソール上に結果(副作用)を得るにはさらにrunを呼ぶ必要があった。Frameless では collect などでもJob[Seq[UserArtistData]] が返される。

実は、型クラス SparkDelay のインスタンスさえあれば、Job に限らず他のデータタイプも使える。

SparkDelay は以下のようなもので、メソッド delay は Cats Effect の Sync#delay に暗黙の SparkSession を追加したような形になっている。

trait SparkDelay[F[_]] {
  def delay[A](a: => A)(implicit spark: SparkSession): F[A]
}

モジュール frameless-cats では、Sync[F] から SparkDelay[F] の導出が提供されているので、つまり Sync[F] のインスタンスがあるもの、例えば Cats Effect であれば IO などが Job の代わりに使えることになる。

以降、Giter8 テンプレートで生成された叩き台の MainFramelesstrial01 を、Cats Effect ベースに改造してみる。MTL なども適当に使う。

MainFramelesstrial01.scala は以下のようなものだった。よく見るとリソースの準備・開放、プログラムの記述、プログラムの実行が一体に癒着している(complect)。

object MainFramelesstrial01 extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("frameless-first-example").set("spark.ui.enabled", "false")
  implicit val spark = SparkSession.builder().config(conf).appName("frameless-trial01").getOrCreate()

  import spark.implicits._

  spark.sparkContext.setLogLevel("WARN")

  import frameless.TypedDataset

  case class Bar(b: String, a: Long)
  case class Foo(f: Long, o: Bar)

  val fTypedDataset = TypedDataset.create(Foo(1,Bar("a",2)) :: Foo(10,Bar("b",20)) :: Nil)

  fTypedDataset.show().run()

  // Closing
  spark.stop()
}

もちろん小さな雛形プログラムだから別に問題ではないが、Frameless + Cats のデモのために敢えて分解・再合成してみる。

まず、SparkSession の生成と停止に関連するコードを、Cats Effect の Bracket を用いてラップする。

trait UseSpark[F[_]] {
  private def acquire(implicit S: Sync[F]): F[SparkSession] = S.delay {
    val conf = new SparkConf() .setMaster("local[*]") .setAppName("frameless-first-example") .set("spark.ui.enabled", "false")
    implicit val spark: SparkSession = SparkSession.builder().config(conf).appName("trial02").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    spark
  }
  private def release(spark: SparkSession)(implicit S: Sync[F]): F[Unit] = S.delay(spark.stop())

  def useSpark(use: SparkSession => F[Unit])(implicit S: Sync[F]): F[Unit] =
    S.bracket(acquire)(use)(release)
}

プログラムの記述は以下のように書いてみる。文脈からのSparkSession取得を、MTL の ApplicativeAsk で表現してみた。記述レベルでは、IOReaderT といった具体的なデータタイプに束縛せず、型クラスによる制約だけで書いている。

type SparkAsk[F[_]] = ApplicativeAsk[F, SparkSession]
...  
def apply[F[_]](implicit S: Sync[F], F: SparkAsk[F]): F[Unit] =
  F.ask.flatMap { implicit spark =>
    val fTypedDataset = TypedDataset.create(Foo(1,Bar("a",2)) :: Foo(10,Bar("b",20)) :: Nil)
    fTypedDataset.show[F]()
  }

この二つに具体的な型クラスインスタンスを与えて、実行を組み立てるコードは、例えば Job の代わりが IO(というか ReaderT[IO, SparkSession, ?])ならば以下のように書ける。

type Action[T] = ReaderT[IO, SparkSession, T]

object MainTrial02 extends IOApp with UseSpark[IO] {
  def run(args: List[String]): IO[ExitCode] =
    useSpark(program[Action].run) as ExitCode.Success
}

ソースはここにおいた。

collect()show() といったメソッドが F[_] でラップされているなら、cacheunpersist といったメソッドも同様にエフェクトを切り出していそうな気がするが、何故かそうなはっていない模様。

おわりに

  • TypedDataset については、型安全性と可読性がトレードオフになってしまう場合がけっこう多い気がする。上手く工夫しないと可読性が落ちて逆効果かもしれない。試しに『Advanced Analysis with Spark』の3章のサンプルを Frameless を使って書き換えてみたが、型安全のために失った可読性も結構大きかもしれない(ソースはここにおいた)。

  • Spark プログラミングのようなデータ分析系の場合、DDD的なドメインモデルを関数型な手法でプログラミングするような場合ほどには、記述と実行の分離だとかシンプリシティだとかコンポーザビリティといったものの追求には、実際あまりモチベーションがなさそうな気もする。この記事では敢えて実験として、やろうと思えば Frameless + Cats でそこそこ関数型っぽいプログラムが書けることを示してみた。

  • frameless-ml モジュールについて今回書かなかったが、ある程度習熟したら近いうち書きたい。

  1. 『Advanced Analysis with Spark』の3章で使われている、リスナー、アーティスト、再生回数の組

  2. この辺は Dataset と DataFrame の併用・使い分けでも似たような事情かもしれない。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?