26
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Spark 1.6 Dataset APIをいち早く試してみる

Last updated at Posted at 2015-12-14

Apache Spark 1.6から新しく追加されたDataset APIを試してみる。
2015/12/14現在まだリリースされてないが、年内中にはリリースされるはず。

背景

  • RDDはLow Level APIで、としてフレキシブルだが、最適化が難しい
  • (Spark 1.3から登場した)DataFrameはHigh Level APIでオプティマイザーが最適化してくれるが、フレキシブルさがない。特にUDFの使い勝手が不便なところや型チェックに弱い

Dataset API 登場

  • 上記の問題を解決するためにSpark 1.6から実験的(Experimental)に登場したのがDataset APIである
  • RDDとDataFrameの良いところを併せ持つAPIとして開発されています。つまり、早くて使い勝手のよいAPIだと言えます。

SparkDatasets.png

画像はhttp://technicaltidbit.blogspot.jpより

Dataset APIの要件定義は SPARK-9999 に詳しく書かれてる通り、大きく下記の4つになります。

Fast
Typesafe
Java Compatible
Interoperates with DataFrames

Let's give it a try

Spark 1.6.0 ダウンロード

解凍後に、/usr/local/spark-1.6.0 へコピー

Sparkソースに同胞されてるファイルを使って比較してみる

/usr/local/spark-1.6.0/bin/spark-shell 起動

// 各自のパスを合わせて設定
scala> val peopleFile = "/usr/local/spark-1.6.0/examples/src/main/resources/people.json"

// JSON → DataFrame
scala> val df = sqlContext.read.json(peopleFile)
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// 中身確認
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

// Datasetを使うため、case class 定義
scala> case class Person(age: Long, name: String)

// DataFrameからDatasetに変換
scala> val ds = df.as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

// DataFrameでの20歳以上の人を取得
// DataFrameは行列計算に特化したフレームワークなので、カラム名を指定する必要がある
scala> df.where($"age" >= 20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

// Datasetでの20歳以上の人を取得
// DatasetはDataFrameのRowをJVMオブジェクト(この例ではPerson)として扱えるため、UDFが適用が簡単
scala> ds.filter(_.age >= 20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

// Dataset → DataFrame
scala> val df2 = ds.toDF
df2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// Dataset → RDD
scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[121] at rdd at <console>:33

// 少し複雑な処理(年代別人数集計)
scala> import org.apache.spark.sql.types._

// DataFrameの場合
scala> :paste
df.where($"age" > 0)
  .groupBy((($"age" / 10) cast IntegerType) * 10 as "decade")
  .agg(count($"name"))
  .orderBy($"decade")
  .show

+------+-----------+
|decade|count(name)|
+------+-----------+
|    10|          1|
|    30|          1|
+------+-----------+

// Datasetの場合
scala> :paste
ds.filter(_.age > 0)
  .groupBy(p => (p.age / 10) * 10)
  .agg(count("name"))
   // orderByがないようなので、DFへ変換(これはやや不便)
  .toDF().withColumnRenamed("value", "decade").orderBy("decade") 
  .show

+------+-----------+
|decade|count(name)|
+------+-----------+
|    10|          1|
|    30|          1|
+------+-----------+

他の例はdatabricksサイトを参考にしてください

Dataset API以外のSpark1.6.0についてはこちら

まとめ

  • Dataset APIはRDD-likeに使えてDataFrameのパフォーマンスの利点も活かせる
  • DataFrameやRDDへ簡単も変換できる
  • まだ安定版ではないため、足りないファンクションもあるけど、今後に期待
26
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
26
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?