1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkでsaveAsBinaryFileする(その2)

Last updated at Posted at 2018-06-17

この記事の内容

前回の記事で、Sparkを使ってバイナリ列をそのまま書き出す方法を書きましたが、
mapPartitionsWithIndexを使って書き出すよりもシンプルなやり方がありました。
前回のやり方ではHDFSへ書き出すことができなかったのですが、
今回はHDFSへも書き出せますし、"正しい"やり方であると思いますので、
前回の記事はmapPartitionsWithIndexの使い方として捉えて頂ければと思います。

Javaのコード

今回もJavaで行います。

SaveBinary.java
import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());

        // bytesのRDDを適当に作る
        ArrayList<byte[]> bytes = new ArrayList<>();
        for (int i=0; i<10; i++) {
            byte[] b = new byte[10];
            for (int j=0; j<10; j++) {
                b[j] = (byte)(i*10+j);
            }
            bytes.add(b);
        }
        JavaRDD<byte[]> rdd = sc.parallelize(bytes, 2);
        /* RDD内のイメージ
            rdd[0] =  0,  1,  2,  3,  4,  5,  6,  7,  8,  9
            rdd[1] = 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
            ・・・
        */
        // byte[]をBytesWritableへ変換、JavaPairRDDにするため、valueとしてNullWritableを使う
       rdd.mapToPair(x->new Tuple2<>(new BytesWritable(x),NullWritable.get()))
          //BytesOutputFormat(自作)を指定してsaveAsNewAPIHadoopFileを行う
          .saveAsNewAPIHadoopFile("./out", BytesWritable.class, NullWritable.class, BytesOutputFormat.class);
    }
}
BytesOutputFormat.java
import java.io.*;
import java.util.*;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesOutputFormat extends SequenceFileOutputFormat<BytesWritable,NullWritable> {
    @Override
    public RecordWriter<BytesWritable,NullWritable> getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        // BytesOutputFormatはBytesRecordWriterを呼びだすだけ
        BytesRecordWriter writer = new BytesRecordWriter(job);
        return writer;
    }
}
ByteRecordWriter.java
import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;

public class BytesRecordWriter extends RecordWriter<BytesWritable,NullWritable> {
    private boolean saveToHdfs_ = true;
    private OutputStream os_;
    public BytesRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
        // SaveBinary.javaで指定した保存先はここに格納されている
        String outDir = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");
        String taskId = job.getTaskAttemptID().getTaskID().toString();
        // HDFSかNFSかが違っても、os_のクラスを変えるだけで良い
        if (saveToHdfs_) {
            FileSystem hdfs = FileSystem.get(job.getConfiguration());
            os_ = hdfs.create(new Path(outDir + "/part-" + taskId.substring(taskId.length()-6)));
        } else {
            os_ = new FileOutputStream(outDir + "/part-" + taskId.substring(taskId.length()-6));
        }
    }
    @Override
    public void write(BytesWritable key, NullWritable value) throws IOException {
        os_.write(key.getBytes(), 0, key.getLength());
    }

    @Override
    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
        os_.close();
        os_ = null;
    }
}

解説

JavaPairRDDには、Hadoop系のsaveメソッドが準備されているので、
saveAsNewAPIHadoopFileを使って保存を行います。
("NewAPI"とつかないものもありますが、違いはよくわかりません。新しいのを使えば良いと思います。)

saveAsNewAPIHadoopFileはOutputFormatを指定できますので、
後は自分が保存したい形式に沿ったRecordWriterを作って、それを渡すOutputFormatを指定してやります。

この辺はHadoopのOutputFormat周りを実装したことがない人にはややこしいかもしれませんが、
上記コードを見ればやっていることはすぐにわかるかと思います。
OutputStreamを作って、writeメソッドが各要素毎に呼ばれているだけですね。

出力結果

一応出力結果を示しておきます。
今回はわざわざpartitionを2にして実行していたので、出力ファイルは2つです。

out/part-000000
00 01 02 03 04 05 06 07  08 09 0A 0B 0C 0D 0E 0F
10 11 12 13 14 15 16 17  18 19 1A 1B 1C 1D 1E 1F
20 21 22 23 24 25 26 27  28 29 2A 2B 2C 2D 2E 2F
30 31
out/part-000001
32 33 34 35 36 37 38 39  3A 3B 3C 3D 3E 3F 40 41
42 43 44 45 46 47 48 49  4A 4B 4C 4D 4E 4F 50 51
52 53 54 55 56 57 58 59  5A 5B 5C 5D 5E 5F 60 61
62 63

特に余分なデータもなく出力されたことがわかります。
余談ですが、part-000000part-000001でデータは一続きになっているので、
これらのファイルをcatしてやれば1partitionとして出力したのと同じファイルが得られます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?