2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkでsaveAsBinaryFileする

Last updated at Posted at 2018-05-14

この記事の内容

2018/06/17追記
SparkのAPIに則った、正しいやり方の記事を書きましたので、
このタイトルの目的のためには、基本的にはそちらを参照して下さい。
この記事は、mapPartitionsWithIndexの使い方例としてはご利用いただけます。

SparkでPartitionごとのデータを書き出すのに、
saveAsTextFileはあるのに、saveAsBinaryFileはありません。
saveAsObjectFileはありますが、これはHadoop用のSequenceFile形式になってしまいます。
バイナリの情報をそのまま書き出すAPIがなさそうなので、その方法を探していました。
やりたいことができたので記事にします。

Javaのコード

直近で必要になりそうなのがJavaなので、とりあえずJavaのコードだけです。
今後ScalaやPythonでも書くかも。

SaveBinary.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

import java.io.*;
import java.util.*;

public class SaveBinary {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TestSpark"));
        // 画像ファイルを読み込んでbyte[]のRDDを作成
        JavaRDD<byte[]> bytes = sc.binaryFiles("someImages/*").values().map(x -> x.toArray());
        saveAsBinaryFile(bytes, "out");
    }

    // 受け入れるRDDは byte[] だけ
    private static void saveAsBinaryFile(JavaRDD<byte[]> bytes, String path) {

        // 匿名クラスで、mapPartitionsWithIndexの中身を実装
        Function2 writeBinary = new Function2<Integer, Iterator<byte[]>, Iterator<byte[]>>() {
            @Override
            public Iterator<byte[]> call(Integer idx, Iterator<byte[]> iterator) throws Exception {
                // 出力ファイル名はHDFS出力風だが、実際はNFSに出力
                FileOutputStream fos = new FileOutputStream(path + "/part-" + String.format("%05d", idx));
                while (iterator.hasNext()) {
                    fos.write(iterator.next());
                }
                fos.close();
                return iterator;
            }
        };

        // mapPartitionsWithIndexで、partition毎の操作を行う
        bytes.mapPartitionsWithIndex(writeBinary, false).count();
    }
}

解説

mapPartitionsWithIndexは、RDD内の各パーティション毎に操作を行います。
また名前の通り、パーティション毎に番号がふられるので、それで別ファイル名として保存しています。

mapPartitionsWithIndexに渡す処理内容はFunction2クラスとして実装します。
RDDの中身がパーティション毎にIteratorに入ってくるので、それを使って目的の処理を実装します。
上記のように匿名クラスとして実装した方が見通しがいいかな、と思います。

mapPartitionsWithIndex自体は、RDDを返すメソッドですが、
今回はsaveAsBinaryFileの戻り値をvoidにしていますし、mapPartitionsWithIndexの出力は一切気にしていません。
しかし、mapPartitionsWithIndexの後のcountは重要です。
mapPartitionsWithIndexは遅延処理されるので、その後アクションがなければ実行されません。
現に上記コードではcountを忘れると何も出力されません。(それで結構ハマった…)

補足

このmapPartitionsWithIndexを使ってバイナリを書き出す方法ですが、このヒントはsaveAsTextFileにありました。
saveAsTextFile実装を見てみると、中でmapPartitionsが使われています。
mapPartitionsmapPartitionsWithIndexもほぼ同様の処理だと思いますので、
この方法で実装したsaveAsBinaryFileは、saveAsTextFileとほぼ同等の性能で動くと思います。

参考

StackOverflow: Apache Spark mapPartitionsWithIndex

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?