こんにちは。Supership株式会社 プロダクト開発本部 CXソリューション開発室の久下です。
現在は「Supership Contents Connect(以下、SC2)」というプロダクトの開発に携わっています。 SC2は、ユーザー一人ひとりに最適なタイミング・チャネル・内容でコミュニケーションを届ける統合配信基盤です。
この記事は Supershipグループ Advent Calendar 2025 の 21日目の記事です。
TL;DR
- Apache Flink 1.8 → 1.20へのアップグレードで遭遇した5つのハマりポイントと解決策を紹介
- ローカル環境とAWS環境の差異(依存関係のスコープ、kinesaliteの挙動)に注意が必要
- 複数コンシューマー環境ではEnhanced Fan-Out (EFO) の導入を検討すべき
はじめに
SC2では、Webサイトにおけるリアルタイムレコメンド機能を提供しています。 その中核技術としてApache Flinkを用いたストリーム処理を採用しており、ユーザーの商品閲覧履歴をリアルタイムで収集・処理しています。
本記事では、このFlinkアプリケーションを1.20へアップグレードした際の作業内容と、ハマりポイントについて共有します。
Apache Flinkとは
Apache Flinkは、2014年にApacheソフトウェア財団のトップレベルプロジェクトとなった分散ストリーム処理フレームワークです。「Flink」はドイツ語で「素早い」を意味し、その名の通りリアルタイム処理に特化しています。
ちなみに...
アイコンのリスは、名前が持つ「素早さ」のイメージに加え、開発の一部が行われていたベルリンで親しまれている「キタリス」がモデルになっています。
かわいらしい見た目ですが、その裏では強力なストリーム処理エンジンを象徴しています。1
他のApacheプロジェクトとの違い
「Apache」の名を冠するデータ処理系プロジェクトは多数ありますが、それぞれ役割が異なります。
| プロジェクト | 主な役割 | 処理モデル | 一言で言うと |
|---|---|---|---|
| Apache Flink | ストリーム処理 | リアルタイム(イベント単位) | 「流れてくるデータを即座に処理する」 |
| Apache Spark | バッチ処理・分析 | バッチ / マイクロバッチ | 「大量のデータをまとめて高速処理する」 |
| Apache Kafka | 分散メッセージング / ストリーム基盤 | Pub/Sub | 「データを受け渡す配管(パイプライン)」 |
| Apache Airflow | ワークフロー管理 | DAG(有向非巡回グラフ) / スケジューラ | 「いつ・何を・どの順番で実行するか管理する」 |
Flink vs Spark:処理モデルの根本的な違い
| 観点 | Apache Flink | Apache Spark (Streaming) |
|---|---|---|
| 設計思想 | ストリーム処理がネイティブ | バッチ処理が基盤、ストリームは拡張機能 |
| 処理方式 | イベント単位(真のストリーミング) | マイクロバッチ(小さなバッチの連続) |
| レイテンシ | ミリ秒〜数秒 | 数秒〜数十秒 |
| 得意なユースケース | リアルタイム異常検知、IoT、金融取引 | 大規模ETL、機械学習、アドホック分析 |
Sparkは「バッチ処理の延長でストリームも扱える」のに対し、Flinkは「最初からストリーム処理のために設計されている」という根本的な違いがあります。
Apache Flinkの4つの主要な特徴
Flinkの主な特徴は以下の4点です。
| 特徴 | 説明 |
|---|---|
| 低レイテンシ | データが到着した瞬間に処理を開始(バッファリングなし)。ミリ秒単位の即時応答が可能 |
| Exactly-Once保証 | 障害発生時のリトライでも、データの「重複」や「欠損」が発生しない(チェックポイント機能による状態管理) |
| ステートフル処理 | 処理途中の「状態(State)」をメモリやディスクに保持。過去のデータに基づいた複雑な集計やウィンドウ処理が可能 |
| イベントタイム処理 | 通信遅延でデータの到着順序が前後しても、データに含まれる「発生時刻」に基づいて正しい順序で処理 |
Flinkが適しているユースケース
上記の特徴により、Flinkはデータの 「鮮度」 と 「正確性」 の両方が求められるクリティカルな領域で採用されています。
| ユースケース | 具体的な処理内容 | このケースで活きるFlinkの機能 |
|---|---|---|
| リアルタイム不正検知 | クレジットカード取引などのトランザクションを即時判定し、ブロックする | 低レイテンシ: ミリ秒単位の応答速度が必要 |
| IoTセンサー監視 | 工場機器や車両からのセンサーデータを集約し、異常値を即座に検出・アラート発報 | イベントタイム処理: ネットワーク遅延やデータの順序乱れを正しく扱える |
| ストリーミングETL | Kinesis/Kafka等のログをリアルタイムで整形・変換し、DBやDWHへ書き込む | Exactly-Once保証: データの重複や欠損が許されない |
| リアルタイムレコメンド (今回のケース) | ユーザーの直近の行動(閲覧・購入)を即座に次の表示コンテンツに反映させる | ステートフル処理: 「過去の行動履歴(State)」を保持しながら最新イベントを処理 |
SC2では、この中でも特に 「ステートフル処理」 と 「低レイテンシ」 の特性を活かし、ユーザーの"今"の興味に合わせたレコメンドを実現しています。
システム構成と開発環境
SC2のストリーミング処理パイプラインは、AWSのマネージドサービスを活用した以下のアーキテクチャで構成されています。
全体アーキテクチャの流れ
配信サーバーから出力されるアクセスログは、Fluentdを経由してAmazon Kinesis Data Streamsへ転送されます(※図中ではFluentd部分は省略)。
その後、Apache Flink(Amazon Managed Service for Apache Flink)がストリームデータを読み込み、以下の処理を行います。
| 処理フェーズ | 内容 |
|---|---|
| 入力 | Kinesisからのリアルタイムユーザー行動ログ |
| 処理 | ユーザーごとの行動履歴の集約・整形(RDSから組織情報を結合) |
| 出力 | 最新の履歴データをAmazon DynamoDBへ書き込み |
ローカル開発環境の工夫
クラウドネイティブな構成ですが、開発効率を高めるためにローカル環境でもAWSサービスを再現してテストを行っています。
Dockerコンテナを活用し、以下のツールで本番に近い環境を構築しています。
| AWSサービス | ローカル代替ツール |
|---|---|
| Amazon Kinesis Data Streams | instructure/kinesalite |
| Amazon DynamoDB | amazon/dynamodb-local |
これにより、AWSへのデプロイを待つことなく、ローカルマシン上でストリーム処理のロジック検証から結合テストまでを完結できる体制を整えています。
アップグレード後の構成
今回の移行は単なるバージョンアップではなく、言語レベル(Java/Scala)からの刷新を含む大規模な環境変更となりました。
主要コンポーネント・ランタイムの変化
主要なミドルウェアおよび言語バージョンの変更点は以下の通りです。
| コンポーネント | 旧バージョン | 新バージョン | 備考 |
|---|---|---|---|
| Apache Flink | 1.8.3 | 1.20.1 | EOL版からAWS・Flinkコミュニティ両方でサポートされる最新バージョンへ |
| Java | 8 | 11 | Flink 1.20推奨環境へ移行 |
| Scala | 2.11.12 | 2.12.8 | Flinkのサポート対応に合わせ更新 |
| AWS SDK | v1 (1.11.x) | v2 (2.20.x) | AWS SDK for Java v1は2025年12月31日にサポート終了(EOS)2 |
| SBT | 1.2.6 | 1.9.9 | ビルドツールの刷新 |
CI/CD環境(CodeBuild)の更新とビルド設定
1. ビルドイメージの変更
Flink 1.20 へのアップグレードに伴い、CodeBuildのイメージを aws/codebuild/standard:5.0 以上 へ変更しました。
- Java 11 環境の利用: Flink 1.20 のビルドに必須となる Java 11 以降を確保
-
AWS CLI v2 への移行:
aws kinesisanalyticsv2コマンドを最新の仕様で安定して利用
2. ビルドコマンドの調整(SBT)
ビルド時のメモリ不足(Metaspaceエラー等)を防ぐため、以下のオプションを明示的に指定しています。
# メモリ割り当てを強化
export SBT_OPTS="-Xmx4G -XX:MaxMetaspaceSize=2G"
# バージョンを注入してJARを作成
sbt clean "set version := \"$VERSION\"" assembly
依存ライブラリ・コネクタの詳細
Flink本体の変更に伴い、周辺ライブラリも大幅に入れ替えを行いました。特にKinesisコネクタはFlink本体から分離されたバージョン(v5系)へ移行しています。
| カテゴリ | ライブラリ名 | 旧バージョン | 新バージョン |
|---|---|---|---|
| Connector | flink-connector-kinesis | 1.8.3 | 5.0.0-1.20 |
| Flink Libs | flink-streaming-scala | 1.8.3 | 1.20.1 |
| flink-runtime-web | 1.8.3 | 1.20.1 | |
| flink-scala (新規) | - | 1.20.1 | |
| flink-core (新規) | - | 1.20.1 | |
| flink-streaming-java (新規) | - | 1.20.1 | |
| AWS | dynamodb / apache-client | 1.11.755 | 2.20.0 |
| Utils | Kryo (新規) | - | 2.24.0 |
| Plugin | sbt-assembly | 0.14.6 | 2.1.5 |
※ Kryo: Flink 1.20系でのクラスロード挙動に合わせるため、明示的に依存関係に追加しています。
ハマりどころと解決策
ここからは、実際にアップグレード作業とAWSデプロイを行う中で遭遇した「ハマりポイント」とその解決策を紹介します。
1. ローカルとAWSの差異:依存ライブラリのスコープ
ローカル環境(Docker)では動くのに、AWS環境へデプロイすると NoSuchMethodError や ClassNotFoundException でクラッシュする事象が発生しました。
原因:Fat JARの競合
AWSのランタイムには既にFlinkのコアライブラリが含まれています。アプリのJAR(Fat JAR)の中にFlink本体を含めてしまうと、環境側のクラスと競合します。
対策:Providedスコープの徹底
SBT設定で、Flink本体に関連するライブラリを provided スコープに設定し、JARから除外します。
val flinkVersion = "1.20.1"
libraryDependencies ++= Seq(
// Flink本体は provided に設定して JAR に含めない
"org.apache.flink" % "flink-clients" % flinkVersion % Provided,
"org.apache.flink" % "flink-core" % flinkVersion % Provided,
"org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
"org.apache.flink" % "flink-runtime-web" % flinkVersion % Provided,
// KinesisコネクタなどはAWS環境に含まれないため含める必要がある
"org.apache.flink" % "flink-connector-kinesis" % "5.0.0-1.20",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" %% "flink-scala" % flinkVersion,
// AWS SDK v2
"software.amazon.awssdk" % "kinesis" % "2.20.0",
// ...
)
参考:
ローカル実行時の注意
IntelliJ IDEAなどのIDEや sbt run でローカル実行する場合、provided のままだとクラスが見つからずエラーになります。開発時は以下の設定を追加してください。
# ローカル実行時(sbt run)でも provided なライブラリを読み込む設定
Compile / run := Defaults.runTask(
Compile / fullClasspath,
Compile / run / mainClass,
Compile / run / runner
).evaluated
Compile / runMain := Defaults.runMainTask(
Compile / fullClasspath,
Compile / run / runner
).evaluated
2. ローカルとAWSの差異:Kinesis読み取り位置とkinesaliteの罠
FlinkKinesisConsumer の設定において、読み取り開始位置(Starting Position)は環境ごとに厳密に切り替える必要があります。これは単なるテストの利便性だけでなく、ローカル環境で使用するモック(kinesalite)の仕様上の制約によるものです。
| 環境 | 推奨設定 | 理由 |
|---|---|---|
| ローカル開発 | TRIM_HORIZON |
kinesaliteはLATEST指定時に直前のPutデータを読み取れない場合がある3
|
| AWS環境 | LATEST |
TRIM_HORIZONだと再起動のたびに過去データを全て再処理してしまう |
val consumerConfig = new Properties()
// ローカル(kinesalite)は LATEST だと動かないため TRIM_HORIZON を強制する
// 本番は再処理事故を防ぐため LATEST (またはスナップショット復帰) を使用する
val position = sys.env.getOrElse("STREAM_START_POSITION", "LATEST")
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, position)
本番環境での注意
本番用コードに TRIM_HORIZON が残っていると、デプロイや再起動のたびにKinesisの保存期間分(デフォルト24時間〜)の過去データをすべて再処理してしまいます。DynamoDBへの負荷スパイクなどに直結するため、環境変数等で確実に切り替えてください。
3. 実装ロジックの刷新:Watermarkと時刻
Flink 1.8時代の古いAPI(AssignerWithPeriodicWatermarks)はFlink 1.11で非推奨となったため、新しい WatermarkStrategy API へ書き換えました。
Event Time と Processing Time とは
Flinkでは、ストリーム処理における「時間」の概念として、主に Event Time と Processing Time の2種類があります。
| 時間の種類 | 定義 | メリット | デメリット |
|---|---|---|---|
| Event Time | イベントが実際に発生した時刻(ログに含まれるタイムスタンプ) | 遅延データも正しい順序で処理。再処理時も同じ結果 | Watermarkの管理が必要 |
| Processing Time | Flinkがイベントを処理するサーバー時刻 | 実装がシンプルで処理が速い | 遅延データにより集計結果がブレるリスク |
Watermarkとは
Watermarkは、Event Timeベースの処理において「この時刻より前のイベントはもう到着しないと推定される」ことを示すマーカーです。Flinkはこれを基準にウィンドウを閉じるタイミングを判断します。
参考:
当初の実装(Event Time)
SC2ではユーザー行動の順序が重要であるため、当初は Event Time を採用しました。
stream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5)) // 5秒の遅延を許容
.withTimestampAssigner(new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
element.timestamp // イベント内の時刻を返す
}
})
)
複数並列度での問題と暫定対応
しかし、複数並列度の環境ではウィンドウ処理が正常に終了しないという問題が発生しました。
原因:
並列度がKinesisのシャード数より大きい場合、一部のサブタスクにはシャードが割り当てられずアイドル状態となります。このとき:
- アイドルサブタスクのWatermarkは進まない(初期値のまま)
- Flink全体のWatermarkは全サブタスクの最小値で計算される
- 結果として全体のWatermarkが進まず、ウィンドウが永遠に閉じない
参考:
暫定対応:
調査・検証の時間が限られていたため、暫定対応として Processing Time に切り替えました。
// 変更前(Event Time ベース)
.timeWindow(Time.seconds(timeWindowSeconds))
// 変更後(Processing Time ベース)
.window(TumblingProcessingTimeWindows.of(Time.seconds(timeWindowSeconds)))
.timeWindow() はFlink 1.12で非推奨化
Flink 1.12以降では .timeWindow() が非推奨となり、明示的にウィンドウの種類を指定することが推奨されています。
// 旧API (非推奨)
.timeWindow(Time.seconds(n))
// 新API (推奨)
.window(TumblingProcessingTimeWindows.of(Time.seconds(n)))
Event Timeベースの場合は TumblingEventTimeWindows を使用します。
本来の解決策:withIdlenessの利用
本来は WatermarkStrategy の withIdleness メソッドを利用することで、Event Timeのまま問題を解決できます。
stream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
element.timestamp
}
})
.withIdleness(Duration.ofMinutes(1)) // 1分間イベントがなければアイドルとみなす
)
withIdlenessを指定すると、一定時間イベントを受信しないサブタスクはアイドルとしてマークされ、Watermark計算から除外されます。
4. 運用と監視 (Observability)
CloudWatch Metricsによる監視では、「プロセスがいるか」だけでなく「健全か」を見る必要があります。
監視すべきメトリクス
| メトリクス | 用途 | 検知対象 |
|---|---|---|
fullRestarts |
完全再起動回数 | CrashLoop(再起動ループ)状態の検知 |
uptime |
ジョブ起動からの経過時間(ミリ秒) | アプリケーション停止の検知 |
なぜuptime監視が必要か
fullRestartsメトリクスはアプリケーションが実行中に再起動した回数をカウントします。しかし、以下のケースではfullRestartsは増加しません:
- アプリケーションが停止(READY状態)した場合(AWSによる強制停止、手動停止、リソース不足など)
-
ジョブが完了した場合(
uptimeは-1を返す)
特に注意すべきは、Apache Flink 1.6/1.8/1.11のサポート終了です。2025年7月14日以降、これらのバージョンを使用するアプリケーションは強制的にREADY状態になります。この場合、fullRestartsは増加せず、uptimeの監視がなければ停止に気付けません。
アラーム設計
| メトリクス | 検知対象 | 設定例 |
|---|---|---|
fullRestarts |
CrashLoop(再起動ループ) |
RATE(fullRestarts) > 0 が継続 |
uptime |
アプリケーション停止 |
uptime < 60秒 または データなし |
補完関係: fullRestartsは「不安定な実行状態」を、uptimeは「停止状態」を検知します。両方を組み合わせることで、網羅的な監視が可能になります。
Terraform実装例
fullRestarts監視(CrashLoop検知)
resource "aws_cloudwatch_metric_alarm" "job_fullrestarts" {
alarm_name = "flink-job-fullrestarts-my-application"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "5"
datapoints_to_alarm = "3"
threshold = "0"
alarm_description = "Job is restarting repeatedly (CrashLoop detected)"
treat_missing_data = "missing"
insufficient_data_actions = [aws_sns_topic.alarm.arn]
alarm_actions = [aws_sns_topic.alarm.arn]
ok_actions = [aws_sns_topic.alarm.arn]
# RATE関数で再起動の増加率を計算
metric_query {
id = "m1"
metric {
metric_name = "fullRestarts"
namespace = "AWS/KinesisAnalytics"
period = "60"
stat = "Maximum"
dimensions = {
Application = "my-flink-application"
}
}
}
metric_query {
id = "e1"
expression = "RATE(m1) * 60" # 1分あたりの再起動回数
label = "Job Restart Rate"
return_data = "true"
}
}
uptime監視(アプリケーション停止検知)
resource "aws_cloudwatch_metric_alarm" "flink_uptime" {
alarm_name = "flink-uptime-my-application"
comparison_operator = "LessThanThreshold"
evaluation_periods = "1"
metric_name = "uptime"
namespace = "AWS/KinesisAnalytics"
period = "60"
statistic = "Maximum"
threshold = 60000 # 60秒(ミリ秒単位)
alarm_description = "Flink application is not running"
dimensions = {
Application = "my-flink-application"
}
# 重要: メトリクスが発行されない場合もアラームを発火
treat_missing_data = "breaching"
insufficient_data_actions = [aws_sns_topic.alarm.arn]
alarm_actions = [aws_sns_topic.alarm.arn]
ok_actions = [aws_sns_topic.alarm.arn]
}
ポイント:
-
fullRestarts: RATE関数で増加率を監視。過去の再起動履歴に関係なく「今、再起動が頻発しているか」を検知 -
uptime:treat_missing_data = "breaching"が重要。アプリ停止時はメトリクス自体が発行されなくなるため
5. 拡張ファンアウト (Enhanced Fan-Out) の導入
発生した問題
データリカバリーのため、1つのKinesisストリームに対して複数のFlinkアプリケーションを同時に起動したところ、以下のエラーが発生しました:
java.lang.RuntimeException: Retries exceeded for getRecords operation - all 3 retry attempts failed.
原因
デフォルトの POLLING モードでは、Kinesisシャードあたり以下の制限があります:
| 制限項目 | 値 |
|---|---|
| GetRecords API呼び出し制限 | 5回/秒/シャード |
| 読み取りスループット制限 | 2MB/秒/シャード(全コンシューマーで共有) |
複数のコンシューマー(Flinkアプリケーション)が同じシャードからデータを取得しようとすると、このAPI制限に抵触し、リトライ失敗が発生します。
解決策:Enhanced Fan-Out (EFO) の利用
Enhanced Fan-Out (EFO) を有効化することで、各コンシューマーに専用の2MB/秒の帯域が割り当てられ、API制限を回避できます。
| 項目 | POLLING | EFO |
|---|---|---|
| データ取得方式 | Pull型(GetRecords API) | Push型(SubscribeToShard API) |
| シャードあたり帯域 | 2MB/秒(全コンシューマーで共有) | 2MB/秒(コンシューマーごとに専用) |
| API制限 | 5回/秒/シャード | なし |
| レイテンシ | 200-1000ms程度 | 平均70ms |
| 追加コスト | なし | あり |
事前準備
EFOを利用するには、以下の2つの準備が必要です。
1. Kinesisストリームコンシューマーの登録
各Flinkアプリケーションに対して、ストリームごとに一意のコンシューマー名で事前登録が必要です。
# 通常運用用
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream \
--consumer-name my-flink-app
# リカバリー用
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream \
--consumer-name my-flink-app-recovery
# 登録確認
aws kinesis list-stream-consumers \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream
2. IAMロールへの権限付与
FlinkアプリケーションのIAMロールに、EFO用の追加権限を付与する必要があります。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "KinesisStreamPermissions",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary",
"kinesis:ListShards",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:RegisterStreamConsumer",
"kinesis:ListStreamConsumers"
],
"Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream"
},
{
"Sid": "KinesisConsumerPermissions",
"Effect": "Allow",
"Action": [
"kinesis:SubscribeToShard",
"kinesis:DescribeStreamConsumer",
"kinesis:DeregisterStreamConsumer"
],
"Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream/consumer/*"
}
]
}
権限とリソースタイプの対応
各IAMアクションは、操作対象のリソースタイプ(stream または consumer)が異なります。
| 権限 | 用途 | リソースタイプ |
|---|---|---|
SubscribeToShard |
シャードからのデータ購読(EFOのコア機能) | consumer |
DescribeStreamConsumer |
コンシューマーのステータス確認 | consumer |
DeregisterStreamConsumer |
コンシューマーの登録解除 | consumer |
RegisterStreamConsumer |
EFOコンシューマーの登録 | stream |
ListStreamConsumers |
登録済みコンシューマー一覧の取得 | stream |
Scala実装例
読み取り開始位置の設定:
リカバリー用Flinkアプリケーションでは、特定の時刻からデータを読み取り直す必要があります。
// 読み取り開始位置の設定
val initPos = config.getProperty("flink.stream.initpos", "LATEST")
kinesisProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, initPos)
// 読み取り位置が「AT_TIMESTAMP」の場合、日時フォーマットと開始日時を設定
if (initPos == "AT_TIMESTAMP") {
val format = config.getProperty("flink.stream.timestamp.format", "yyyy-MM-dd'T'HH:mm:ss")
val timestamp = config.getProperty("flink.stream.timestamp", "2025-10-28T16:00:00")
kinesisProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp)
kinesisProps.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format)
}
| 設定値 | 説明 | ユースケース |
|---|---|---|
LATEST(デフォルト) |
最新のレコードから読み取り開始 | 通常運用 |
TRIM_HORIZON |
保持期間内の最も古いレコードから読み取り | 全データ再処理 |
AT_TIMESTAMP |
指定した時刻以降のレコードから読み取り | 特定時点からのリカバリー |
Kinesisストリームの保持期間に注意
AT_TIMESTAMP で指定できるのは、ストリームの保持期間内のデータに限られます。
- デフォルト保持期間: 24時間
- 最大保持期間: 365日(要追加料金)
# 保持期間を72時間に延長
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 72
参考: データ保持期間を変更する
Enhanced Fan-Outの設定:
// Enhanced Fan-Out設定(複数Flinkアプリで同じストリームを読む場合)
val useEfo = config.getProperty("flink.stream.use_efo", "false")
if (useEfo == "true") {
val consumerName = config.getProperty("flink.stream.efo_consumer_name", "my-flink-app")
kinesisProps.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO")
kinesisProps.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, consumerName)
}
コスト比較
EFOはPOLLINGと比較して追加コストが発生します。利用前にコストを理解しておくことが重要です。
| 項目 | POLLING | EFO |
|---|---|---|
| シャード時間料金 | 基本料金に含まれる | 追加で $0.015/コンシューマー・シャード・時間 |
| データ取得料金 | なし | $0.013/GB |
コスト試算例(4シャード、2コンシューマー、100GB/月、720時間/月):
コンシューマー・シャード・時間料金:
$0.015 × 2コンシューマー × 4シャード × 720時間 = $86.40/月
データ取得料金:
$0.013 × 100GB = $1.30/月
合計: 約 $87.70/月(追加)
コスト管理のポイント
- 使用後はコンシューマー登録を解除する - 登録されている限り課金が継続
- 開発環境ではPOLLINGを使用する - 複数コンシューマーが不要な環境ではEFOを有効化しない
# コンシューマー登録の解除(リカバリー完了後)
aws kinesis deregister-stream-consumer \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream \
--consumer-name my-flink-app-recovery
今後の対応
今回のアップグレードで安定稼働を実現しましたが、以下の課題が残っています。
1. Scala APIからJava APIへの移行
FLIP-265に基づき、FlinkのScala APIは既に非推奨となっており、将来のバージョンで削除される予定です。
| ステータス | 内容 |
|---|---|
| 現在(Flink 1.20) | Scala API は @Deprecated(非推奨)としてマーク済み |
| 将来(Flink 2.0) | Scala API の削除予定 |
非推奨化の主な理由:
- Scala 2.12.7に限定されており、新バージョン(Scala 2.13以降)への互換性維持が困難
- Java DataStream APIの方が機能が充実しており、新機能もJavaを優先して開発される
現在のScalaベースの実装をJava DataStream APIへ段階的に移行することを検討しています。
参考:
2. Kinesis Connector の新API(KinesisStreamsSource)への移行
本記事で紹介した FlinkKinesisConsumer は レガシーAPI であり、Flink 1.15以降では FLIP-27 ベースの新しいソースAPI KinesisStreamsSource が推奨されています。
| 項目 | レガシーAPI | 新API |
|---|---|---|
| クラス名 | FlinkKinesisConsumer |
KinesisStreamsSource |
| 依存ライブラリ | flink-connector-kinesis |
flink-connector-aws-kinesis-streams |
| 設定クラス | ConsumerConfigConstants |
KinesisSourceConfigOptions |
| 設計思想 | 従来のSourceFunction | FLIP-27 ベースの統一ソースAPI |
現在の実装でレガシーAPIを採用している理由:
- ローカル開発環境で使用している kinesalite との互換性を確保するため
- 新API(
flink-connector-aws-kinesis-streams)でのカスタムエンドポイント設定(ローカルモック接続)に関するドキュメントが限定的
今後の移行方針:
- AWS環境専用の実装では
KinesisStreamsSourceへの移行を検討 - ローカル開発環境では引き続き
FlinkKinesisConsumerを使用するか、テスト戦略を見直し
参考:
3. Processing TimeからEvent Timeへの変更
「3. 実装ロジックの刷新」で述べた通り、現在は暫定対応としてProcessing Timeベースのウィンドウ処理を採用しています。
課題:
- ネットワーク遅延や処理負荷により、同じデータでも異なる集計結果になる可能性
- 障害復旧時のデータ再処理で、本番稼働時と異なる結果が得られる可能性
今後の対応方針:
WatermarkStrategy.withIdleness() を導入し、Event Timeベースの処理に戻すことを予定しています。
stream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
element.timestamp
}
})
.withIdleness(Duration.ofMinutes(1)) // アイドルサブタスクをWatermark計算から除外
)
これにより、再現性の確保(障害復旧時も同じ結果)と順序保証(イベント発生時刻に基づいた正確な処理順序)が実現できます。
まとめ
Apache Flink 1.8から1.20へのアップグレードは、単なるバージョンアップではなく、言語ランタイム(Java 8→11、Scala 2.11→2.12)やAWS SDK(v1→v2)を含む大規模な環境刷新となりました。
本記事で紹介した主なポイントは以下の通りです:
| カテゴリ | ハマりポイント | 解決策 |
|---|---|---|
| 依存関係 | ローカルでは動くがAWSでクラッシュ | Flink本体をprovidedスコープに設定 |
| ローカル開発 | kinesaliteでLATESTが動作しない |
環境変数でTRIM_HORIZONに切り替え |
| Watermark | 複数並列度でウィンドウが閉じない | 暫定でProcessing Time採用、今後withIdleness導入予定 |
| 監視 | アプリ停止を検知できない |
fullRestartsとuptimeの両方を監視 |
| 複数コンシューマー | GetRecords API制限に抵触 | Enhanced Fan-Out (EFO)を導入 |
Flinkのバージョンアップは、公式ドキュメントに書かれていない「環境差異」や「ローカルモックの挙動の違い」との戦いでもあります。本記事が同様の移行作業を行う方の参考になれば幸いです。
参考記事
- Apache Flinkとは何か?リアルタイム処理エンジンの特徴・利点とユースケースの概要を詳しく解説
- Apache Flinkを使いKinesisストリームデータを処理する
- Managed Service for Apache Flinkとは?
- Managed Service for Apache Flink デベロッパーガイド
- Apache FlinkでCEPを実現 ~イベント時刻とグルーピング~
- sbt runでprovidedな依存ライブラリをクラスパスに含める
最後に宣伝です。
Supershipではプロダクト開発やサービス開発に関わる人を絶賛募集しております。
ご興味がある方は以下リンクよりご確認ください。
Supership 採用サイト
是非ともよろしくお願いします。
