本記事はこちらのブログを参考にしています。
翻訳にはアリババクラウドのModelStudio(Qwen)を使用しております。
Apache Flink® コミュニティに従って、Flink ウィンドウでのデータ削除をより柔軟にしよう
はじめに
Flinkでは、ウィンドウは無制限のデータストリームを分割するための重要なメカニズムです。各ウィンドウは一定期間にわたってデータを収集し、これにより有限のデータセットを計算および処理できます。しかし、ウィンドウ内のすべてのデータが実際のアプリケーションで価値を持つわけではありません。それが理由で、Flinkは特定のルールに基づいてウィンドウから不要なデータをクリーンアップできる「Window Evictor」メカニズムを提供しています。
例えば、10分間隔の温度読み取り値を含むセンサーデータを処理しているとしましょう。一部の読み取り値は故障したセンサーによって生成された異常値かもしれません。これらのデータポイントがすぐにクリーンアップされない場合、後続の計算結果に影響を与えます。「Window Evictor」はこのデータクリーンアップ作業を担当するコンポーネントであり、有効なデータのみが計算に参加することを保証します。
このメカニズムをある特定の時間枠内に返却された本を整理する必要のある司書と考えてみてください。古いルールでは、最も早く返却された本からしか処理できず、それらを棚に並べる前にどの本を処理するかを決めなければなりませんでした。しかし、時には整理後に修理が必要な本を決定したい場合や、返却時刻ではなく本の状態に基づいて処理順序を決めたい場合もあります。
これが、FLIP-4以前のFlinkの「Window Evictor」が直面していた問題でした。それは、ウィンドウ関数がデータを処理する前、ウィンドウの最初からのみデータを排除することができました。まるで固定された方法で作業しなければならない司書のように、十分な柔軟性がありませんでした。FLIP-4の目標は、これらの制限を打ち破り、データ削除をよりスマートで柔軟なものにすることでした。
古いアプローチの何が問題だったのか?
元の「Window Evictor」の制限を、より直感的な例を通して理解してみましょう。
主に2つの制限がありました:
最初の制限:開始位置からのみ削除可能
常に最も早く返却された本から処理を始めなければならないというルールがあると想像してください。最近返却された本であっても即座に対応が必要な場合があります。実際のアプリケーションでは、新しいデータの方が重要である場合もあり、データの内容に基づいて保持すべきものを決定したい場合があります。
二番目の制限:処理前の削除のみ可能
これは、本を整理する前にどの本を処理するかを決める必要があるというルールのようなものです。整理中に問題を特定するのではなく、事前に判断しなければなりません。データ処理においては、処理結果を見てから特定のデータを削除するかどうかを決定したい場合があります。
これらの制限が異なるシナリオにどのような影響を与えるか見てみましょう:
例えば:
- センサーデータ処理では、時間順ではなく品質に基づいてデータを保持したい場合があります。
- 取引システムでは、バッチ統計を計算した後にいくつかの異常取引を除外する必要がある場合があります。
FLIP-4はこれをどのように解決するのか?
FLIP-4は「Window Evictor」に対して2つの重要な改善を行いました:
より柔軟な削除位置
現在、「evictor」は特定の必要性に基づいてウィンドウ内の任意の位置からデータを削除できます。返却順だけでなく、本の実際の状態に基づいて処理できるようになりました。
賢い処理タイミング
2つの処理タイミングが導入されました:
- evictBefore:ウィンドウ関数がデータを処理する前にデータ削除を行う。
- evictAfter:ウィンドウ関数がデータを処理した後にデータ削除を行う。
この設計により、処理結果に基づいてさらにデータを削除するかどうかを決定できます。
実装方法は?
FLIP-4は、改良されたインターフェース設計を通じてこれらの機能を実現しました。新しいEvictorインターフェースには以下の2つのコアメソッドが含まれています:java
void evictBefore(Iterable> elements,
int size,
W window,
EvictorContext evictorContext);
void evictAfter(Iterable> elements,
int size,
W window,
EvictorContext evictorContext);
システムはまた、いくつかの事前構築済みの実装を提供しています。CountEvictorはカウントに基づいてデータを削除し、DeltaEvictorは値の差に基づいて削除し、TimeEvictorは時間に基づいて削除します。それぞれの実装は、ウィンドウ関数の前または後に柔軟に設定できます。例えば、DeltaEvictorの場合:java
// デフォルト設定: ウィンドウ関数の前に削除
DeltaEvictor.of(threshold, deltaFunction)
// 明示的にウィンドウ関数の前に削除を指定
DeltaEvictor.of(threshold, deltaFunction, false)
// ウィンドウ関数の後に削除を設定
DeltaEvictor.of(threshold, deltaFunction, true)
第三パラメーターのdoEvictAfter
は削除タイミングを制御します。false(デフォルト)の場合、ウィンドウ関数の処理前に削除が行われます。trueの場合、処理後に削除が行われます。CountEvictorとTimeEvictorも同様の設定オプションを提供します。
このアップデートは何をもたらすのか?
このアップデートはFlinkユーザーに具体的な利点をもたらします:
まず、処理ロジックがより柔軟になります。固定ルールから解放された司書のように、実際のニーズに基づいてデータを処理する方法を決定できます。
次に、機能がより強力になります。ウィンドウ関数の前後で削除を可能にすることで、より複雑なデータ処理ロジックを実装できます。処理前には明らかに不要なデータを削除して処理時間を節約でき、処理後には結果に基づいてさらにフィルタリングできます。
「Window Evictor」と「ウィンドウ関数のフィルタリング」の違いは?
「Window Evictor」の仕組みを理解した後、次のような疑問が浮かぶかもしれません:なぜウィンドウ関数内でフィルタリングロジックを記述しないのか?なぜ「Window Evictor」を使うのか?
図書館の例を使って違いを説明しましょう:
本の整理プロセスを管理していると想像してください。「Window Evictor」を使用することは、整理中に適さない本を実際に削除する(おそらく修理エリアに移動したり、流通から取り除いたりする)ことに似ています。一方、ウィンドウ関数でのフィルタリングは、これらの本に付箋を貼ることに似ています。本は元の位置に留まりますが、最終的な統計には含まれません。
1. 異なる処理レベル
「Window Evictor」はウィンドウ管理レベルで動作し、どのデータがウィンドウに残るべきかを決定します。整理前に損傷した本を取り除く司書のようなもので、これらの本は実際に棚から除去されます。
ウィンドウ関数でのフィルタリングはビジネス処理中に発生し、付箋
DeltaEvictorsの動作変更と実用例
DeltaEvictorsの動作が変更されました。これまでは最初の一致で停止していましたが、現在ではすべての要素をチェックするようになりました。互換性のために、すべてのEvictorはデフォルトでウィンドウ関数の前に実行されます。
ケーススタディ
では、2つの具体的なケースを通じてこれらの新機能をどのように適用できるかを見てみましょう。
ケース1: リアルタイムデータ品質管理
スマートファクトリーで温度監視システムを運用していると仮定します。各センサーは毎秒温度データを送信しますが、データの品質はさまざまな理由(センサーの故障、ネットワークのジッターなど)により異なる可能性があります。このシナリオでは、改良されたWindow Evictorを次のように使用できます:java
DataStream sensorData = ...
sensorData
.keyBy(SensorReading::getSensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.evictor(DeltaEvictor.of(
5.0, // 温度差のしきい値
(reading) -> reading.getTemperature(),
true // ウィンドウ関数の後にEvictionを実行
))
.process(new ProcessWindowFunction<...>() {
@Override
public void process(
String sensorId,
Context context,
Iterable readings,
Collector out) {
// 平均温度と標準偏差を計算
double avgTemp = calculateAverage(readings);
double stdDev = calculateStandardDeviation(readings, avgTemp);
// 結果を出力し、その後DeltaEvictorが大きな偏差を持つデータを削除
out.collect(new ProcessedReading(sensorId, avgTemp, stdDev));
}
});
このケースでは、まずウィンドウ内のすべての温度データの平均と標準偏差を計算し、その後ウィンドウ関数の実行後にDeltaEvictorを使用して5度以上逸脱する異常値を削除します。利点としては、全体のデータ分布を理解した後にどのデータが異常であるかをより正確に判断できることです。
ケース2: 金融取引の異常検知
金融分野からの例を見てみましょう。株式取引データを監視し、潜在的な異常取引を特定する必要があるとします。ここでは、事前および事後のEvictionの両方を使用できます:java
DataStream trades = ...
trades
.keyBy(StockTrade::getSymbol)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.evictor(new CustomTradeEvictor() {
@Override
public void evictBefore(
Iterable> elements,
int size,
Window window,
EvictorContext ctx) {
// 処理前に明確な異常(例えば、負の価格やゼロボリューム)を削除
Iterator<TimestampedValue<StockTrade>> iterator = elements.iterator();
while (iterator.hasNext()) {
StockTrade trade = iterator.next().getValue();
if (trade.getPrice() <= 0 || trade.getVolume() == 0) {
iterator.remove();
}
}
}
@Override
public void evictAfter(
Iterable<TimestampedValue<StockTrade>> elements,
int size,
Window window,
EvictorContext ctx) {
// 統計計算後、潜在的な疑わしい取引を削除
// 例:3標準偏差を超える価格変動
double avgPrice = getWindowAvgPrice(ctx);
double stdDev = getWindowPriceStdDev(ctx);
double threshold = 3 * stdDev;
Iterator<TimestampedValue<StockTrade>> iterator = elements.iterator();
while (iterator.hasNext()) {
StockTrade trade = iterator.next().getValue();
if (Math.abs(trade.getPrice() - avgPrice) > threshold) {
iterator.remove();
// これらの疑わしい取引はさらに分析するためにログに記録
logSuspiciousTrade(trade);
}
}
}
})
.process(new TradingAnalysisFunction());
このケースでは、2段階で動作するカスタム取引Evictorを作成しました:
- ウィンドウ関数の実行前には、明らかに無効な取引記録(負の価格やゼロボリュームなど)を削除します。
- ウィンドウ関数の実行後には、計算された統計(平均価格と標準偏差)に基づいて潜在的な異常取引を特定し削除します。
これら削除された取引は最終的な分析結果には影響を与えませんが、リスク管理チームによるさらなる分析のために別途ログに記録されます。
これらの2つのケースは、改良されたWindow Evictorが現実のシナリオでどのように機能するかを示しています。より精密なデータ品質管理を実現できるだけでなく、より複雑なビジネスロジックもサポートします。
まとめ
FLIP-4はFlinkのWindow Evictorをよりスマートにしました。現代の図書館管理システムのように、固定されたルールに縛られず、実際のニーズに基づいてデータを柔軟に処理できます。任意の位置からデータを削除し、処理の前後で操作できるようになったことで、データ処理の柔軟性と効率が大幅に向上しました。この改善は単純に見えるかもしれませんが、多くの新しい可能性を開きます。司書に優れたツールを提供して読者により良いサービスを提供できるように、アップグレードされたWindow Evictorは私たちがより精密で効率的なデータ処理を達成するのに役立ちます。この柔軟性は特にリアルタイムデータ処理のようなシナリオで重要です。