12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkでKey毎にファイル出力をしたい

Last updated at Posted at 2016-05-15

概要

あるSparkアプリで、RDDの出力を1ファイルでなく特定のKey毎に分けたくなった。
わかりやすい方法がなかったので、試してみた内容を記しておきます。

方法

まずはこちらのissueを見ます。
https://issues.apache.org/jira/browse/SPARK-3533

どうやら saveAsTextFileByKey() というドンピシャなAPIはなく、work aroundで対応しろとのことでした。(それでcloseされてます。)

今回は、2つ紹介されている方法のうち1つ目をやってみました。

ただ、この方法は、.partitionBy(new HashPartitioner(num))に重要な問題が潜んでいます。

具体的には、
Partitionを再マッピングする際に、特定のKeyの要素が多すぎて1Partitionに乗り切らなかった場合に、複数Partition(Executor)から同一のファイルパスに対して出力され書き込みがバッティングするです。

問題点を踏まえて、正しい解決法

まず、RDDMultipleTextOutputFormatを作成します。

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

次に、出力対象のKey(String型になってるはず)にパーティション番号を付与します。

val targetRDD = someRDD.mapPartitionsWithIndex((p, elms) => {
        elms.map { case (key, val) => (s"$key/$p.log", val) }
      })

最後に、そのRDDを出力します。

targetRDD.saveAsHadoopFile("/your/base/path",
        classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]
      )

こうすることで、普通のRDDのsaveAsTextFileと同じように、/your/base/path/<key>/<partition番号>.logなどの形でファイルが出力されます。
異なるPartition(Executor)が同じファイルパスに出力することはないので書き込みのバッティングは起こらないはずです。

12
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?