4
3

More than 3 years have passed since last update.

3つのApache Spark APIの物語:RDD対データフレーム、データセット

Posted at

RDD vs DataFrames and Datasets: A Tale of Three Apache Spark APIsの翻訳です。

開発者が嬉しいと思うことにおいて、開発者を生産的にする、すなわち、利用しやすく、直感的かつ多彩な表現な可能な一連のAPIに勝るものはありません。Apach Sparkの開発者へのアピールポイントの一つには、大規模データセットをScala、Java、Python、Rといった様々な言語で簡単に使えるAPIがあります。

この記事では、Apache Spark 2.2以降で利用できる3種類のAPI、RDD、データフレーム、データセットを探索していきます。それぞれのAPIをなぜ、どのようなときに使うべきなのか。それぞれの性能、最適化のメリットの概要。RDDではなくデータフレーム、データセットを使うべきシナリオ。特に、Apache Spark 2.0で統合されたデータフレームとデータセットにフォーカスします。

この統合の主な動機づけは、学習すべきコンセプトの数を制限し、構造化データを処理する手段を提供することで、Sparkをシンプルなものにするという我々のリクエストです。そして、この構造を通じて、このドメイン固有の言語要素として、Sparkは高レベルの抽象化とAPIを提供します。

Resilient Distributed Dataset (RDD)

RDDはSparkの誕生以来、ユーザーの目に触れる主要なAPIでした。コアにおいては、RDDはあなたのデータの要素の不変分散コレクションであり、トランスフォーメーションアクションを提供する低レベルAPIを用いて、並列で処理されるクラスターのノードのパーティショニングされます。

いつRDDを使うのか?

RDDを用いるシナリオ、一般的なユースケースを考えてみます。

  • データセットをコントロールするために、低レベルのトランスフォーメーションとアクションが必要
  • メディアストリームやテキストストリームのようにデータが非構造化データである
  • ドメイン固有表現よりも機能的プログラム要素を用いてデータを操作したい
  • 名前やカラムによってデータの属性にアクセスしつつも、列フォーマットなどのスキーマを強制しようと思わない、そして
  • 構造化、準構造化データに対するデータフレーム、データセットで得られる最適化、性能を犠牲にできる

Apache Spark 2.0ではRDDに何が起きたのか?

こう質問するかもしれません:RDDは二級市民として追いやられたのですか?時代遅れのものなのですか?

答えは完全にNOです!

さらに、以下でわかるように、好きな時かつシームレスに、データフレーム、データセットとRDDとの間を、シンプルなAPI呼び出しで行ったり来たりすることができます。そして、データフレームとデータセットはRDDの上に構築されています。

データフレーム

RDD同様、データフレームはデータの不変分散コレクションです。RDDと異なり、リレーショナルデータベースのテーブルのように、データは名前付きカラムで整理されます。大規模データの処理を容易にするように設計されており、開発者はデータの分散コレクションに構造をもたらし、高レベルの抽象化を実現できます。分散データを操作するためにドメイン固有言語のAPIを提供し、専門のデータエンジニアだけでなく、より広いユーザーがSparkにアクセスできるようになります。

Apache Spark 2.0ウェビナーおよびブログ記事において、Spark 2.0においては、データフレームAPIとデータセットAPIはマージされ、ライブラリ全体で処理機能が統合されると言及しました。この統合により、開発者が学ぶべきコンセプトの数は減り、データセットと呼ばれる単一の高レベル、型安全なAPIを利用できるようになりました。

データセット

Spark 2.0からは、データセットは二つの異なるAPIの特性を有することになります:強い型付け(strongly-typed)のAPIと型付けのない(untyped)APIです。概念上、データフレームは、一般的なオブジェクトDataset[Row]のコレクションに対するエイリアスです。ここで、Rowは一般的な型付けのないJVMオブジェクトです。一方、データセットは、強い型付けのJVMオブジェクトであり、ScalaあるいはJavaクラスで定義するケースクラスで表現されます。

型あり、型なしのAPI

言語 メインの抽象化
Scala Dataset[T] & DataFrame (Dataset[Row]に対するエイリアス)
Java Dataset[T]
Python* DataFrame
R* DataFrame

注意: PythonとRはコンパイル時の型安全がないため、データフレームという型なしのAPIしかありません。

データセットAPIの利点

Spark2.0では、Spark開発者として多くの点でデータフレーム、データセットの統合APIのメリットを享受することができます。

1. 静的型付け、実行時の型安全

静的型付けと実行時の型安全を、最も制限のないSQLから最も制限のあるデータフレームまでのスペクトルとして見てみましょう。例えば、Spark SQLの文字列クエリーにおいては、実行時(ランタイム)まで文法エラーを知ることはできず、手間がかかる可能性がありますが、データフレーム、データセットにおいては、コンパイル時にエラーを検知することができ、開発者の時間とコストを節約できます。すなわち、APIに含まれない関数をデータフレームで起動した際には、コンパイラがそれを捕捉します。しかし、実行するまでは存在しないカラム名を検知することはできません。

スペクトルの最も遠くに存在するのが、最も制限のあるデータセットです。データセットAPIはラムダ関数とJVM型付けオブジェクトで表現されるので、型付けされたパラメーターのミスマッチはコンパイル時点で検知されます。また、データセットを使った際には、あなたのコードの解析エラーもまたコンパイル時に検知されるので、開発者の時間とコストを節約することができます。

これら全ては、あなたのSparkコードにおける文法、解析エラーに対する型安全のスペクトルとして表現することができ、データセットが最も制限が強くかつ開発者の生産性を高めることができます。

2. ハイレベルの抽象化、構造化データ、非構造化データに対するカスタムビュー

Datasets[Row]のコレクションとしてのデータフレームは、お手元の準構造化データに対する構造化カスタムビューを表現することができます。例えば、JSONで表現される膨大なIoTデバイスのイベントデータが手元にあるとしましょう。JSONは準構造化フォーマットであるため、強い型付けのDataset[DeviceIoTData]のコレクションとしてデータセットを適用することでうまく表現することができます。

JSON
{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp" :1458081226051}

それぞれのJSONエントリを、カスタムオブジェクトDeviceIoTDataとScalaケースクラスを用いて表現することができます。

Scala
case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, longitude: Double, scale:String, temp: Long, timestamp: Long)

そして、JSONファイルからデータを読み込むことができます。

Scala
// read the json file and create the dataset from the 
// case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json("/databricks-public-datasets/data/iot/iot_devices.json").as[DeviceIoTData]

上のコードの内部では3つのことが起こっています。

  1. SparkがJSONを読み込み、スキーマを推定し、データフレームのコレクションを生成
  2. この時点では、Sparkは正確な型を知らないため、DataFrame = Dataset[Row]として、一般的なRowオブジェクトのコレクションにデータを変換
  3. 次にSparkは、DeviceIoTDataクラスで表現されているように、Dataset[Row]->Dataset[DeviceIoTData]として型付けされたScala JVMオブジェクトに変換

構造化データを取り扱ったことがあれば、オブジェクト内の特定の属性にアクセスしたり、列志向でデータを参照、処理したことには慣れていることでしょう。Dataset[ElementType]に型付けされたオブジェクトのコレクションとしてのデータセットによって、コンパイル時の型安全と強く型付けされたJVMオブジェクトに対するカスタムビューの両方をシームレスに活用することができます。そして、上のコードから得られる強く型付けされたDataset[T]は容易に、高レベルのメソッドを用いて処理および表示することができます。

3. 構造による使いやすいAPI

構造(structure)は、お使いのSparkプログラムがデータに対してできることに関するコントロールを制限するかもしれませんが、高レベルのconstructとして表現できるドメイン固有のオペレーションと豊富なセマンティクスを提供します。しかし、ほとんどの計算処理はデータセットの高レベルAPIで達成することができます。例えば、RDDの行のデータフィールドを用いるよりも、データセットの型付けがされたDeviceIoTDataオブジェクトにアクセスし、aggselectsumavgmapfiltergroupByオペレーションを実行する方が遥かにシンプルです。

(RDDにおける)関係代数型の表現を用いるよりも、ドメイン固有APIによる計算表現の方がはるかにシンプルで容易なものになります。例えば、以下のコードでは、filter()map()を用いて別の不変データセットを作成しています。

Scala
// Use filter(), map(), groupBy() country, and compute avg() 
// for temperatures and humidity. This operation results in 
// another immutable Dataset. The query is simpler to read, 
// and expressive

val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()

//display the resulting dataset
display(dsAvgTmp)

4. パフォーマンス、最適化

上で述べてきたメリットに加えて、二つの理由からデータフレーム、データセットAPIを利用することによる容量の効率性、パフォーマンスの改善を見過ごすわけにはいきません。

第一に、データフレーム、データセットAPIはSparkのSQLエンジン上に構築されているので、最適化された論理的、物理的クエリー計画を生成するのにCatalystを利用します。R、Java、Scala、Pythonのデータフレーム/データセットAPIの全てのリレーションタイプのクエリーは同じコードオプティマイザを経由しますので、容量、スピードの効率性を手に入れることができます。Dataset[T]に型付けされたAPIはデータエンジニアリングタスクに最適化されており、型付けのされていないDataset[Row](データフレームのエイリアス)はさらに高速で、インタラクティブな分析に適しています。

第二に、コンパイラとしてのSparkはお使いのデータセットタイプのJVMオブジェクトを理解するので、エンコーダーを用いてお使いの型固有のJVMオブジェクトをTungstenの内部メモリ表現にマッピングします。結果として、Tungstenエンコーダーは、効率的にJVMオブジェクトをシリアライズ/デシリアライズでき、優れたスピードで処理できるようにコンパクトなバイトコードを生成します。

いつデータフレーム、データセットを使うべきか?

  • 豊富なセマンティクス、高レベルの抽象化、ドメイン固有APIが必要であれば、データフレーム、データセットを使ってください
  • フィルター、マップ、集約、平均、合計、SQLクエリー、列アクセス、準構造化データに対するラムダ関数の適用など高レベルの表現に対する処理要件があるのであればデータフレーム、データセットを使ってください
  • コンパイル時の高レベルな型安全、型付けされたJVMオブジェクト、Catalystオプティマイザーによるメリット、Tungstenの効率的なコード生成を活用したいのであれば、データセットを使ってください
  • SparkライブラリにまたがるAPIの統合、簡素化が必要であればデータフレーム、データセットを使ってください
  • Rユーザーならデータフレームを使ってください
  • Pythonユーザーならデータフレームを使い、さらなるコントロールが必要であればRDDを使ってください

以下のように、.rddメソッドを使うことで、データフレーム、データセットからRDDにシームレスに変換を行うことができます。

Scala
// select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show first 10
// RDD rows
val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)
// convert to RDDs and take the first 10 rows
val eventsRDD = deviceEventsDS.rdd.take(10)

全てを統合する

まとめると、RDDあるいはデータフレーム、データセットをいつ使うのかの選択肢は明確なように見えます。前者は低レベルの機能とコントロールを提供し、後者はカスタムビューと構造化、高レベルなドメイン固有のオペレーションを提供することで、容量を削減し、処理速度を改善します。

Sparkの初期リリースからの学びとして、開発者にとってSparkをどれだけシンプルなものにするか、どのように最適化し、高速にするのかという問いに対して、我々は低レベルのRDD APIを、データフレーム、データセットとしての高レベルの抽象化に引き上げ、CatalystオプティマイザーとTungstenの上に、ライブラリにまたがる統合されたデータ抽象化を構築する決断をしました。

あなたの要件、ユースケースに適したものをデータフレーム、データセット、RDD APIの中から選んでください。しかし、あなたが構造化、準構造化データを取り扱う陣営に分けられたとしても驚くことではありません。

次のステップは?

DatabricksでApache Spark 2.2を試すことができます。

Spark SummitのプレゼンテーションA Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasetsを聴講することもできます。

Databricks 無料トライアル

Databricks 無料トライアル

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3