0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Flink FLIP-12:非同期I/O

Last updated at Posted at 2025-06-13

本記事はこちらのブログを参考にしています。
翻訳にはアリババクラウドのModelStudio(Qwen)を使用しております。

テクニカルインサイトシリーズ Perry Ma | アリババクラウド Apache Flinkリアルタイムコンピューティング プロダクトリードによる解説

はじめに

レストランで注文する場面を想像してみてください。従来のアプローチでは、注文を受けた後、ウェイターはキッチンが料理を準備する間ずっとテーブルのそばに立ち、他の顧客への対応を待機します。これは明らかに非効率的です。なぜなら、ウェイターはその待ち時間中に他の顧客に対応できるからです。このFLIP(Flink Improvement Proposal)は、マルチスレッドのウェイターのようにFlinkを動作させる方法を教えるものです!

なぜAsync I/Oが必要なのか?

ストリーム処理においては、外部システムにアクセスしてデータストリームを強化することが一般的です。例えば:

  • データベースを照会してユーザー情報を取得する
  • 外部APIを呼び出して商品詳細を取得する
  • 第三者サービスにデータ検証をリクエストする

従来の同期型アクセスには大きな問題があります:外部システムからのレスポンスを待っている間、全体の処理が遅くなるのです。まるで、ウェイターがある料理が完成するまで待たなければならず、他の顧客にサービスを提供できない状態です。非常に非効率的ですね。

_2025_06_06_15_59_56

上記の図は、同期処理と非同期処理の違いを示しています。同期処理は直列的なプロセスのようなものですが、非同期処理は複数のリクエストを同時に処理でき、効率を大幅に向上させます。

コアデザインソリューション

Async I/Oの設計は、効率的なレストランのサービスシステムを設計するようなものです。これには以下の主要な役割が含まれます:
_2025_06_06_16_00_24

  • サービスデスク (AsyncWaitOperator): 顧客の注文を受ける責任を持つ。
  • ウェイター (AsyncFunction): キッチンに注文を送るが、待機しない。
  • オーダーチケット (AsyncCollector): 各注文のステータスを記録する。
  • オーダーマネージャー (AsyncCollectorBuffer): 全ての注文の処理を調整する。

2つのサービスモード

レストランが異なるサービス方法を選択できるように、Async I/Oは2つのモードを提供します:

モード 特徴 適したシナリオ
順序付き データ到着順に厳密に出力 順次イベント処理など、順序付けられたデータ処理が必要なシナリオ
非順序 処理が完了次第出力 厳密な順序要件がなく、最大スループットを追求するシナリオ

使用例

HBaseデータベースをデータストリーム内でクエリする必要があると仮定しましょう。以下に2つの一般的な使用パターンを示します。

コールバックを使用する場合:java

public class HBaseAsyncFunction implements AsyncFunction {
private transient Connection connection;

@Override
public void asyncInvoke(String key, AsyncCollector<String> collector) {
    Get get = new Get(Bytes.toBytes(key));
    Table table = connection.getTable(TableName.valueOf("test"));
    ((AsyncableHTableInterface) table).asyncGet(get, 
        new MyCallback(collector));
}

}

// データストリームを作成
DataStream stream = AsyncDataStream.unorderedWait(
inputStream,
new HBaseAsyncFunction(),
100, // タイムアウト(ミリ秒)
20 // 最大同時リクエスト数
);

Futureを使用する場合:java

public class HBaseAsyncFunction implements AsyncFunction {
private transient Connection connection;

@Override
public void asyncInvoke(String key, AsyncCollector<String> collector) {
    Get get = new Get(Bytes.toBytes(key));
    Table table = connection.getTable(TableName.valueOf("test"));
    
    ListenableFuture<Result> future = table.asyncGet(get);
    Futures.addCallback(future,
        new FutureCallback<Result>() {
            public void onSuccess(Result result) {
                collector.collect(
                    Collections.singletonList(
                        Bytes.toString(result.getValue(
                            Bytes.toBytes("f"), 
                            Bytes.toBytes("c")
                        ))
                    )
                );
            }
            
            public void onFailure(Throwable t) {
                collector.collect(t);
            }
        }
    );
}

}

重要な考慮事項

  • リソース共有: 複数のタスクが接続(データベース接続など)を共有する必要がある場合は、static修飾子を使用しますが、スレッドセーフ性に注意してください。
  • ステート管理: Async I/O操作のステートは自動的に保存され、ジョブが失敗した際に以前の進捗を復元できます。
  • パフォーマンスチューニング:
    • 同時リクエスト制限を合理的に設定する
    • タイムアウト状況を監視する
    • 実際のニーズに基づいて順序付きまたは非順序モードを選択する

現状

このFLIPはFlink 1.2で実装されました。これにより、特に外部システムへの頻繁なアクセスが必要なシナリオにおいて、Flinkの外部データ処理の効率が大幅に改善されました。現在、この機能は以下のような生産環境で広く使用されています:

  • リアルタイムデータストリームのデータベース連携
  • リアルタイムレコメンデーションのための外部サービス呼び出し
  • データエンリッチメントのためのキャッシュシステムアクセス

まとめ

FLIP-12は、Flinkに効率的なAsync I/O処理能力をもたらしました。これは、レストランにプロフェッショナルな注文システムを導入するようなもので、各ウェイターが複数の注文を効率的に処理できるようになります。同期処理を非同期処理に変換することで、コードの単純さと保守性を維持しつつ、処理効率を大幅に向上させました。この改善により、Flinkは特に外部システムとの頻繁なやりとりが必要な複雑なデータストリームを処理する能力が向上しました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?