Scala
Spark
sparksql

SparkSQL - HiveContext/SQLContextの性能比較

More than 3 years have passed since last update.

SparkSQLにはSpark独自のSQLが使えるSparkContextと、HiveQLが使用できるHiveContextがあります。
SparkSQLは関数など機能的に足りないもの(日付演算等)があり、HiveContextを使いたい場合もありますが”Hive”という名称にちょっとだけ性能の不安があるのではと思っていました。
そこで非常に簡易的なものではありますが、同一データ、SQLでContextのみを切り替え、比較してみたので備忘程度に記載します

実行環境・方法

環境はMBP1台構成で以下の通りとなります

プロセッサ
2.4GHz Core i5
メモリ
8GB(Heapは2GB割り当て)

実行には以下のソースコードをscalacでJar化し、spark-submitで実行しています

インプットデータはCSVファイルを用意して、TRAN_A、TRAN_Bのみ増幅して実行しています
(増幅はforループで単純増幅しているため、重複データのみが増加しています。そのためデータサイズと比較して処理速度は遅くなっています)

perf.scala
package kensyou

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._

object test {

  def main(args: Array[String]): Unit = {

    // Spark初期設定    
    val sc = new SparkContext(new SparkConf())

    // Contextの切り替え(ここを切り替えたものを用意しそれぞれ実行する)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // テーブルカラム文字列
    val tranAString = "COL_A COL_B"
    val masterAString = "COL_A NAME"
    val tranBString = "COL_A COL_B COL_C DATE"

    // 文字列スキーマ作成
    val tranASchema = StructType(tranAString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val masterASchema = StructType(masterAString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val tranBSchema = StructType(tranBString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // データ取り込み
    val tranAData = sc.textFile("/Users/test/tranA.csv")
                    .map(_.split(","))
                    .map(p => Row(p(0), p(1)))
    val masterAData = sc.textFile("/Users/test/masterA.csv")
                    .map(_.split(","))
                    .map(p => Row(p(0), p(1)))
    val tranBData = sc.textFile("/Users/test/tranB.csv")
                    .map(_.split(","))
                    .map(p => Row(p(0), p(1), p(2)))

    // SchemaRDDへの変換
    val tranAchemaRDD = sqlContext.applySchema(tranAData, tranASchema)
    val masterASchemaRDD = sqlContext.applySchema(masterAData, masterASchema)
    val tranBSchemaRDD = sqlContext.applySchema(tranBData, tranBSchema)

    // Table登録
    tranAchemaRDD.registerTempTable("TRAN_A")
    masterASchemaRDD.registerTempTable("MASTER_A")
    tranBSchemaRDD.registerTempTable("TRAN_B")

    val result = sqlContext.sql("""SELECT 
                                     TA.COL_A
                                    ,MA.NAME
                                    ,TA.COL_B
                                    ,TB.COL_C
                                    ,COUNT(*)
                                   FROM
                                    TRAN_A AS TA 
                                    INNER JOIN MASTER_A AS MA
                                      ON  TA.COL_A = MA.COL_A
                                    INNER JOIN TRAN_B AS TB
                                      ON  TA.COL_A = TB.COL_A
                                      AND TA.COL_B = TB.COL_B
                                   GROUP BY
                                     TA.COL_A
                                    ,MA.NAME
                                    ,TA.COL_B
                                    ,TB.COL_C
                                   HAVING COUNT(*) >= 2""")

    result.collect().foreach(println)
  }
}

実行結果

以下の通りとなりました

増幅回数 HiveContext(.sec) SQLContext(.sec)
20(12KB程度) 16 13
100(60KB程度) 17 13
1000(600KB程度) 20 17
5000(3MB程度) 77 71
10000(6MB程度) 249 237

結論

実行結果を見るに多少はSQLContextの方が早いものの、大きな差異はないようです
そのため機能的にHiveQLが使いたければ安心して使ってよさそうですね

ソースコード自体も参照しましたがHiveContext自体はSQLContextをextendして実装されています
ParseをHive用のものを使っていますが、実行自体は同一になりそうですね

ただHiveテーブルを使う場合はrunHive関数が動作しそうなのですが、この場合の動作はどうなるんでしょうかね???
まあここら辺の詳しい内容は後日ソースコードを読み、機会があれば追記します