マージ処理
いろいろ調べた結果以下のように
を元にScalaコードをPythonへ移しました。
-
s3a
はs3
でもいいのだろうか?未検証 -
copyMerge
の第5引数はTrueでマージ元ディレクトリを消す - マージしたファイルをマージ元ディレクトリに入れていると一緒に削除されてしまう
ScalaのAPI, copyMerge
public static boolean copyMerge(FileSystem srcFS,
Path srcDir,
FileSystem dstFS,
Path dstFile,
boolean deleteSource,
Configuration conf,
String addString)
throws IOException
Copy all files in a directory to one output file (merge).
書いたコード
def merge(bucket, suffix, file_name):
uri = spark_context._gateway.jvm.java.net.URI
path = spark_context._gateway.jvm.org.apache.hadoop.fs.Path
file_sys = spark_context._gateway.jvm.org.apache.hadoop.fs.s3.S3FileSystem
file_util = spark_context._gateway.jvm.org.apache.hadoop.fs.FileUtil
conf = spark_context._jsc.hadoopConfiguration()
# ex) bucket = 'hangedman.s3.com'
# ex) suffix = '/output/dir'
# ex) file_name = 'output.csv'
src_path = "s3a://{}{}".format(bucket, suffix)
fs = file_sys.get(uri.create(src_path), conf)
dst_file = "s3a://{}{}/{}".format(bucket, suffix, file_name)
file_util.copyMerge(fs, path(src_path), fs, path(dst_file), False, conf, None)