Spark のテーブルの filter 条件を Map から読み込む
環境
Scala 2.11.8
Spark 2.3.1
やりたいこと
- Json の中にテーブル read の条件を書きたい。
- spark.sql を使わずに read したい
読み込んだ json の中身が、
"tableName": "tableA",
"filterCondition": {
"date": "20180101",
"type": "A"
}
の場合、
val df = spark.read.table("tableA").filter($"date" === filterCondition.date && $"type" === filterCondition.type)
みたいなものを、動的に生成してほしい。
解決策
Map 型に map (ややこしい)を適用する際には、case を使う必要がある。
def loadTable(spark: SparkSession, tableName: String, filterCondition: Map[String, String]): DataFrame = {
if (filter.isEmpty) {
spark.read.table(tableName)
}
else {
spark.read.table(tableName).filter(filterCondition.map{case (key, value) => col(key) === lit(value)}.reduce(_ && _))
}
}
参考:https://stackoverflow.com/questions/8812338/scala-mismatch-while-mapping-map