TL;DR
Apache Flinkについての概念を解説し、簡単にローカル環境でFlinkを利用したデータ処理を行います。
想定読者
- Flinkについて概要を知りたい方
- ローカル環境でFlinkを試してみたい方
話すこと / 話さないこと
- 話すこと
- Flinkの概要
- Flinkのローカル環境構築と簡単なデータ処理
- 話さないこと
- Flinkの詳細なアーキテクチャ
- Flinkの高度な使い方
Flinkとは?
Overview | Apache Flink を参考に説明します。
Flinkはリアルタイムのデータ処理を行うための分散型ストリーム処理フレームワークです。
Streaming処理と呼ばれ、Batch処理と比較されます。
Batch処理は境界が明確なデータ・セットを一括で処理するのに対し、Streaming処理は境界のない連続的なデータを処理します。
FlinkはデータソースとしてKafkaやKinesisのようなメッセージングシステムを連携します。
これだけ見ると、Flinkは不要でKafkaやKinesisに対してのコンシューマーを自分で実装すれば十分に思えますが、
あるkeyに基づいてデータを集約したり、ウィンドウ処理を行ったり、状態管理を行ったりといった複雑な処理を簡単に実装できる点がFlinkの強みです。
上記の内容は Redistributing StreamとStateful Steam Processingの2つの内容を利用しています。
また、状態管理のためにstate snapshotやsteam replayという仕組みを備えており、障害発生時にも正確なデータ処理を保証します。
復旧方法の詳細は、Stateful Stream Processing | Apache Flinkを、
タイムスタンプ管理の詳細は、Watermark | Apache Flinkを参照してください。
Flinkのローカル環境構築と簡単なデータ処理
FlinkはMLOpsでも利用されることがあります (Ref: [1802.05872] Online Machine Learning in Big Data Streams)。
ここでは、非常に簡単なMLOps風のFlinkアプリケーションをローカル環境で実行し、Kafkaからデータを読み取って特徴量を計算する例を示します。
コードの詳細及び実行方法は下記を参照してください。Dockerとdevboxの利用ができれば実行可能です。
Flinkの基本機能と実行フロー
上記コードで使用されているFlinkの主要な機能は以下のとおりです:
1. ストリーミング環境の作成
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Flinkの実行環境を作成します。
2. データソース(Kafka Source)
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("raw-features")
.build();
-
機能: Kafkaトピックからデータを継続的に読み取り
-
実行時: Kafkaブローカーに接続し、指定されたトピック(
raw-features)からメッセージをストリーミング -
実際の動作:
"user_A,10.5"のような形式でメッセージを受信 -
Ref: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
3. データ変換(Map処理)
.map(line -> {
String[] parts = line.split(",");
return Tuple2.of(parts[0].trim(), Double.parseDouble(parts[1].trim()));
})
-
機能: 文字列データをTuple2<ユーザーID, 値>に変換
-
実行時: 受信した各メッセージをパースして型安全なオブジェクトに変換
-
エラー処理: 不正な形式のデータはnullとして返してフィルタリング
4. キーによるストリーム分割(KeyBy)
.keyBy(tuple -> tuple.f0)
-
機能: ユーザーIDをキーとしてストリームを分割
-
実行時: 同一ユーザーのデータは必ず同じタスク(並列実行単位)に送られる
-
重要性: 状態管理(移動平均計算)において、同一キーのデータが順序付けられることを保証
-
Ref: https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/learn-flink/etl/#keyby
5. 状態保持処理(KeyedProcessFunction)
.process(new MovingAverageFeatureFunction())
-
機能: キーごとに状態を保持して複雑なビジネスロジックを実装
-
状態管理: 各ユーザーの直前の特徴量値を
ValueStateで保持 -
実行時:
- 初回データ: 状態がないため初回メッセージを出力
- 2回目以降: 前回値と現在値の移動平均を計算
6. 結果の出力
.print();
- 機能: 処理結果を標準出力に表示
実際のMLOpsでは: 特徴量ストア(Feature Store)やデータベースへの書き込みに置き換えられると思います。
Flinkが提供しているConnectorから、Apache Behirが提供しているConnectorを利用することも可能です。
https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/datastream/overview/ のDBがサポートされています。
実行フローの詳細
- 起動時: Flinkアプリケーションが起動
- Kafka接続: 指定されたKafkaブローカーに接続し、トピックを購読
- データ受信: メッセージを受信するたびに処理パイプラインが実行
- 並列処理: 複数のユーザーのデータが並列に処理される
- 状態更新: 各ユーザーの状態が自動的に管理される
- 出力: 計算された特徴量が指定された出力先に送信
このパターンにより、リアルタイムでの特徴量生成と機械学習パイプラインを効率的に構築できます。
感想
- 今回はFlinkの基本的な概念とローカル環境での簡単なデータ処理を紹介しましたが、複数のノードでの分散処理や障害復旧など、Flinkの強力な機能はまだまだ多くあるため、時間があればさらに深掘りしたいと感じました(keyの分散方法やStateの保存方法(OOMなど発生しないかなど))
- MLOpsにおいてもリアルタイムデータ処理のニーズが高まっているため、Flinkのようなストリーム処理フレームワークの理解は非常に重要であると再認識しました。
- Flinkの実装もJavaがメインで書かれているため、機会があればOSSのコードリーディングも行いたいと思います。