Posted at

カスタムDataSourceを作成する方法

More than 3 years have passed since last update.

Apache Spark Advent Calendar 2015 20日目の記事です。(大遅刻ですみません。。。)この記事では、DataFrame APIを通してカスタムDataSourceへ接続する方法を解説します。


0. 新たなDataSourceを書き始める前に

既に実装済みのDataSourceではないか確認するのが良いでしょう。Spark組み込みのDataSourceとして下記の4つがあります。


  • JSON

  • Parquet

  • JDBC/ODBC

  • Hive(今回の解説とは異なる実装)

http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources

また、spark-packages.orgにはSparkユーザーコミュニティで開発されたパッケージが登録されています。cassandraやredshiftなどカスタムDataSourceも多数登録されているので、必要としているDataSourceが既に実装されていないかをここから確認すると良いでしょう。


1. 作るものの確認

既存の実装が見つからなかった場合、残念ながら自分でDataSourceへ接続するためのコードを開発することになります。この記事では、spark-csvを参考にしながらカスタムDataSourceの実装方法を解説します。


DataSourceの指定方法のおさらい

parquet などデフォルトで実装されているDataSourceは、

val df = sqlContext.read

.load("/path/to/my.parquet")

df.select("name", "age").write
.save("/path/to/nameAndAge.parquet")

と書くことができます。DataSourceを明示する場合は、 format()を指定します。

val df = sqlContext.read

.format("json")
.load("/path/to/input")

df.select("name", "age").write
.format("parquet")
.save("nameAndAge.parquet")

format()の引数には、デフォルトのDataSource名かカスタムDataSourceのパッケージ名を指定します。例えば、spark-csvを使うには

val df = sqlContext.read

.format("com.databricks.spark.csv")
.load("/path/to/my.csv")

df.select("name", "age").write
.format("com.databricks.spark.csv")
.save("/path/to/nameAndAge.csv")

と書きます。


2. Scala APIの実装

カスタムDataSourceを作成するには、以下の2つのクラスを実装します。



  • BaseRelation派生クラス


  • DefaultSourceクラス

BaseRelationはDataSourceのスキーマ付きデータを表すクラスです。BaseRelationの派生クラスではデータ読込みや書込み処理を実装します。

DefaultSourceクラスは、ユーザーとカスタムDataSourceの接点となります。DataSourceへ渡されるオプションやユーザー指定のスキーマを受け取り、BaseRelation派生クラスのインスタンスを生成します。


DefaultSourceの実装

DefaultSourceクラスは、format()で指定されるパッケージ名の直下に定義します。com.example.customがパッケージ名だとした場合、com.example.custom.DefaultSourceになります。以下にDefaultSourceクラスの実装例を示します。

package com.example.custom

// some import ...

class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {

// from RelationProvider
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
// ...
}

// from SchemaRelationProvider
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
// ...
}

// from CreatableRelationProvider
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
// ...
}
}

この例では、


  • RelationProvider

  • SchemaRelaionProvider

  • CreatableRelationProvider

を実装しています。RealtionProviderはDataSourceからデータを読込むためのインターフェイスを提供する場合に実装します。SchemaRelationProviderはユーザーが指定したスキーマを受け取れること以外はRelationProviderと同じです。


RelationProvider/SchemaRelationProvider

RelationProviderSchemaRelationProviderは、org.apache.spark.sql.sources.interfaces.scala で以下のように定義されています。

trait RelationProvider {

def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
}

trait SchemaRelationProvider {
def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation
}

RelationProviderが提供するcreateRelation()はDataSourceからデータを読み込む場合に呼び出されます。例えば、

sqlContext.read

.format("com.example.custom")
.option("optionKey", "optionValue")
.load("/path/to/datasource")

が実行された場合、com.example.custom.DefaultSourceで実装されているRelationProvidercreateRelation()が呼び出さます。parameters引数にはoption()で渡したKey-Valueが渡されます。また、load()の引数は、pathというKeyでparametersから取得することができます。

また、schema()でユーザー指定のスキーマが渡された場合、SchemaRelationProviderが提供するcreateRealtion()が呼び出されます。


CreatableRelationProvider

CreatableRelationProviderは、org.apache.spark.sql.sources.interfaces.scala で以下のように定義されています。

trait CreatableRelationProvider {

def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation
}

このtraitでは、dataをdata sourceへ書込んでBaseRelationのインスタンスを返します。spack-csvでは、dataをcsvファイルへ書き込んだ後にSchemaRelationProvidercreateRelation()を呼び出す実装になっています。


BaseRelation派生クラスの実装

BaseRelationはスキーマ付きデータを表すクラスで、いくつかあるScanのtraitを実装する必要があります。実装可能なScanクラスには、


  • TableScan

  • PrunedScan

  • PrunedFilteredScan

  • InsertableRelation

  • CatalystScan

があります。ここでは、spark-csvで実装されているTableScanPrunedScanInsertableRelationについて説明します。


TableScan/PrunedScan

TableScanPrunedScanはdata sourceからデータを読込みRDDを構築するためのAPIを提供します。org.apache.spark.sql.sources.interfaces.scala で以下のように定義されています。

trait TableScan {

def buildScan(): RDD[Row]
}

trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}

buildScan()では、コンストラクタで渡された情報をもとにdata sourceからデータを読み込みます。PrunedScanは不要なカラムを削除してからRDDを構築するためのAPIを提供します。


InsertableRelation

InsertableRelationはdata sourceへ書込みするAPIを提供します。org.apache.spark.sql.sources.interfaces.scala で以下のように定義されています。

trait InsertableRelation {

def insert(data: DataFrame, overwrite: Boolean): Unit
}

insert()メソッドでは、dataをdata sourceへ書き込む処理を実装します。overwriteがtrueの場合、上書きしfalseの場合、既存のデータに追記するようにします。spark-csvの実装では、CreatableRelationProviderの実装とファイル書込みの処理が共通化されています。

CreatableRelationProviderInsertableRelationは似たようなtraitですが、CreatableRelationProviderはDataFrame APIでsave()したときに呼び出され、InsertableRelationはSparkSQLからINSERT .. TABLEが実行されたときに呼び出されるようです。

例えば、

df.write.format("...").save("/path/to/file")

のようなDataFrame APIを使ったコードが実行された場合はCreatableRelationProvidercreateRealtion()が呼び出されます。

INSERT OVERWRITE TABLE sometable SELECT foo, bar, ...

のようなSparkSQLが実行された場合は、InsertableRelationinsert()が呼び出されます。

この辺りは、data sourceを自作した際に非常にわかりづらいと感じました。


4. まとめ

カスタムDataSourceは実装するべきtraitがそれほど多くないので仕様が分かってしまえば比較的楽に実装できるように思います。ただ参考にするコードによって実装方法(とくに副作用をどのタイミングで起こすか)がまちまちだったり、ドキュメントが整備されていなかったりと少しハードルが高いという印象です。

この記事によって、日本からspark-packages.orgへ登録する人が増えれば幸いです。


5. 参考情報