9
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Sparkで集計し終わったファイルをひとまとめにする

Posted at

この問題が解決しなくて、

spark part files merge

なんてググっていたんですが解決したので共有

Hadoopはパーティションごとに処理をさせるため、最終的な結果は

part-00000
part-00001
・
・
・
・
・
part-00015

と複数のファイルに出力されてしまう。

パーティション数を1とかにすることで、一つのファイルに出力されると思うが、そもそも分散処理する際にはパーティション毎に並列に処理されるためパーティション1は分散処理する意味がなさそう。また元ファイルが馬鹿でかい場合、扱いきれるのか?という疑問もわいてくる。

今回の環境

  • EMR(Hadoop, Spark)
  • S3

どうやってひとまとめにするか

Hadoopのmerge機能を使う

具体的にはsparkがS3に出力した複数の集計ファイルをHadoopのmerge機能を使うことによって連結させ、それをS3に配置する

実際のコード

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.s3.S3FileSystem
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

object FileMerger {
  
  
  (例)srcPath = "s3n://バケット名/sparkでsaveしたディレクトリ"
    outFile = "s3n://バケット名/sparkでsaveしたディレクトリ/result.csv"
    
  def merge(srcPath : String, outFile : String) : Unit = {
    val config = new Configuration()
    config.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
    val dstFile : String = "%s/result.csv".format(srcPath)
    val fs = FileSystem.get(URI.create(srcPath), config)

    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(outFile), false, config, null)
  }
}

9
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?