6
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Sparkでバイナリ固定長ファイルを読み込む

Last updated at Posted at 2016-07-03

SparkはテキストファイルやJSON, CSVなど様々な形式のファイルを読み込むことができます。
HDFSから読み込んで、そのままRDDに変換されるので難しいこと考えずに、取り合え使えてとても便利です。

ただ、業務アプリを書いてると独自フォーマットのバイナリデータとか普通にあります。初めてのSparkを読むとSparkContext#newAPiHadoopFileやSparkContext#hadoopFile使えばできるよ! って書いてあるんですが、使い方がさっぱり記載されてません...

Spark使ってる人はHadoopを通過してるからその辺の説明は不要なのか、単にニーズの問題か、私のグーグル力の無さか、いずれか分かりませんが、やり方と振る舞いを把握するのに結構時間がかかってしまいました。なので、備忘として現状の理解を残しておこうと思います。

以前、ミーハーに買ってみたけど、実は特に読んでなくて死蔵してたHadoop 第2版とかをひっくり返しながらの理解なので、なんか間違ってるところあれば、コメントなりツイッターなりでツッコミください。

HDFS基礎

まずは、そもそもHDFSってなんなの? ってところからです。分散ファイルシステムだー、とかファイルを3個コピーしてるらしい、とかLocalityがあるらしいとかは薄っすら知ってましたが、アプリケーションから、どうやってアクセスするのかを把握してなかったので、そこから苦労しました。

HDFSとブロック

HDFSはメタ情報を管理するNameNodeと実データを管理するDataNodeからなり、巨大なデータを各サーバのローカルディスクに分割して保存する仕組みです。
超乱暴に言えば、サーバレベルでやる壮大なRAID10ですね。

例えば、下記のようなテキストファイルがあるとしまします。

It never fails.
Whenever I tell you humans this simple facts, you always react the same way.
It makes no sense at all !
Why are you humans so sensitive about the kind of container that souls are housed in ?

これは改行コードにより区切られた4行のテキストファイルですが、3台のマシン(DataNode)からなるHDFS上では下記のように分割されて保存されます。

hdfs-block.png
図1. ブロックの分散

DataNode毎に、改行基準で作られた論理レコードを無視して、固定長でバラバラに保存されていますね?
こんなの人間に読めない。「わけがわからないよ!」と叫びたくなるところですが、そこをうまくやるのがHDFSです。
この、細切れにされた固定長のデータを「ブロック」と呼びます。
また、図1は例として68バイトのブロックにしましたが、実際には1ブロックあたり64MBから128MB以上といった大きめのサイズにすることで、スループットを高めるのが一般的です。

InputFormatとRecordReader

Sparkでファイルを読み込みるときはSparkContext#hadoopFileまたはSparkContext#newAPiHadoopFileを使います。
hadoopFile()メソッドは第2引数にとるInputFormatによりどのような形式でファイルを読むかを決めます。

さらにInputFormatはgetRecordReader()を必ずオーバライドする必要があります。
このメソッドはレコードのRecordReaderを返すします。
今回、固定長読み込みのために使用しているFixedLengthRecordInputFormatはファイルをダータソースとするので、最も基本的なベースクラスになるFileInputFormatを継承しています。

RecordReaderはHDFS上のファイルを物理的な実態であるブロックから論理的なレコード単位に変換してアプリケーションに提供する高レベルのAPIです。

reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
..
val key: K = reader.createKey()
val value: V = reader.createValue()

override def getNext(): (K, V) = {
    try {
        finished = !reader.next(key, value) // 出力パラメータとしてkeyとvalueを渡す
    } catch {
        case eof: EOFException =>
        finished = true
    ...
    (key, value) // next()メソッド内で書き換えられたkeyとvalueをTuple2で返す
}

hadoopFile()メソッドを追っていくと、RDDHadoop内に上記のようなコードがあります。

これが、RDDにRecordを渡しているところの実体で、next()メソッドで、引数にとった値を出力パラメータとして扱い値を詰めます。

その結果を元にgetNext()はkeyとvalueのTupleを戻り値として、返しています。このgetNextメソッドの戻り値が最終的にRDDの各要素になります。

InputSplit

論理的なレコードは先程説明したRecoerdReaderにより提供されます。
一方で並列性の単位、すなわちパーティションの処理単位となるのがInputSplitです。つまり、前述したgetNext()メソッドはInputSplit毎に並列に呼ばれことになります。
なお、今回はファイル操作なのでFileSplitを使うため、特別に断りがなければスプリットはFileSplitのことになります。

InputSplitはバイト単位のストレージ上のファイルの位置および開始位置を返します。また、InputSpritは単なるメタ情報なのでデータそのものは持っていません。

InputSplitはInputFormatのgetSplits()メソッドにより作られるます。
重要な点として、InputSplitはあくまで並列性の単位であり、FileSplitのサイズは必ずしもBlockのサイズとは一致しない、ということです。これは後ほど詳しく説明します。

FileSplitの基本的な使い方は、そのスプリットの開始値、終端値、そして実データを下記のように取得して取り扱います。また、その数はminPartionsが最終的にinputFormat.getSplits()の引数になることで決まります。

long start = split.getStart(); // Splitの開始位置
long end = start + split.getLength();//Splitの終端位置
FSDataInputStream input = fs.open(split.getPath()); // データ実態のストリームを初期化

FSDataInputStream

先程、FileSplitのパスから作ったのがFSDataInputStreamです。これは複数のブロックを抽象化したストリームです。
つまり、分割された状態であるFileSplitの範囲のデータしか参照できないわけではなく、どののデータも読み込む事ができます。

DataFlow2.png
図2. データアクセスフロー

DriverはDistributedFileSystemを経由してNameNodeから必要なデータがどこにあるのかを確認します。
その情報を元にFileSplitを作成し、適切なLocalityを持つサーバのExecutorへ処理を渡します。ExecutorはFSDataInputStreamを経由して、ブロックにアクセスします。

FSDataInputStream#seekやFSDataInputStream#readでブロックの境界を超えた場合、FSDataInputStreamはNameNodeに再アクセスして別のDataNodeへアクセスを切り替えて値を返します。
通常このコスト小さいらしいですが、発生頻度を小さくするためにはFileSplitの数をブロックサイズと近づける必要があります。

HDFSまとめ

HDFSのファイル読み込みはInputFormatがRecoardReaderという論理的なレコードを扱う高レベルのAPIを提供し最終的にBlockの値を読込みとなり、リモートアクセスが発生した場合も隠蔽されれます。
各関係をまとめると下記のようになります。

Name Description
RecordReader 改行区切り、固定長などの論理的に意味のあるレコードを返す
InputSplit 入力ソースをパーティーションで切った単位。SparkでのRDD数はっこが基準になる
FSDataInputStraem HDFS上でのBlockにまたがったファイルストリーム。Blockをまたがるときはリモート通信をする
Block HDFS上に保存される固定長のファイル実態

また、Block, 並列単位(InptSPlit)、レコード(RecordReader#next())は下記のような関係になります。

block_and_record.png
図3. ブロック/Split/レコードの関係

バイナリ固定長ファイルの読み込み

HDFSの基礎を抑えた上で、ようやく表題のバイナリ固定長ファイルをどう読むよ? という点に入ります。
今回、理解をするために作ったFixedLengthRecordReaderの実装を見ながら進めていきます。

先に、重要な点を書いておくと「どうやってブロック/Split/レコード」の境界を超えるか、ということに始終するので、図3をしっかりと理解することが一番大事です。

レコードの端数計算とSkip処理

まず、FileSplitのStart位置とレコードのstart位置は必ずしも一致しないということを思い出してください。
通常のテキスト形式のような可変長だけではなく、固定長でもSplit数を適切にコントロールしないと図3のようにSplitの開始と終端の境界でレコードのそれと差分が出てきます。

これを解決するために、RecordReaderのコンストラクタでFileSplit#getStartの値とアプリケーションから指定したレコード長(reacordLenght)を元に下記のようにレコードの端数を求め、skip()メソッドに渡しています。

fraction = start % recordLength;
skip(fraction)

skipメソッドではfractionが0でない時に、本来のレコード長とfractionの差分ほど進めた位置おwstartにするメソッドです。

this.start -= fraction;
this.input.seek(this.start);
if (fraction != 0) {
    Tuple2<Boolean, ByteBuffer> record = readLine(this.input, this.recordLength);
    this.start += record._2().array().length;
}

詳細は後述しますが、next()メソッドではSPlitの終端地がレコード超で割り切れない場合も必ずレコードを読んでいます。
そのため、仮にfractionがある場合は、fraction分のバイト超がすでに読み込まれているため、重複しないように読み飛ばしています。

next()メソッド

next()メソッドは論理的なレコードを出力パラメータvalueに返すメソッドです。keyには一般的に位置情報を入れるが行番号を入れるわけではなく、Split内の位置を返しています。

public boolean next(LongWritable key, BytesWritable value) throws IOException {
    if (this.pos < this.end) {
        key.set(this.pos);

        Tuple2<Boolean, ByteBuffer> record = readLine(this.input, this.recordLength);
        if (record._1()) {
            setValue(record._2(), value);
        }
        return record._1();
    }
    return false;
}

これがNewHadoop内で呼ばれていたメソッドの実体となります。
readLineで1レコードを読み込みますが、この時点ではSplitの終端を読み込もうとしてるレコード長が超えてないかを考慮していません。

具体的には図3でSplitが"1"の終端で、レコード3を完全に読むとSplitの範囲を超えますが、Split1としてこれは読んでいます。そのため、Split2の開始点ではSplit1が超過する予定の読み込み位置まで、カーソルを進める必要があるため、先程のskip処理を実行しています。

続いて、readLineはレコード長文読み込んで、"(EOF判定, 読込んだレコード)"というタプルを返すメソッドです。

private Tuple2<Boolean, ByteBuffer> readLine(FSDataInputStream in, int length) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(this.recordLength);
    return new Tuple2<>(_readLine(in, length, buffer), buffer);
}

private boolean _readLine(FSDataInputStream in, int length, ByteBuffer result) throws IOException {
    byte[] buff = new byte[length];
    int readSize = in.read(buff);
    if (readSize <= 0) {
        return false;
    }
    if (length != readSize) {
        byte[] tmp = trim(buff, readSize);
        result.put(tmp);
        _readLine(in, length - readSize, result);
    } else {
        result.put(buff);
    }
    return true;
}

なぜ、1レコード読むだけの_readLineを再帰で実装してるかといえば、内部で使っているFSDataInputStream.read(byte[])はbyte配列の長さ分の値を読むことが保証されてないためです。InputStream#readメソッド自体がそれを保証するI/Fじゃないので何の問題もないのですが。

おそらくローカル環境では問題が起こりませんが、YARNクラスタ環境では、例えば本来80バイトが期待する値だとしても、未定義のタイミングで15バイトなど期待してない値になるケースがあるので、目的の値になるまで再帰を繰り返すようにしています。
最初これに気付かず検証環境で大いにハマりました。。。

まとめ

今回は、固定長に関して扱いましたが、可変長データ構造でも下記が重要になるかと思います。

  • Splitとレコードの開始位置の調整
  • FSDataInputStream.readは1レコードになるまで再帰的に呼び出す

Sparkから見たHDFSの振る舞いとか、そもそもどう実装すれば良いのかがあまり情報がないので、この辺は充実してほしいなー。
ちなみに、固定長は既存ライブラリがあるっぽいので、そっちを使う方が良さげです。

それでは、Happy Hacking!

6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?