0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

実装編:Apache Flinkで異常検知をどう書くか?(Flink SQL & PyFlink)

0
Posted at

前回の記事では、Flinkの「ステートフルなストリーム処理」の優位性について解説しました。今回は、実際に私がブロックチェーンの異常検知やリアルタイム分析で活用している、具体的な実装コードを紹介します。

Flinkには複数の記述方法がありますが、実務で主流の**「Flink SQL」と、AIエンジニアに馴染み深い「PyFlink」**の2パターンを見ていきましょう。

1. Flink SQL:宣言的に「異常」を定義する

Flink SQLの最大の利点は、複雑な時系列の集計(Window処理)を、標準的なSQLに近い形で簡潔に記述できる点です。

シナリオ:同一アドレスからの急激な送金回数の増加を検知

例えば、「過去1分間に10回以上の送金があったアドレス」を抽出する場合、以下のようなクエリになります。

-- 1. 送金データが流れてくるソーステーブルを定義
CREATE TABLE transactions (
    sender_address STRING,
    receiver_address STRING,
    amount DOUBLE,
    ts TIMESTAMP(3),
    -- イベント時間としてtsカラムを指定し、3秒の遅延(Watermark)を許容
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND 
) WITH (
    'connector' = 'kafka',
    'topic' = 'ethereum-tx',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- 2. ホッピングウィンドウ(10秒ごとに更新される過去1分間の統計)で集計
SELECT 
    sender_address,
    COUNT(*) as tx_count,
    HOP_START(ts, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) as window_start
FROM transactions
GROUP BY 
    sender_address, 
    HOP(ts, INTERVAL '10' SECOND, INTERVAL '1' MINUTE)
HAVING COUNT(*) > 10; -- 閾値を超えたら異常として出力

ポイント:

  • WATERMARK: ネットワーク遅延で順番が前後したデータを、数学的にどこまで待つかを定義しています。
  • HOP: スライディングウィンドウをSQLで直感的に表現しています。

2. PyFlink:機械学習モデルをストリームに組み込む

SQLでは難しい「独自のアルゴリズム」や「学習済みモデル(ONNX等)の推論」を行いたい場合は、Python APIであるPyFlinkが真価を発揮します。

シナリオ:取引額の移動平均からの乖離を判定する

以下は、ユーザーごとに「直近の取引傾向」をメモリ(State)に保持し、そこから大きく外れた取引を検知するコードのスケルトンです。

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import ScalarFunction, udf

# 1. 実行環境のセットアップ
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

# 2. 独自の異常検知ロジック(UDF: User Defined Function)を定義
class AnomalyDetector(ScalarFunction):
    def eval(self, amount, avg_amount):
        # 単純な閾値判定の例(実際にはここにMLモデルの推論等を入れる)
        if amount > avg_amount * 3:
            return "ANOMALY"
        return "NORMAL"

anomaly_detect = udf(AnomalyDetector(), result_type='STRING')

# 3. データソースの定義(SQL DDLを使用可能)
t_env.execute_sql("""
    CREATE TABLE source_table (...) WITH (...)
""")

# 4. Table APIを用いた処理パイプラインの記述
source_table = t_env.from_path("source_table")

result_table = source_table.window(
    Over.partition_by(col("sender_address"))
        .order_by(col("ts"))
        .preceding(row_interval(10)) # 直近10件を対象
        .as_("w")
).select(
    col("sender_address"),
    col("amount"),
    col("amount").avg.over(col("w")).as_("avg_amount")
).select(
    col("sender_address"),
    anomaly_detect(col("amount"), col("avg_amount")) # 異常判定UDFの適用
)

# 5. 結果の出力
result_table.execute_insert("sink_table").wait()

ポイント:

  • Over Window: ユーザーごとの「過去N件」という統計量を、外部DBにアクセスせずメモリ内で計算しています。
  • ScalarFunction: Pythonのライブラリ(NumPyやScikit-learn)をストリームの中でそのまま活用できる拡張性があります。

3. 実装上の「落とし穴」と投資家エンジニアの対策

実務でこれらのコードを動かす際、私は以下の2点に最も注意を払います。

  1. Stateの肥大化(TTLの設定):
    ユーザーごとの状態を際限なく保存すると、メモリ(VRAM/RAM)を食いつぶします。table.exec.state.ttl を設定し、一定期間アクセスのない状態は自動で破棄(ガベージコレクション)させるのが鉄則です。
  2. Backpressure(背圧)の監視:
    異常検知ロジックが重すぎると、Kafkaからのデータ流入速度に追いつけなくなります。Flinkのダッシュボードで各オペレーターの backpressure 指標を監視し、必要に応じて並列度(Parallelism)を調整します。

結論:どちらを選ぶべきか?

  • Flink SQLを選ぶべき時:

  • 統計的な集計(平均、カウント、最大最小)がメインの時。

  • 開発スピードを優先し、データ分析官とロジックを共有したい時。

  • PyFlinkを選ぶべき時:

  • 学習済みモデル(ML)をリアルタイム推論させたい時。

  • 文字列解析や複雑な条件分岐など、SQLでは記述が困難なロジックを含む時。

「どの技術が優れているか」ではなく、「ビジネスの要求(精度・速度・保守性)に対して、どの実装が最もROIが高いか」。この視点を持つことが、プロフェッショナルなAIエンジニアへの第一歩です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?