- partitionByで指定して
- saveAsTable or insertInto
でいけると思いきや、バグ or 仕様が実装されてないかで、そのままでは
save or insert できないっぽい。
なので、
- hive tableは自前でcreate tableしておく with partition
- df.saveAsTableでhive tableをcreateしてくれるが、partitionByが効かない。一回作って、partitionを編集して、自前でcreate tableする。
- regsiterTempTableでtemp tableに吐き出して
- 普通のspark上で hiveqlでinsertする。dynamic partitionで
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
val options = new HashMap[String, String]
options.put("spark.hadoop.mapred.output.compress", "true")
options.put("spark.hadoop.mapred.output.compression.codec", "true")
//options.put("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
options.put("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
options.put("spark.hadoop.mapred.output.compression.type", "BLOCK")
//options.put("parquet.compression","GZIP")
//SET hive.exec.compress.output=true;
//SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
//SET mapred.output.compression.type=BLOCK;
//val sqlContext = new SQLContext(sc)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.setConf("parquet.compression", "SNAPPY")
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
val date="2016-07-23"
val hour="00"
// imp
val impRdd = sc.textFile(s"/data/imp/$date/$hour")
val parsedImp = impRdd.map(LogParser.parse)
val impDf = parsedImp.map(LogParser.parseForDFRow2).toDF
scala.util.Try(sqlContext.dropTempTable("temp_imp"))
inviewDf.registerTempTable("temp_imp")
sqlContext.sql(s"INSERT overwrite TABLE sandbox.partitioned_imp partition(dt, dh) SELECT * FROM temp_imp limit 10")