LoginSignup
0
2

More than 5 years have passed since last update.

PySparkでS3上のファイルマージ

Last updated at Posted at 2018-11-20

マージ処理

いろいろ調べた結果以下のように

を元にScalaコードをPythonへ移しました。

  • s3as3でもいいのだろうか?未検証
  • copyMergeの第5引数はTrueでマージ元ディレクトリを消す
  • マージしたファイルをマージ元ディレクトリに入れていると一緒に削除されてしまう

ScalaのAPI, copyMerge

public static boolean copyMerge(FileSystem srcFS,
                Path srcDir,
                FileSystem dstFS,
                Path dstFile,
                boolean deleteSource,
                Configuration conf,
                String addString)
                         throws IOException
Copy all files in a directory to one output file (merge).

書いたコード

    def merge(bucket, suffix, file_name):
        uri = spark_context._gateway.jvm.java.net.URI
        path = spark_context._gateway.jvm.org.apache.hadoop.fs.Path
        file_sys = spark_context._gateway.jvm.org.apache.hadoop.fs.s3.S3FileSystem
        file_util = spark_context._gateway.jvm.org.apache.hadoop.fs.FileUtil
        conf = spark_context._jsc.hadoopConfiguration()

        # ex) bucket = 'hangedman.s3.com'
        # ex) suffix = '/output/dir'
        # ex) file_name = 'output.csv'
        src_path = "s3a://{}{}".format(bucket, suffix)
        fs = file_sys.get(uri.create(src_path), conf)
        dst_file = "s3a://{}{}/{}".format(bucket, suffix, file_name)
        file_util.copyMerge(fs, path(src_path), fs, path(dst_file), False, conf, None)
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2