細かすぎて伝わらないApache Beam/Dataflow選手権#1です。
FileIOって?
Apache Beamのファイル一般を扱うIOクラスです。
ドキュメントによるとS3/GCS/HDFS/ローカルのファイルを、扱う事が出来るようです。
Dynamic Destinationって?
PCollectionの中身によって出力先を変える機能です。
具体的な設定はFileIO.Writeで定義されており、
- byで、入力(UserT)をグループ(DestinationT)に分類するクラスを指定
- viaで、入力(UserT)を出力(OutputT)に変換するクラスを指定
- withNamingで、グループ(DestinationT)からファイル名を決めるクラスを指定
の3つが、Dynamic Destinationの主な設定です。
DestinationTって
入力(UserT)分類し、グループ化するためのラベルとして使われるクラス(ジェネリクス)です。
境界(extends)が定義されてはいないので、特に必要なメソッドや親クラスは指定されていません。
って思うじゃん‥
詰まった
やろうとしていたこと
- 数十万件のPub/Subメッセージを、Dynamic Destinationで仕分け(5種類くらい)してGCSに書き込みたい
- Dynamic Destinationなしでは、楽勝に処理出来ていた
起きたこと
- ファイルを書き出す段階で、WorkerでOut Of Memory(OOM)が起きた
- (Dynamic Destination無しと、同じインスタンス種類の場合)
- ごく一部しか結果がファイルに書き込まれなかった
- ワーカーのメモリを巨大(64GB)にすると、ゆっくり書き出される
- オートスケールにしても台数は増えない
原因
DestinationTの実装に
- equals
- hashCode
を(正しく)実装していなかったのが原因。
なぜ必要か
端的にはDestinationTがHashMapで使われているためです。
細かい流れが気になる人向けに:
Dynamic Destinationでは、以下の流れで書き込みを行います。
- 入力(UserT)をファイルに割り振るために、番号を付ける(ここらへん)
- シャードの連番とDestinationTをencodeした結果のハッシュを使っています
- encodeにはCoderのencodeメソッドが使われます
下のような理由で、DestinationTがHashMapに使われています:
- 入力を連番に割り振る時(1)に、ハッシュが衝突する可能性がある
- 衝突すると、異なるDestinationがGroupByKeyでまとめられる
- そのため、一時ファイル書き出し(3)の部分でもDestinationTを計算。異なるDestinationTを異なるファイルに書き出すようにしている
- この時、DestinationTと書き出し先を管理するために、DestinationTをHashMapに突っ込んでいる
このため、DestinationTがhashCode/equalsを(正しく)実装していないと、
- 入力毎に違う書き出し先とみなされる
- つまり1ファイル1行
- 超絶細かい単位で書き出すので時間がかかる
- 出力が終わるまで、入力データが保持するので、メモリ使用量が多くなる
のではないかと推測しています。
今回学んだこと
- equals/hashCodeを実装しないでHashMapを使うと死ぬ
- Effective Java大事
- DataflowでもDumpが取れる
- Googleの人がわかりやすい資料を書いてくれています
- (がダンプしても分かるとは限らない)
- Beamのファイル書き出しは、ちょっと複雑な処理をしている