Dataflow/Beamを影で支えるcoderの話です。
coderとは
データをシリアライズ・デシリアライズするクラスです。Runnerがデータを一時保存する時や、バイト列同士で比較したい時に使います。
Beamのドキュメントでは、ここらへんで説明されており、
- バイト列へ変換し、OutputStreamに書き出すencodeメソッド
- InputStreamからバイト列を読み出し、デコードした値を返すdecodeメソッド
を定義する必要があります。
パイプラインからのCoderの使い方
使いたいCoderを、CoderRegistryやアノテーションでcoderを登録するか、withDestinationCoderのようにSinkやTransformに明示的に渡します。
Runnerからのcoderの使われ方の例
GroupByの例
同じキーで値をまとめるGroupByに関して以下の説明があり、coderが使われていることがわかります。
Two keys of type K are compared for equality not by regular Java Object.equals(java.lang.Object), but instead by first encoding each of the keys using the Coder of the keys of the input PCollection, and then comparing the encoded bytes. This admits efficient parallel evaluation. Note that this requires that the Coder of the keys be deterministic (see Coder.verifyDeterministic()). If the key Coder is not deterministic, an exception is thrown at pipeline construction time.
- キーの比較には、coderでエンコードしたバイト列同士でやるよ
- ので、deterministicなcoderの必要があるよ
- deterministicでない場合は例外発生するよ
deterministicについては後述します。
FileIOのDynamic Destinationの例
FileIOにはDynamic Destinationという機能があり、入力に応じて出力先のファイルを変えることが出来ます。
Dynamic Destiantionを使うと、以下の処理を行うPtransformが(裏側で)追加されます(ここらへん)。
- 入力をグループ(destination)に分類(byで分類を指定)
- グループ毎(※)に一時ファイルに書き出し
- 一時ファイルのファイル名を変更(withNamingで指定)
ここでcoderは、2.の一時ファイルに書き出す時に使われます。
destinationには任意のクラスを使うことが出来ますが、destinationによっては(※※)coderを明示的に指定する必要があり、そのcoderはdeterministicである必要があります。
※正確には、ウィンドウとシャードとグループ毎?
※※ CoderRegistryで推測出来ない場合?
deterministic
各coderはdeterministic・非deterministicに分かれ、非deterministicなcoderは使える箇所に制限があります。
deterministicなcoderが満たす条件は、Javadocに記載されており、
In order for a Coder to be considered deterministic, the following must be true:
- two values that compare as equal (via Object.equals() or Comparable.compareTo(), if supported)
have the same encoding.- the Coder always produces a canonical encoding, which is the same for an instance of an object
even if produced on different computers at different times.
- 同値な(Object.equals)オブジェクトは、同じエンコーディングを持つ
- どのコンピュータでも、いつも、同じ(正規化された)エンコーディングを作る
ことが条件です。
RunnerなどからはverifyDeterministicメソッドでdeterministic判定されるので、
- deterministicではないCoderは、NonDeterministicExceptionを発生させる
- deterministicなCoderはなにもしない
ようなメソッドを実装します。
なんでそんなの気にする必要あるの?
coderが非deterministicなら、
- 同値なオブジェクトが同じエンコーディングを持たない
- エンコーディングが正規化されていない
のどちらか、あるいは両方です。
こうなると、encodeしたバイト列を使うと、
- GroupByでは、同じキーが違うグループに分かれる
- Dynamic Destinationでは、同じDestinationが違う一時ファイルに書き込まれる
- その後、最終的なファイル名にするので(おそらく)どちらか消える
ような危険性があります。
deterministicなcoder・非deterministicなcoderの例
組み込みのcoderで言うと、
- SerializableCoder
- MapCoder
などはdeterministicではなく、
- StringUtf8Coder
- BooleanCoder
などはdeterministic、
- ListCoder
- KVCoder
などは、要素がdeterministicならdeterministicになります。
実装
自分でcoderを実装する必要時は、組み込みのcoderなどを参考に、
- encode
- decode
- verifyDeterministic
を実装しましょう。
JSONでエンコードすると、実装によっては、順番が保証されずdeterministicでなくなるので気をつけませう。