Java
Spark

SparkでsaveAsBinaryFileする

この記事の内容

SparkでPartitionごとのデータを書き出すのに、
saveAsTextFileはあるのに、saveAsBinaryFileはありません。
saveAsObjectFileはありますが、これはHadoop用のSequenceFile形式になってしまいます。
バイナリの情報をそのまま書き出すAPIがなさそうなので、その方法を探していました。
やりたいことができたので記事にします。

Javaのコード

直近で必要になりそうなのがJavaなので、とりあえずJavaのコードだけです。
今後ScalaやPythonでも書くかも。

SaveBinary.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

import java.io.*;
import java.util.*;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
        // 画像ファイルを読み込んでbyte[]のRDDを作成
        JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
        saveAsBinaryFile(bytes, "out");
    }

    // 受け入れるRDDは byte[] だけ
    private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {

        // 匿名クラスで、mapPartitionsWithIndexの中身を実装
        Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
            @Override
            public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
                // 出力ファイル名はHDFS出力風だが、実際はNFSに出力
                FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
                while (iterator.hasNext()) {
                    fos.write(iterator.next());
                }
                fos.close();
                return iterator;
            }
        };

        // mapPartitionsWithIndexで、partition毎の操作を行う
        bytes.mapPartitionsWithIndex(writeBinary, false).count();
    }
}

解説

mapPartitionsWithIndexは、RDD内の各パーティション毎に操作を行います。
また名前の通り、パーティション毎に番号がふられるので、それで別ファイル名として保存しています。

mapPartitionsWithIndexに渡す処理内容はFunction2クラスとして実装します。
RDDの中身がパーティション毎にIteratorに入ってくるので、それを使って目的の処理を実装します。
上記のように匿名クラスとして実装した方が見通しがいいかな、と思います。

mapPartitionsWithIndex自体は、RDDを返すメソッドですが、
今回はsaveAsBinaryFileの戻り値をvoidにしていますし、mapPartitionsWithIndexの出力は一切気にしていません。
しかし、mapPartitionsWithIndexの後のcountは重要です。
mapPartitionsWithIndexは遅延処理されるので、その後アクションがなければ実行されません。
現に上記コードではcountを忘れると何も出力されません。(それで結構ハマった…)

補足

このmapPartitionsWithIndexを使ってバイナリを書き出す方法ですが、このヒントはsaveAsTextFileにありました。
saveAsTextFile実装を見てみると、中でmapPartitionsが使われています。
mapPartitionsmapPartitionsWithIndexもほぼ同様の処理だと思いますので、
この方法で実装したsaveAsBinaryFileは、saveAsTextFileとほぼ同等の性能で動くと思います。

参考

StackOverflow: Apache Spark mapPartitionsWithIndex