Shapeless を使ったタイプフルな Spark ライブラリ Frameless について。
はじめに
Cats エコシステムのリストを見ると、Frameless なるものが含まれていて、"Expressive types for Spark" とある。個人的に最近 Spark を勉強しているところでもあったので、ちょっと調べてみた。
Frameless 概要
Spark のためのタイプフルな Scala ライブラリで次のような特徴がある。
- Shapeless を活用して型安全性に
Dataset
をラップしたTypedDataset
が提供される。
- カラム指定が型安全。文字列型の列名による指定と違って、コンパイル時にカラムの存在がチェックされる。
- エンコーダーの有無や組み込み関数の型の整合性もコンパイル時チェックされる。
- キャストや射影((A, B) => (A) とか)なども型安全。
- Cats (Cats Effect, Cats MTL)との連携が、frameless-cats モジュールで提供される。
-
TypedDataset
版の Spark ML API が、frameless-ml モジュールで提供される。
この記事では、特に 1 と 2 について書く。
セットアップ
テンプレートが提供されているので、以下のようにして Frameless プロジェクトの雛形が生成できる。
$ sbt new imarios/frameless.g8
name [project name]: frameless-trial01
organization [com.nothing]:
package [com.nothing]:
version [0.1]:
ここではプロジェクト名だけ frameless-trial01 と指定した。生成されたものを見てみると以下のようなファイルがある。
$ cd frameless-trial01
$ tree
.
├── build.sbt
├── project
│ └── build.properties
└── src
└── main
└── scala
└── com
└── nothing
└── frameless
└── MainFramelesstrial01.scala
8 directories, 3 files
build.sbt の中身を見てみるとバージョンが古かったりするのので、ここを参照して以下のように書き換えおく。
- SparkVersion = "2.4.0"
- FramelessVersion = "0.8.0"
- scalaVersion := "2.12.8"
- あとで使うので frameless-cats も追加しておく
- scalacOptions や resolver も適当に追記する
こんな感じになる
main になるクラスも生成されているが、まずは REPL から試してみる。$ sbt console
と叩くと、build.sbt ファイルにある初期化コードをひとしきり実行した後、普通にプロンプトが表示されるので、まず次のようなテストデータを用意する1。
case class UserArtistData(userId: Int, artistId: Int, playCount: Long)
// defined class UserArtistData
val data = List(
// userId | artistId | playCount
( 1 , 100 , 10L ),
( 2 , 200 , 1L ),
( 2 , 300 , 12L )
).map(UserArtistData.tupled)
// data: List[UserArtistData] = List(UserArtistData(1,100,10), UserArtistData(2,200,1), UserArtistData(2,300,12))
List
から Frameless の TypedDataset
を生成するには以下のようにする。
val t = TypedDataset.create(data)
// t: frameless.TypedDataset[UserArtistData] = [userId: int, artistId: int ... 1 more field]
ためしに集計してみる。ユーザーごとの再生回数を合計するコードは下のようになる。
val agg = t.groupBy(t('userId)).agg(sum(t('playCount)))
// agg: frameless.TypedDataset[(Int, Long)] = [_1: int, _2: bigint]
ここで例えば、'userId
を 'user
としていたりするとコンパイルエラーになる。'Typed' たるゆえんで、リファクタリングするときにはかなり役に立つと思う。
表示を試してみると、、、
val job = agg.show()
// job: frameless.Job[Unit] = frameless.Job$$anon$4@2038d772
Job[Unit]
が返されるが、まだ実行はされず結果は表示されない。プログラムの記述と実行の分離の観点ではまだ記述の段階ということになる。Job
の代わりに Cats Effect の IO
なども使えるが後述する。
以下のように実行できる。
job.run
+---+---+
| _1| _2|
+---+---+
| 1| 10|
| 2| 13|
+---+---+
以上、簡単な例で Frameless のタイプフルな面と、Job/SparkDelay を用いた記述と実行の分離を見てみた。次にそれぞれ個別に見てみる。
TypedDataset
まず Frameless のタイプフルな面を担う TypedDataset
について。
Dataset と TypedDataset
比較
公式ドキュメントの比較を参考にして、Spark 謹製の Dataset
と Frameless
の TypedDataset
を比べてみる。
まず引き続き REPL から、Dataset
を使って以下のように二通りにフィルタリングする。
val ds = spark.createDataset(data)
ds.write.parquet("/tmp/foo") // 一旦書き出して読み直す
val ds = spark.read.parquet("/tmp/foo").as[UserArtistData]
val f1 = ds.filter($"playCount" >= 10).select($"playCount".as[Long])
val f2 = ds.filter(_.playCount >= 10).map(_.playCount)
f1
も f2
も共に Dataset[Long]
型になるが、
-
f1
は、手作業での明示的な型指定が必要だったり、$"playCount"
が UserArtistData のフィールド名に一致しなくても実行時までエラーにならない。 -
f2
は Scala ライクに書けて、カラムの不整合もコンパイル時に検出できたりするが、うまく最適化されない場合があるという(f1
とf2
をそれぞれexplain()
すると ReadSchema あたりに違いが出てくる)。
これを下のように TypedDataset
でやってみると、、、
import frameless.syntax._
val t = ds.typed // これで TypedDataset[UserArtistData]になる
val f3 = t.filter(t('playCount) >= 10).select(t('playCount))
f3.explain()
t
に 'playcount
に該当するカラムがなければコンパイル時のエラーとなるし as[Long]
もいらない。また explain()
で表示される実行計画も、f1
と同様に最適化されたものになっているのがわかる。
変換
-
Dataset
→TypedDataset
の変換は、上のコードのようにframeless.syntax
を使ってds.typed
としても良いし、TypedDataset.create(ds)
でも同じことになる。 -
TypedDataset
→Dataset
の変換は、単にt.dataset
とするだけで得られる。
DataFrame と TypedDataset
下のような簡単な DataFrame
操作を考えてみる。
import org.apache.spark.sql.functions.sum
val df = ds.toDF
df.groupBy("artistId").agg(sum("playCount").as("sum")).orderBy("sum").show()
+--------+---+
|artistId|sum|
+--------+---+
| 200| 1|
| 100| 10|
| 300| 12|
+--------+---+
これを TypedDataset
で書くと、、、
import frameless.functions.aggregate.{sum => fsum}
val t = ds.typed
val aggregated = t.groupBy(t('artistId)).agg(fsum(t('playCount)))
aggregated.orderBy(aggregated('_2).asc).show().run
+---+---+
| _1| _2|
+---+---+
|200| 1|
|100| 10|
|300| 12|
+---+---+
スキーマが変わるたびに、中間の TypedDataset
を一旦変数に出す必要が生じるので、DataFrame
のような fluent interface なスタイルでは書けなくなる。クエリによっては途中経過の変数がやたらと増えるし、TypedDataset[T]
の T がタプルなので、カラム指定が'_2'
みたいなマジックナンバーだらけになったりする。
メソッド/関数のシグネーチャとしてだけ TypedDataset
を使って、中では適当に割り切って Dataframe
を使うとか、何らかの工夫や妥協が必要かもしれない2。
変換については、TypedDataset ⇒ DataFrame
は toDF
ですんなりできるが、DataFrame ⇒ TypedDataset
は、途中で unsafe な操作を介するので、実行時例外が発生する場合がある。
Cats との連携
次に、Frameless のエフェクトの扱い、プログラムの記述と実行の分離について見てみる。
上のコードの t.show()
では、Job[Unit]
が返されていて、これを実行してコンソール上に結果(副作用)を得るにはさらにrun
を呼ぶ必要があった。Frameless では collect
などでもJob[Seq[UserArtistData]]
が返される。
実は、型クラス SparkDelay
のインスタンスさえあれば、Job
に限らず他のデータタイプも使える。
SparkDelay
は以下のようなもので、メソッド delay
は Cats Effect の Sync#delay
に暗黙の SparkSession
を追加したような形になっている。
trait SparkDelay[F[_]] {
def delay[A](a: => A)(implicit spark: SparkSession): F[A]
}
モジュール frameless-cats では、Sync[F]
から SparkDelay[F]
の導出が提供されているので、つまり Sync[F]
のインスタンスがあるもの、例えば Cats Effect であれば IO
などが Job
の代わりに使えることになる。
以降、Giter8 テンプレートで生成された叩き台の MainFramelesstrial01
を、Cats Effect ベースに改造してみる。MTL なども適当に使う。
MainFramelesstrial01.scala は以下のようなものだった。よく見るとリソースの準備・開放、プログラムの記述、プログラムの実行が一体に癒着している(complect)。
object MainFramelesstrial01 extends App {
val conf = new SparkConf().setMaster("local[*]").setAppName("frameless-first-example").set("spark.ui.enabled", "false")
implicit val spark = SparkSession.builder().config(conf).appName("frameless-trial01").getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
import frameless.TypedDataset
case class Bar(b: String, a: Long)
case class Foo(f: Long, o: Bar)
val fTypedDataset = TypedDataset.create(Foo(1,Bar("a",2)) :: Foo(10,Bar("b",20)) :: Nil)
fTypedDataset.show().run()
// Closing
spark.stop()
}
もちろん小さな雛形プログラムだから別に問題ではないが、Frameless + Cats のデモのために敢えて分解・再合成してみる。
まず、SparkSession の生成と停止に関連するコードを、Cats Effect の Bracket
を用いてラップする。
trait UseSpark[F[_]] {
private def acquire(implicit S: Sync[F]): F[SparkSession] = S.delay {
val conf = new SparkConf() .setMaster("local[*]") .setAppName("frameless-first-example") .set("spark.ui.enabled", "false")
implicit val spark: SparkSession = SparkSession.builder().config(conf).appName("trial02").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark
}
private def release(spark: SparkSession)(implicit S: Sync[F]): F[Unit] = S.delay(spark.stop())
def useSpark(use: SparkSession => F[Unit])(implicit S: Sync[F]): F[Unit] =
S.bracket(acquire)(use)(release)
}
プログラムの記述は以下のように書いてみる。文脈からのSparkSession
取得を、MTL の ApplicativeAsk
で表現してみた。記述レベルでは、IO
や ReaderT
といった具体的なデータタイプに束縛せず、型クラスによる制約だけで書いている。
type SparkAsk[F[_]] = ApplicativeAsk[F, SparkSession]
...
def apply[F[_]](implicit S: Sync[F], F: SparkAsk[F]): F[Unit] =
F.ask.flatMap { implicit spark =>
val fTypedDataset = TypedDataset.create(Foo(1,Bar("a",2)) :: Foo(10,Bar("b",20)) :: Nil)
fTypedDataset.show[F]()
}
この二つに具体的な型クラスインスタンスを与えて、実行を組み立てるコードは、例えば Job
の代わりが IO
(というか ReaderT[IO, SparkSession, ?]
)ならば以下のように書ける。
type Action[T] = ReaderT[IO, SparkSession, T]
object MainTrial02 extends IOApp with UseSpark[IO] {
def run(args: List[String]): IO[ExitCode] =
useSpark(program[Action].run) as ExitCode.Success
}
ソースはここにおいた。
※ collect()
や show()
といったメソッドが F[_]
でラップされているなら、cache
や unpersist
といったメソッドも同様にエフェクトを切り出していそうな気がするが、何故かそうなはっていない模様。
おわりに
-
TypedDataset については、型安全性と可読性がトレードオフになってしまう場合がけっこう多い気がする。上手く工夫しないと可読性が落ちて逆効果かもしれない。試しに『Advanced Analysis with Spark』の3章のサンプルを Frameless を使って書き換えてみたが、型安全のために失った可読性も結構大きかもしれない(ソースはここにおいた)。
-
Spark プログラミングのようなデータ分析系の場合、DDD的なドメインモデルを関数型な手法でプログラミングするような場合ほどには、記述と実行の分離だとかシンプリシティだとかコンポーザビリティといったものの追求には、実際あまりモチベーションがなさそうな気もする。この記事では敢えて実験として、やろうと思えば Frameless + Cats でそこそこ関数型っぽいプログラムが書けることを示してみた。
-
frameless-ml モジュールについて今回書かなかったが、ある程度習熟したら近いうち書きたい。