この記事の内容
前回の記事で、Sparkを使ってバイナリ列をそのまま書き出す方法を書きましたが、
mapPartitionsWithIndexを使って書き出すよりもシンプルなやり方がありました。
前回のやり方ではHDFSへ書き出すことができなかったのですが、
今回はHDFSへも書き出せますし、"正しい"やり方であると思いますので、
前回の記事はmapPartitionsWithIndexの使い方として捉えて頂ければと思います。
Javaのコード
今回もJavaで行います。
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
public class SaveBinary {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf());
// bytesのRDDを適当に作る
ArrayList<byte[]> bytes = new ArrayList<>();
for (int i=0; i<10; i++) {
byte[] b = new byte[10];
for (int j=0; j<10; j++) {
b[j] = (byte)(i*10+j);
}
bytes.add(b);
}
JavaRDD<byte[]> rdd = sc.parallelize(bytes, 2);
/* RDD内のイメージ
rdd[0] = 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
rdd[1] = 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
・・・
*/
// byte[]をBytesWritableへ変換、JavaPairRDDにするため、valueとしてNullWritableを使う
rdd.mapToPair(x->new Tuple2<>(new BytesWritable(x),NullWritable.get()))
//BytesOutputFormat(自作)を指定してsaveAsNewAPIHadoopFileを行う
.saveAsNewAPIHadoopFile("./out", BytesWritable.class, NullWritable.class, BytesOutputFormat.class);
}
}
import java.io.*;
import java.util.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
public class BytesOutputFormat extends SequenceFileOutputFormat<BytesWritable,NullWritable> {
@Override
public RecordWriter<BytesWritable,NullWritable> getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
// BytesOutputFormatはBytesRecordWriterを呼びだすだけ
BytesRecordWriter writer = new BytesRecordWriter(job);
return writer;
}
}
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
public class BytesRecordWriter extends RecordWriter<BytesWritable,NullWritable> {
private boolean saveToHdfs_ = true;
private OutputStream os_;
public BytesRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException {
// SaveBinary.javaで指定した保存先はここに格納されている
String outDir = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");
String taskId = job.getTaskAttemptID().getTaskID().toString();
// HDFSかNFSかが違っても、os_のクラスを変えるだけで良い
if (saveToHdfs_) {
FileSystem hdfs = FileSystem.get(job.getConfiguration());
os_ = hdfs.create(new Path(outDir + "/part-" + taskId.substring(taskId.length()-6)));
} else {
os_ = new FileOutputStream(outDir + "/part-" + taskId.substring(taskId.length()-6));
}
}
@Override
public void write(BytesWritable key, NullWritable value) throws IOException {
os_.write(key.getBytes(), 0, key.getLength());
}
@Override
public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
os_.close();
os_ = null;
}
}
解説
JavaPairRDDには、Hadoop系のsaveメソッドが準備されているので、
saveAsNewAPIHadoopFile
を使って保存を行います。
("NewAPI"とつかないものもありますが、違いはよくわかりません。新しいのを使えば良いと思います。)
saveAsNewAPIHadoopFile
はOutputFormatを指定できますので、
後は自分が保存したい形式に沿ったRecordWriterを作って、それを渡すOutputFormatを指定してやります。
この辺はHadoopのOutputFormat周りを実装したことがない人にはややこしいかもしれませんが、
上記コードを見ればやっていることはすぐにわかるかと思います。
OutputStreamを作って、write
メソッドが各要素毎に呼ばれているだけですね。
出力結果
一応出力結果を示しておきます。
今回はわざわざpartitionを2にして実行していたので、出力ファイルは2つです。
00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F
10 11 12 13 14 15 16 17 18 19 1A 1B 1C 1D 1E 1F
20 21 22 23 24 25 26 27 28 29 2A 2B 2C 2D 2E 2F
30 31
32 33 34 35 36 37 38 39 3A 3B 3C 3D 3E 3F 40 41
42 43 44 45 46 47 48 49 4A 4B 4C 4D 4E 4F 50 51
52 53 54 55 56 57 58 59 5A 5B 5C 5D 5E 5F 60 61
62 63
特に余分なデータもなく出力されたことがわかります。
余談ですが、part-000000
とpart-000001
でデータは一続きになっているので、
これらのファイルをcat
してやれば1partitionとして出力したのと同じファイルが得られます。