\CyberAgent Developers #2 Advent Calendar 2018 12/3 の投稿です/
1000% Sparking!
みなさま Apache Spark でビックデータと戯れてますか?
DataFrame/Dataset が導入されてから RDD の時代よりずっと簡単に集計できるようになりましたね。
これらは名前からして Python 使いにお馴染み Pandas の DataFrame のようなものなのですが、Spark は Scala なので型情報がリッチです。わぁい静的型あかり静的型だいすき。
Spark では type DataFrame = Dataset[Row]
と DataFrame は Dataset の一種として定義されていて、 Dataset は任意の型パラメータを取ることができます。たとえば、
case class Person(name: String, height: Int)
val people: Dataset[Person]
のように型でスキーマを表現することができます。なので型安全というわけですね……!
というのはわりと初見殺しで、私も含めチームメンバーが軒並みハマっていたので、メモしておこうと思います。
Datasetの操作は型安全じゃない
まずは次のような Dataset を用意してみましょう。
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
case class Person(name: String, height: Int)
val people: Dataset[Person] = Seq(Person("Sayori", 157), Person("Natsuki", 149), Person("Yuri", 165)).toDS()
とりあえず people.show()
で中身を覗いてみます。
+-------+------+
| name|height|
+-------+------+
| Sayori| 157|
|Natsuki| 149|
| Yuri| 165|
+-------+------+
name カラムを SELECT してみましょう。
val names: DataFrame = people.select("name")
残念ながら、型が DataFrame になってしまいました。
Dataset は SELECT や JOIN などスキーマを変えるような操作をすると、すぐ DataFrame になってしまい型からスキーマが分からなくなってしまいます。
もちろん値としてはスキーマ情報を持っていて names.show()
をすると期待通りの中身ですね。
+-------+
| name|
+-------+
| Sayori|
|Natsuki|
| Yuri|
+-------+
では people.select("age")
として存在しないカラムを SELECT するとどうなるでしょうか。
[error] (run-main-2) org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [name, height];;
[error] 'Project ['age]
[error] +- LocalRelation [name#2, height#3]
[error] org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [name, height];;
[error] 'Project ['age]
[error] +- LocalRelation [name#2, height#3]
とコンパイルは通るのですが、実行時エラーが出てしまうのですね。。。
まぁこれは SELECT するカラムの情報が型レベルには存在しないので、コンパイル時にチェックできないのは当たり前のことではあります。
では型パラメータを取るメソッドではどうでしょうか。Dataset の型変換をしてみましょう。
case class Height(height: Int)
val heights: Dataset[Height] = people.as[Height]
これはもちろん大丈夫で、
case class Age(age: Int)
val ages: Dataset[Age] = people.as[Age]
ではまた存在しないカラムに型変換をしてみようとすると、どうでしょうか。
[error] (run-main-0) org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [name, height];
[error] org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [name, height];
これもコンパイルは通るのですが、実行時エラーです。。。
型パラメータ渡してるのでコンパイル時チェックしてほしいところですが、素朴に実現しようとすると case class 同士の Structural Subtyping が必要になるので、実はそう簡単な話ではないのですね。残念。
Datasetの型パラメータはスキーマじゃない
さて、上記どおり型安全じゃないことは触ってるとすぐに気づくとは思うのですが、スキーマの罠はともすると混乱しやすいところです。先ほどの例を再掲します。
case class Height(height: Int)
val heights: Dataset[Height] = people.as[Height]
ここだけ抜きだすと heights
はカラムを一つだけ持っているように見えるのですが、おもむろに heights.show()
してみるとこうなります。
+-------+------+
| name|height|
+-------+------+
| Sayori| 157|
|Natsuki| 149|
| Yuri| 165|
+-------+------+
おや name カラムが生きてますね? ということは、
heights.select("name").show()
こんな操作も問題なくて、
+-------+
| name|
+-------+
| Sayori|
|Natsuki|
| Yuri|
+-------+
と正常に実行されてしまいます。
heights
の型 Dataset[Height]
に name
の情報はどこにもないのに……!
つまり Dataset の型パラメータを見ても、正確なスキーマは分からないのです。。。
拡張メソッドで自然に型変換しよう
上記のスキーマの問題はなかなか厄介で、分かっていても複雑な集計をしているとうっかりしがちで、ファイル出力してみたら意図しないカラムが紛れてたなんてことが起こります。あと型パラメータの順番通りにカラムが並んでないのもありがちですね。
そんな事故を防ぐためには as
する前に select
すればいいのですが、カラムが増えてくると、
case class Monika(m: Just[Monika], o: Just[Monika], n: Just[Monika], i: Just[Monika], k: Just[Monika], a: Just[Monika])
people.select("m", "o", "n", "i", "k", "a").as[Monika]
みたいになってきて面倒なんですよね。DRYの原則にも反しますし。
ということで暗黙の型変換を利用して、
implicit class DataSetHelper[T](val dataset: Dataset[T]) extends AnyVal {
def selectAs[U](implicit encoder: Encoder[U]): Dataset[U] =
dataset.select(encoder.schema.fields.map(f => new ColumnName(f.name)): _*).as[U]
}
という拡張メソッドを用意しておくと、
people.selectAs[Monika]
と書けて便利です。 as
する大抵のシチュエーションでは、この selectAs
を使った方が自然なのではないかと思っていて、実際うちのプロダクトでは selectAs
を多用しています。
Datasetの型パラメータは何なのか
さて拡張メソッドを作ったのはいいのですが、型安全ではなくスキーマも表してないとなると、はたして Dataset の型パラメータは要らない子なのでしょうか?
もちろん、そんなことはなくて。たとえば DataFrame を受け取って複雑なデータ処理をしてから DataFrame を返す関数なんてものを考えると、これはかなりつらいわけですね。呼び出し側からすると、どんなスキーマの DataFrame を渡せばいいかも、どんなスキーマの DataFrame が返ってくるかも、実装を読みとかないと分からないので。
たしかに Dataset の型パラメータは不完全ですが、上記の例のように少なくとも存在しないカラムに型変換しようとすると、実行時エラーを吐いてくれます。つまりこれは型変換できるかどうかの assert であって、契約プログラミング的に捉えればいいのですね。
まぁ実のところ Dataset の真価は、普通の Scala コレクションのように無名関数を食わせて
val justMonika: Dataset[Person] = people.map(_.copy(name = "Monika"))
と map できたり、
val justMonika: Dataset[Person] = people.filter(_.name == "Monika")
filter できたりするところにあります。
型安全なDatasetが欲しくなったら
まだ試してないのですが frameless というライブラリに、 Dataset のラッパークラス TypedDataset があるようです。 shapeless を使っているとのことで、型レベルプログラミングに抵抗ないチームであれば、検討してみても良いと思います。
実行環境
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
scalaVersion := "2.12.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
"org.apache.spark" %% "spark-sql" % "2.4.0")
sbt.version=1.2.6