Learning spark 2nd editionをみながら写経ちう
とりあえず最初にimportするやつ
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
csvからdataframeの作成
val df = spark.read.option("header",true).csv("path/filename.csv")
作ったdataframeから条件でデータを抽出
val filtered_df = df.select("col1", "col2", "col3").where(col("col4") === 0)
val filtered_df = df.select("col1", "col2", "col3").filter(col("col4") === 0)
## どっちも一緒
グルーピング
val GroupingDF = df.select("col1", "col2", "col3").groupBy("col1", "col2").agg(sum("col3").alias("total"), count("col3").alias("count"))
# groupByでどこのカラムでグルーピングするか、aggのなかに集計方法を書く
リストからdataframeを作成
val data = Seq(("Brooke", 20), ("Brooke", 25), ("Denny", 31), ("Jules", 30), ("TD", 35))
val dataDF = spark.createDataFrame(data).toDF("name", "age")
val dataDF = data.toDF("name", "age")
# どちらでも良いみたい
データの確認
scala> dataDF.show(3, false)
+------+---+
|name |age|
+------+---+
|Brooke|20 |
|Brooke|25 |
|Denny |31 |
+------+---+
only showing top 3 rows
# show(表示する行数, 一定字数以上を省略するか)
データに算術処理する
scala> dataDF.select(expr("age * 2")).show
scala> dataDF.select(col("age") * 2).show
+---------+
|(age * 2)|
+---------+
| 40|
| 50|
| 62|
| 60|
| 70|
+---------+
# どっちとも同じ結果になる
# selectの中は1columnしか無理そう
算術処理したりしてcolumn追加
scala> val dataDFthirty = dataDF.withColumn("over_thirty", (expr("age >= 30")))
scala> dataDFthirty.show()
+------+---+-----------+
| name|age|over_thirty|
+------+---+-----------+
|Brooke| 20| false|
|Brooke| 25| false|
| Denny| 31| true|
| Jules| 30| true|
| TD| 35| true|
+------+---+-----------+
scala> val dataDFna = dataDF.withColumn("nameage", (concat(expr("name"), expr("age"))))
scala> dataDFna.show()
+------+---+--------+
| name|age| nameage|
+------+---+--------+
|Brooke| 20|Brooke20|
|Brooke| 25|Brooke25|
| Denny| 31| Denny31|
| Jules| 30| Jules30|
| TD| 35| TD35|
+------+---+--------+
# concatもいけるこの時expr()でカラム指定しないとダメみたい
scala> val dataDFdouble = dataDF.withColumn("doubleAge", (expr("age * 2")))
val dataDFtoriple = dataDFdouble.withColumn("toripleAge", (expr("age + doubleAge")))
scala> dataDFtoriple.show()
+------+---+---------+----------+
| name|age|doubleAge|toripleAge|
+------+---+---------+----------+
|Brooke| 20| 40| 60|
|Brooke| 25| 50| 75|
| Denny| 31| 62| 93|
| Jules| 30| 60| 90|
| TD| 35| 70| 105|
+------+---+---------+----------+
# カラム同士でもできる
ソート
scala> dataDF.sort(col("age").desc).show()
+------+---+
| name|age|
+------+---+
| TD| 35|
| Denny| 31|
| Jules| 30|
|Brooke| 25|
|Brooke| 20|
+------+---+
scala> dataDF.sort(col("name").desc).show()
+------+---+
| name|age|
+------+---+
| TD| 35|
| Jules| 30|
| Denny| 31|
|Brooke| 20|
|Brooke| 25|
+------+---+
#単語の場合アルファベット順
scala> dataDFJP.sort(col("name").desc).show()
+--------+---+
| name|age|
+--------+---+
|ほしかわ| 35|
| ひぐち| 30|
| たなか| 20|
| かんだ| 31|
| いけだ| 25|
+--------+---+
# 日本語なら50音順
scala> dataDFJP.sort(col("name").asc).show()
+--------+---+
| name|age|
+--------+---+
| いけだ| 25|
| かんだ| 31|
| たなか| 20|
| ひぐち| 30|
|ほしかわ| 35|
+--------+---+
# 降順ならdesc, 昇順ならasc
scala> dataDFJP.sort($"name".asc).show()
+--------+---+
| name|age|
+--------+---+
| いけだ| 25|
| かんだ| 31|
| たなか| 20|
| ひぐち| 30|
|ほしかわ| 35|
+--------+---+
# これでもいける