SparkSQLにはSpark独自のSQLが使えるSparkContextと、HiveQLが使用できるHiveContextがあります。
SparkSQLは関数など機能的に足りないもの(日付演算等)があり、HiveContextを使いたい場合もありますが”Hive”という名称にちょっとだけ性能の不安があるのではと思っていました。
そこで非常に簡易的なものではありますが、同一データ、SQLでContextのみを切り替え、比較してみたので備忘程度に記載します
実行環境・方法
環境はMBP1台構成で以下の通りとなります
- プロセッサ
- 2.4GHz Core i5
- メモリ
- 8GB(Heapは2GB割り当て)
実行には以下のソースコードをscalacでJar化し、spark-submitで実行しています
インプットデータはCSVファイルを用意して、TRAN_A、TRAN_Bのみ増幅して実行しています
(増幅はforループで単純増幅しているため、重複データのみが増加しています。そのためデータサイズと比較して処理速度は遅くなっています)
package kensyou
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
object test {
def main(args: Array[String]): Unit = {
// Spark初期設定
val sc = new SparkContext(new SparkConf())
// Contextの切り替え(ここを切り替えたものを用意しそれぞれ実行する)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// テーブルカラム文字列
val tranAString = "COL_A COL_B"
val masterAString = "COL_A NAME"
val tranBString = "COL_A COL_B COL_C DATE"
// 文字列スキーマ作成
val tranASchema = StructType(tranAString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val masterASchema = StructType(masterAString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val tranBSchema = StructType(tranBString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// データ取り込み
val tranAData = sc.textFile("/Users/test/tranA.csv")
.map(_.split(","))
.map(p => Row(p(0), p(1)))
val masterAData = sc.textFile("/Users/test/masterA.csv")
.map(_.split(","))
.map(p => Row(p(0), p(1)))
val tranBData = sc.textFile("/Users/test/tranB.csv")
.map(_.split(","))
.map(p => Row(p(0), p(1), p(2)))
// SchemaRDDへの変換
val tranAchemaRDD = sqlContext.applySchema(tranAData, tranASchema)
val masterASchemaRDD = sqlContext.applySchema(masterAData, masterASchema)
val tranBSchemaRDD = sqlContext.applySchema(tranBData, tranBSchema)
// Table登録
tranAchemaRDD.registerTempTable("TRAN_A")
masterASchemaRDD.registerTempTable("MASTER_A")
tranBSchemaRDD.registerTempTable("TRAN_B")
val result = sqlContext.sql("""SELECT
TA.COL_A
,MA.NAME
,TA.COL_B
,TB.COL_C
,COUNT(*)
FROM
TRAN_A AS TA
INNER JOIN MASTER_A AS MA
ON TA.COL_A = MA.COL_A
INNER JOIN TRAN_B AS TB
ON TA.COL_A = TB.COL_A
AND TA.COL_B = TB.COL_B
GROUP BY
TA.COL_A
,MA.NAME
,TA.COL_B
,TB.COL_C
HAVING COUNT(*) >= 2""")
result.collect().foreach(println)
}
}
実行結果
以下の通りとなりました
増幅回数 | HiveContext(.sec) | SQLContext(.sec) |
---|---|---|
20(12KB程度) | 16 | 13 |
100(60KB程度) | 17 | 13 |
1000(600KB程度) | 20 | 17 |
5000(3MB程度) | 77 | 71 |
10000(6MB程度) | 249 | 237 |
結論
実行結果を見るに多少はSQLContextの方が早いものの、大きな差異はないようです
そのため機能的にHiveQLが使いたければ安心して使ってよさそうですね
ソースコード自体も参照しましたがHiveContext自体はSQLContextをextendして実装されています
ParseをHive用のものを使っていますが、実行自体は同一になりそうですね
ただHiveテーブルを使う場合はrunHive関数が動作しそうなのですが、この場合の動作はどうなるんでしょうかね???
まあここら辺の詳しい内容は後日ソースコードを読み、機会があれば追記します