やったのは
- 生ログが1行jsonのデータでhdfsにある
- 生ログをspark RDDとして読み込み
- RDDをparseして、dfに変換
- dfをparquet formatでhive tableにsaveしてあとでhive tableとして引けるようにする。
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
val options = new HashMap[String, String]
options.put("spark.hadoop.mapred.output.compress", "true")
options.put("spark.hadoop.mapred.output.compression.codec", "true")
options.put("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
options.put("spark.hadoop.mapred.output.compression.type", "BLOCK")
//val sqlContext = new SQLContext(sc)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val date="2016-07-21"
val hour="00"
// imp
val impRdd = sc.textFile(s"/data/service/imp/$date/$hour")
val parsedImp = impRdd.map(LogParser.parse)
imp logをdfに変換して、 hive tableとしてsaveする。
val impDf = parsedImp.flatMap(LogParser.parseForDFRow).toDF
impDf.write.options(options).saveAsTable("imp_fact_table")
あと考慮すること、やってみることは
- partitionByでpartition指定
- insertIntoを使う
- text methodを使ってのtsv?での出力