数年前にSpark使ってた時のメモなので色々問題あるかもしれません…
データフレームの作成
スキーマを指定してデータフレーム作成
val schema = StructType(
StructField("Column1", StringType, true) ::
StructField("Column2", IntegerType, true) ::
StructField("Column3", DoubleType, true) :: Nil
)
var df = spark.createDataFrame(sc.emptyRDD[Row], schema)
groupBy
Column1とDateTimeの日付まででGroup Byし、Column2とColumn3の最大と最小を導出
var _df = df
.groupBy(
$"Column1",
to_date($"DateTime", "yyyy/MM/dd hh:mm:ss").as("Date")
)
.agg(
max($"Column2").as("MaxColumn2"),
min($"Column3").as("MinColumn3")
)
Window関数
前提とするDataFrameのスキーマは以下
var schema = StructType(
StructField("group", StringType, true) ::
StructField("datetime", StringType, true) ::
StructField("number", DoubleType, true) ::
StructField("value", StringType, true) :: Nil
)
// yyyy/MM/dd hh:mm:ssを日付形式に変更(GroupByなどに含める)
to_date($"Time", "yyyy/MM/dd hh:mm:ss").as("Date")
var df = _df.test().test()
平均値
指定したグループに対して平均値を導出する。
var df = _df.groupBy($"group").agg(avg("number"))
行数カウント
Window関数で指定した条件で行数をカウントする
val w = Window.partitionBy("group").orderBy("number")
var df = _df.withColumn("dataCount", row_number().over(w))