概要
あるSparkアプリで、RDDの出力を1ファイルでなく特定のKey毎に分けたくなった。
わかりやすい方法がなかったので、試してみた内容を記しておきます。
方法
まずはこちらのissueを見ます。
https://issues.apache.org/jira/browse/SPARK-3533
どうやら saveAsTextFileByKey()
というドンピシャなAPIはなく、work aroundで対応しろとのことでした。(それでcloseされてます。)
今回は、2つ紹介されている方法のうち1つ目をやってみました。
ただ、この方法は、.partitionBy(new HashPartitioner(num))
に重要な問題が潜んでいます。
具体的には、
Partitionを再マッピングする際に、特定のKeyの要素が多すぎて1Partitionに乗り切らなかった場合に、複数Partition(Executor)から同一のファイルパスに対して出力され書き込みがバッティングするです。
問題点を踏まえて、正しい解決法
まず、RDDMultipleTextOutputFormat
を作成します。
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
次に、出力対象のKey(String型になってるはず)にパーティション番号を付与します。
val targetRDD = someRDD.mapPartitionsWithIndex((p, elms) => {
elms.map { case (key, val) => (s"$key/$p.log", val) }
})
最後に、そのRDDを出力します。
targetRDD.saveAsHadoopFile("/your/base/path",
classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]
)
こうすることで、普通のRDDのsaveAsTextFileと同じように、/your/base/path/<key>/<partition番号>.log
などの形でファイルが出力されます。
異なるPartition(Executor)が同じファイルパスに出力することはないので書き込みのバッティングは起こらないはずです。