この問題が解決しなくて、
spark part files merge
なんてググっていたんですが解決したので共有
Hadoopはパーティションごとに処理をさせるため、最終的な結果は
part-00000
part-00001
・
・
・
・
・
part-00015
と複数のファイルに出力されてしまう。
パーティション数を1とかにすることで、一つのファイルに出力されると思うが、そもそも分散処理する際にはパーティション毎に並列に処理されるためパーティション1は分散処理する意味がなさそう。また元ファイルが馬鹿でかい場合、扱いきれるのか?という疑問もわいてくる。
今回の環境
- EMR(Hadoop, Spark)
- S3
どうやってひとまとめにするか
Hadoopのmerge機能を使う
具体的にはsparkがS3に出力した複数の集計ファイルをHadoopのmerge機能を使うことによって連結させ、それをS3に配置する
実際のコード
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.s3.S3FileSystem
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
object FileMerger {
(例)srcPath = "s3n://バケット名/sparkでsaveしたディレクトリ"
outFile = "s3n://バケット名/sparkでsaveしたディレクトリ/result.csv"
def merge(srcPath : String, outFile : String) : Unit = {
val config = new Configuration()
config.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
val dstFile : String = "%s/result.csv".format(srcPath)
val fs = FileSystem.get(URI.create(srcPath), config)
FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(outFile), false, config, null)
}
}