この記事の内容
2018/06/17追記
SparkのAPIに則った、正しいやり方の記事を書きましたので、
このタイトルの目的のためには、基本的にはそちらを参照して下さい。
この記事は、mapPartitionsWithIndexの使い方例としてはご利用いただけます。
SparkでPartitionごとのデータを書き出すのに、
saveAsTextFile
はあるのに、saveAsBinaryFile
はありません。
saveAsObjectFile
はありますが、これはHadoop用のSequenceFile形式になってしまいます。
バイナリの情報をそのまま書き出すAPIがなさそうなので、その方法を探していました。
やりたいことができたので記事にします。
Javaのコード
直近で必要になりそうなのがJavaなので、とりあえずJavaのコードだけです。
今後ScalaやPythonでも書くかも。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import java.io.*;
import java.util.*;
public class SaveBinary {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
// 画像ファイルを読み込んでbyte[]のRDDを作成
JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
saveAsBinaryFile(bytes, "out");
}
// 受け入れるRDDは byte[] だけ
private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {
// 匿名クラスで、mapPartitionsWithIndexの中身を実装
Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
@Override
public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
// 出力ファイル名はHDFS出力風だが、実際はNFSに出力
FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
while (iterator.hasNext()) {
fos.write(iterator.next());
}
fos.close();
return iterator;
}
};
// mapPartitionsWithIndexで、partition毎の操作を行う
bytes.mapPartitionsWithIndex(writeBinary, false).count();
}
}
解説
mapPartitionsWithIndex
は、RDD内の各パーティション毎に操作を行います。
また名前の通り、パーティション毎に番号がふられるので、それで別ファイル名として保存しています。
mapPartitionsWithIndex
に渡す処理内容はFunction2
クラスとして実装します。
RDDの中身がパーティション毎にIteratorに入ってくるので、それを使って目的の処理を実装します。
上記のように匿名クラスとして実装した方が見通しがいいかな、と思います。
mapPartitionsWithIndex
自体は、RDDを返すメソッドですが、
今回はsaveAsBinaryFile
の戻り値をvoidにしていますし、mapPartitionsWithIndex
の出力は一切気にしていません。
しかし、mapPartitionsWithIndex
の後のcount
は重要です。
mapPartitionsWithIndex
は遅延処理されるので、その後アクションがなければ実行されません。
現に上記コードではcount
を忘れると何も出力されません。(それで結構ハマった…)
補足
このmapPartitionsWithIndex
を使ってバイナリを書き出す方法ですが、このヒントはsaveAsTextFile
にありました。
saveAsTextFile
の実装を見てみると、中でmapPartitions
が使われています。
mapPartitions
もmapPartitionsWithIndex
もほぼ同様の処理だと思いますので、
この方法で実装したsaveAsBinaryFile
は、saveAsTextFile
とほぼ同等の性能で動くと思います。