はじめに
こんにちは!LOBの @RyosukeKawamura です。
この記事は 株式会社 LOB Advent Calendar 2018 19日目の記事です。
2度目の投稿です。
今私は業務でApache Flinkを用いたデータ処理を実装しているので今回はそれについて書きたいと思います。
Flinkに関しては具体的な事例はネット上に多くなく、あったとしても「試してみた」とか「Flinkの◯◯APIの説明」みたいなものくらいしかヒットしません。ので結局どうすればいいんだ?ってのが理解しづらかったのですが、ちょうど同僚にとても詳しい方がいて、いろいろ教わりつつ進めていく中で「こ、これは駆使すればめっちゃ便利やんけ。。。!」というのがようやくわかってきたので自分の知識の整理ののためにもまとめていきたいと思います。
Apache Flink とは
本家のドキュメントが結構充実しています。しっかり読めばだいたいわかる(はず)。
一言でいうと「分散ストリーム処理プラットフォーム」です。同じようなの(StormとかSparkとか)がいっぱいあってもはやようわからんですが、ストリーム処理から進化してバッチなど他の領域にも染み出してきたのがFlinkだそうな。
Flinkの良いところ
まだ真髄に触れられていはいませんが下記3点あたりかなと思っています。
①ストリーム・バッチともにサポートされていてかつ同じような処理の書き方で概ね実装できる
随時送られてくるストリームデータを処理するときも、ファイル連携であるときも、テーブルを読んで処理するときも、概ね同じような処理で実装できるのは結構嬉しいと思います。
今まさに進めているのはtsvファイルが連携されてくるバッチ処理なのですが、これはもしかするとファイルではなくログがKafka等経由で送られてくるように変わるかもしれません。プロジェクトを進めていると大きめの構成変更はちょくちょく発生すると思うので、これは結構嬉しいポイントです。とはいえストリーミング処理でないと使えないAPIなどもあるので、今回はファイル連携される有限のデータを扱うがストリーミング処理として扱うという実装をしています(詳細は「詰まったところ」で後述)。
②APIが豊富で集計時等に痒いところに手が届く
これが一番驚きました。まるで配列を処理しているかのようにサクサクとメソッドをつなげていくだけで集計などができるような仕組みになっています。関数型の言語に慣れている人は特にすんなりいけるとのこと。(ぼくは経験なし。。。)
今回はJavaで実装していますが、ScalaでもFlinkが使えるので、そちらのほうがいろいろ調子いいみたいです。
サンプルコードはこんな感じ。
③障害発生時に賢く復旧してくれてそこから処理を再開できる(ような実装が可能)
すみませんこれはまだ実装できていない部分なので詳細は理解できていません。。。
実装次第ではチェックポイントみたいなものを設けてそこからきれいにやり直し、みたいなことができるようです。
冪等性が担保しやすそうで嬉しいですね。
Flinkクラスタにデフォルトでついているコンソールもジョブ状況を確認したりするには使い勝手がよくとても捗ります。
詰まったところ
CSV等有限データをストリーミング処理として扱う
進めていたのは有限ファイルをインプットとした処理であったものの「ストリーミング処理のAPIを使いたい」「ゆくゆくストリーミング処理に変わるかも」という要件に対応する必要がありました。
ファイル読み込み->一回Tableとして読み込む->Tableからデータをロード->結果をデータストリームとして扱う
というややトリッキーなことをすれば解決しました。よくよく探せば公式のドキュメントにも載っているので、やっぱりきちんと読みましょうってことですね、、、実装はこういう感じになります。
// ストリーム処理・テーブルの処理に必要な環境を読み込む
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
// ファイル読み込み
TableSource source = new CsvTableSource.Builder()
.path("/path/to/user_data.tsv")
.ignoreFirstLine() // ヘッダー読み飛ばしがこのメソッド書くだけでOKなの地味に嬉しい
.ignoreParseErrors() // 不正なデータはこれで無視できる
.fieldDelimiter("\t") // タブ区切り
.field("cookie", Types.STRING)
.field("date", Types.TIMESTAMP)
.build();
// テーブルのデータソースとして登録
tEnv.registerTableSource("users", source);
// テーブルとしてデータを取得
Table table = tEnv
.scan("users"); // ここに.filter()とか.select()とか繋ぐとSQLチックな処理もできる
// ストリーミングに変換
DataStream<UserEntity> ds = tEnv.toAppendStream(table, Row.class) // Entityを定義しておけばその型のストリームとして読み込める
ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...); // ここは省略してます(FlatMapしてaddSinkにわたすみたいなのがよくある実装の形らしい)
sEnv.execute(); // ここ忘れるとなにも動かない
初見だとそもそもどう動くのかイメージがつきづらい
これに尽きます。笑
さっきの
ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...);
というのだと、ストリーミングをparallelNumの数の分並行でhogeFlatMap()が処理していって、hogeFlatMap()内でoutしたものがaddSinkに渡ってくる、というような挙動になります。シェルでパイプしているみたいに処理をつなげていけるのが快適ですね。最初意味わかりませんでした。
flatMapのなかは毎回呼ばれてしまうので、この中でコネクション作ったりしてしまうとソケット接続できなくなって死んだり(コンストラクタにわたす必要がありますね)と、とにかく何がどんなタイミングで動くのかがわかりにくい、ステップ実行しても見えづらいのが最初苦労したポイントです。
おわりに
癖をつかむまで(まだつかめてないけど)イメージがわかず分かりにくいですが、使いこなせればデータパイプラインの強い味方になってくれそうな香りのするFlink。せっかくアドテクのビッグデータに触れられるチャンスがあるので、もっと勉強して使いこなせるようになっていければなと思っています。
弊社はFlinkだけじゃなく、新しいデータ処理技術が使えるチャンスがたくさんあります。特に他社では扱えない量・質のデータが扱えるので興味がある・一緒にやりたいって方がいればぜひ一緒にやりましょう!
「流通のケタを変える、広告プラットフォームを創る」ための仲間、絶賛大募集中です!
捗る調子イイデータ基盤を作ろうと日々作戦会議中です!!
https://lob-inc.com/recruit/