前回の記事では、Flinkの「ステートフルなストリーム処理」の優位性について解説しました。今回は、実際に私がブロックチェーンの異常検知やリアルタイム分析で活用している、具体的な実装コードを紹介します。
Flinkには複数の記述方法がありますが、実務で主流の**「Flink SQL」と、AIエンジニアに馴染み深い「PyFlink」**の2パターンを見ていきましょう。
1. Flink SQL:宣言的に「異常」を定義する
Flink SQLの最大の利点は、複雑な時系列の集計(Window処理)を、標準的なSQLに近い形で簡潔に記述できる点です。
シナリオ:同一アドレスからの急激な送金回数の増加を検知
例えば、「過去1分間に10回以上の送金があったアドレス」を抽出する場合、以下のようなクエリになります。
-- 1. 送金データが流れてくるソーステーブルを定義
CREATE TABLE transactions (
sender_address STRING,
receiver_address STRING,
amount DOUBLE,
ts TIMESTAMP(3),
-- イベント時間としてtsカラムを指定し、3秒の遅延(Watermark)を許容
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'ethereum-tx',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- 2. ホッピングウィンドウ(10秒ごとに更新される過去1分間の統計)で集計
SELECT
sender_address,
COUNT(*) as tx_count,
HOP_START(ts, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) as window_start
FROM transactions
GROUP BY
sender_address,
HOP(ts, INTERVAL '10' SECOND, INTERVAL '1' MINUTE)
HAVING COUNT(*) > 10; -- 閾値を超えたら異常として出力
ポイント:
- WATERMARK: ネットワーク遅延で順番が前後したデータを、数学的にどこまで待つかを定義しています。
- HOP: スライディングウィンドウをSQLで直感的に表現しています。
2. PyFlink:機械学習モデルをストリームに組み込む
SQLでは難しい「独自のアルゴリズム」や「学習済みモデル(ONNX等)の推論」を行いたい場合は、Python APIであるPyFlinkが真価を発揮します。
シナリオ:取引額の移動平均からの乖離を判定する
以下は、ユーザーごとに「直近の取引傾向」をメモリ(State)に保持し、そこから大きく外れた取引を検知するコードのスケルトンです。
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import ScalarFunction, udf
# 1. 実行環境のセットアップ
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)
# 2. 独自の異常検知ロジック(UDF: User Defined Function)を定義
class AnomalyDetector(ScalarFunction):
def eval(self, amount, avg_amount):
# 単純な閾値判定の例(実際にはここにMLモデルの推論等を入れる)
if amount > avg_amount * 3:
return "ANOMALY"
return "NORMAL"
anomaly_detect = udf(AnomalyDetector(), result_type='STRING')
# 3. データソースの定義(SQL DDLを使用可能)
t_env.execute_sql("""
CREATE TABLE source_table (...) WITH (...)
""")
# 4. Table APIを用いた処理パイプラインの記述
source_table = t_env.from_path("source_table")
result_table = source_table.window(
Over.partition_by(col("sender_address"))
.order_by(col("ts"))
.preceding(row_interval(10)) # 直近10件を対象
.as_("w")
).select(
col("sender_address"),
col("amount"),
col("amount").avg.over(col("w")).as_("avg_amount")
).select(
col("sender_address"),
anomaly_detect(col("amount"), col("avg_amount")) # 異常判定UDFの適用
)
# 5. 結果の出力
result_table.execute_insert("sink_table").wait()
ポイント:
- Over Window: ユーザーごとの「過去N件」という統計量を、外部DBにアクセスせずメモリ内で計算しています。
- ScalarFunction: Pythonのライブラリ(NumPyやScikit-learn)をストリームの中でそのまま活用できる拡張性があります。
3. 実装上の「落とし穴」と投資家エンジニアの対策
実務でこれらのコードを動かす際、私は以下の2点に最も注意を払います。
-
Stateの肥大化(TTLの設定):
ユーザーごとの状態を際限なく保存すると、メモリ(VRAM/RAM)を食いつぶします。table.exec.state.ttlを設定し、一定期間アクセスのない状態は自動で破棄(ガベージコレクション)させるのが鉄則です。 -
Backpressure(背圧)の監視:
異常検知ロジックが重すぎると、Kafkaからのデータ流入速度に追いつけなくなります。Flinkのダッシュボードで各オペレーターのbackpressure指標を監視し、必要に応じて並列度(Parallelism)を調整します。
結論:どちらを選ぶべきか?
-
Flink SQLを選ぶべき時:
-
統計的な集計(平均、カウント、最大最小)がメインの時。
-
開発スピードを優先し、データ分析官とロジックを共有したい時。
-
PyFlinkを選ぶべき時:
-
学習済みモデル(ML)をリアルタイム推論させたい時。
-
文字列解析や複雑な条件分岐など、SQLでは記述が困難なロジックを含む時。
「どの技術が優れているか」ではなく、「ビジネスの要求(精度・速度・保守性)に対して、どの実装が最もROIが高いか」。この視点を持つことが、プロフェッショナルなAIエンジニアへの第一歩です。