説明
このドキュメントはFlinkの公式ドキュメントのTry FinkにあるFraud Detection with the DataStream API をステップバイステップで動作確認する際に理解用に翻訳したものです。
DataStream API を使った不正検知
Apache Flink は、堅牢でステートフルなストリーミング・アプリケーションを構築するための DataStream API を提供します。これはステートと時間をきめ細かく制御でき、先進的なイベント駆動システムの実装を可能にします。このステップバイステップのガイドでは、Flink の DataStream API を使ってステートフルなストリーミング・アプリケーションを構築する方法を学びます。 
何を作る?
デジタル時代において、クレジットカード詐欺は増大する懸念事項です。犯罪者は、詐欺やセキュアでないシステムへの侵入によってカード番号を盗みます。盗んだ番号は、1ドル以下などの小額決済を1回以上行ってテストされます。うまくいけば、その後、転売可能な品物を得るため、あるいは自分で使うために、より高額の購入を行います。
このチュートリアルでは、怪しいクレジットカード取引に対してアラートを出す不正検知システムを作ります。単純なルール集合を用いながら、Flink がどのように高度なビジネスロジックを実装し、リアルタイムに動作できるかを見ていきます。 
前提条件
このウォークスルーは、あなたが Java にある程度なじみがあると仮定しますが、別のプログラミング言語の出身であっても、問題なく読み進められるはずです。 
IDE での実行
IDEでプロジェクトを実行すると、java.lang.NoClassDefFoundError が発生することがあります。これは、おそらく必要な Flink 依存関係がクラスパスに暗黙的に読み込まれていないためです。
• IntelliJ IDEA: Run > Edit Configurations > Modify options > include dependencies with "Provided" scope を選択します。この実行構成により、IDE内からアプリケーションを実行するのに必要なクラスがすべて含まれるようになります。 
助けて、詰まりました!
行き詰まったら、コミュニティのサポートリソースを参照してください。とりわけ Apache Flink のユーザメーリングリストは、Apache プロジェクトの中でも一貫して最も活発なものの一つに数えられており、素早く助けを得るのに最適です。 
ついて来る方法
一緒に手を動かす場合、次が必要です:
• Java 11
• Maven
用意された Flink Maven Archetype を使うと、必要な依存関係をすべて備えたスケルトン・プロジェクトを素早く作成できるため、ビジネスロジックの実装に集中できます。含まれる依存には、すべてのFlinkストリーミング・アプリの中核である flink-streaming-java と、このウォークスルーに特化したデータジェネレータや補助クラスを含む flink-walkthrough-common があります。 
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=2.2-SNAPSHOT \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
Maven 3.0 以降では、コマンドラインから -DarchetypeCatalog でリポジトリを指定することはできません。この変更の詳細は【Maven の公式ドキュメント】を参照してください。スナップショット・リポジトリ を使いたい場合は、settings.xml にリポジトリエントリを追加する必要があります(例):
<settings>
<activeProfiles>
<activeProfile>apache</activeProfile>
</activeProfiles>
<profiles>
<profile>
<id>apache</id>
<repositories>
<repository>
<id>apache-snapshots</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>
</profile>
</profiles>
</settings>
``` [oai_citation:8‡Apache Nightlies](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/datastream/)
groupId、artifactId、package は好みに応じて変更可能です。上記のパラメータでは、Maven はfrauddetection というフォルダを作成し、このチュートリアルを完了するために必要な依存関係をすべて含むプロジェクトを配置します。エディタにインポート後、FraudDetectionJob.java というファイルに以下のコードがあり、IDE 内からそのまま実行できます。データストリームの至るところにブレークポイントを設定し、DEBUGモードでコードを実行して、仕組みへの理解を深めてください。 
必要であれば、groupId、artifactId、package は自由に編集できます。上記のパラメータを使うと、Maven はfrauddetection という名前のフォルダを作成し、このチュートリアルを完了するために必要な依存関係をすべて含んだプロジェクトを配置します。エディタにインポートした後、FraudDetectionJob.java というファイルに以下のコードがあり、IDE の中からそのまま実行できます。データストリームのあちこちにブレークポイントを設定し、DEBUG モードで実行して、仕組みがどのように動くか体感してみてください。 
package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}

package spendreport;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
}

コードの分解(Breaking Down the Code)
ここでは、2つのファイルのコードを段階的に見ていきます。FraudDetectionJob クラスはアプリケーションのデータフローを定義し、FraudDetector クラスは不正なトランザクションを検知する関数のビジネスロジックを定義します。まずは FraudDetectionJob クラスの main メソッドで、ジョブがどのように組み立てられているかを説明します。 
実行環境(The Execution Environment)
最初の行では、StreamExecutionEnvironment をセットアップします。実行環境は、ジョブのプロパティ設定、ソースの作成、そして最終的にジョブの実行トリガに用います。 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ソースの作成(Creating a Source)
ソースは、Apache Kafka、Rabbit MQ、Apache Pulsar などの外部システムから Flink ジョブにデータを取り込みます。このウォークスルーでは、処理対象として無限のクレジットカード取引ストリームを生成するソースを使います。各トランザクションは、アカウント ID(accountId)、そのトランザクションが発生した時刻のタイムスタンプ(timestamp)、そして米ドル建ての金額(amount)を含みます。ソースに付けた name はデバッグ用途で、問題が起きたときにエラーがどこで発生したかを特定しやすくするためのものです。 
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
イベントのパーティション分割と不正検知(Partitioning Events & Detecting Fraud)
transactions ストリームには多数のユーザからの大量のトランザクションが含まれるため、複数の不正検知タスクで並列に処理する必要があります。不正はアカウント単位で発生するので、同じアカウントのすべてのトランザクションが、不正検知オペレータの同一の並列タスクで処理されるようにしなければなりません。 
特定のキーに対して同じ物理タスクがすべてのレコードを処理することを保証するには、DataStream#keyBy でストリームをパーティション分割します。process() 呼び出しは、ストリーム内の各パーティション済み要素に関数を適用するオペレータを追加します。一般に、keyBy 直後のオペレータ(ここではFraudDetector)はキー付きコンテキスト内で実行されると言います。 
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
結果の出力(Outputting Results)
シンクは DataStream を外部システム(Apache Kafka、Cassandra、AWS Kinesis など)へ書き出します。ここでの AlertSink は永続ストレージへ書く代わりに、各 Alert レコードを INFO レベルのログとして出力します。これにより、結果を簡単に確認できます。 
alerts.addSink(new AlertSink());
不正検知器(The Fraud Detector)
不正検知器は KeyedProcessFunction として実装されています。そのメソッド KeyedProcessFunction#processElement は、受け取るすべてのトランザクションイベントに対して呼び出されます。この最初のバージョンは、(保守的すぎると言う人もいるかもしれませんが)すべてのトランザクションでアラートを出す動作をします。以降のステップでは、より意味のあるビジネスロジックへと不正検知器を拡張していきます。 
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
}

Writing a Real Application (v1)
最初のバージョンでは、不正検知器は「小額の取引の直後に大額の取引」があったアカウントに対してアラートを出すべきです。ここで、小額とは $1.00 未満、大額とは $500 超 を指します。あなたの不正検知器が、特定のアカウントに対する以下のトランザクションのストリームを処理しているところを想像してください。
トランザクション 3 と 4 は、不正としてマークされるべきです。なぜなら $0.09 という小額の取引の後に $510 という大額の取引が続いているからです。これに対して、トランザクション 7、8、9 は不正ではありません。$0.02 という小額の直後に大額が来ているわけではなく、その間にパターンを崩す中間のトランザクションがあるためです。 
これを行うには、不正検知器はイベントをまたいで情報を記憶しなければなりません。すなわち、「大額の取引」が不正となるのは、直前の取引が小額だった場合のみです。イベントをまたいで情報を記憶するにはstateが必要であり、これが私たちがKeyedProcessFunctionを使うことにした理由です。これは 状態と時間 の両方をきめ細かく制御でき、このウォークスルー全体を通じて、より複雑な要件に応じてアルゴリズムを進化させることが可能になります。 
最もまっすぐな実装は、小額の取引を処理したときに設定されるブールのフラグです。大額の取引が流れてきたら、そのアカウントに対してフラグが設定されているかどうかを単に確認できます。 
しかし、FraudDetector クラスのメンバ変数としてフラグを実装するだけでは機能しません。Flinkは、同じFraudDetector のオブジェクトインスタンスで複数アカウントのトランザクションを処理することがあります。つまり、アカウント A と B が同じ FraudDetector のインスタンスを通るようにルーティングされる場合、アカウントAのトランザクションがフラグを true に設定し、その後アカウントBのトランザクションが誤ったアラートを引き起こす可能性があります。 
もちろん、個々のキーに対するフラグを追跡するために Mapのようなデータ構造を使うことも可能ですが、単なるメンバ変数ではフォールトトレラントではありませんし、障害発生時にはその情報がすべて失われます。したがって、アプリケーションが障害からの復旧のために再起動しなければならない場合、不正検知器はアラートを見落とす可能性があります。 
これらの課題に対処するために、Flink は、通常のメンバ変数とほとんど同じくらい簡単に使えるフォールトトレラントな状態のためのプリミティブを提供します。Flink における状態の最も基本的な型はValueStateで、ラップする任意の変数にフォールトトレランスを付与するデータ型です。ValueState はキー付き状態の一形式であり、DataStream#keyBy の直後に適用されるオペレータのように、キー付きコンテキストで実行されるオペレータでのみ利用可能です。オペレータのキー付き状態は、現在処理しているレコードのキーのスコープに自動的に限定されます。 
この例では、keyBy() によって宣言されたとおり、キーは現在のトランザクションのアカウントIDであり、FraudDetector はアカウントごとに独立した状態を保持します。ValueState は、Flink がその変数をどのように管理すべきかに関するメタデータを含む ValueStateDescriptor を使って作成されます。状態は、関数がデータの処理を開始する前に登録されるべきです。これに適したフックが open() メソッドです。
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private transient ValueState<Boolean> flagState;
@Override
public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
}
}
ValueState は、Java 標準ライブラリの AtomicReference や AtomicLong に似たラッパークラスです。その内容とやり取りするための3つのメソッドを提供します。update は状態を設定し、value は現在値を取得し、clear はその内容を削除します。特定のキーに対する状態が空の場合(アプリケーションの開始直後や ValueState#clear を呼び出した後など)、ValueState#value は null を返します。 
ValueState#value が返すオブジェクトに対する変更は、システムに認識される保証がありません。したがって、すべての変更は ValueState#update を使って行わなければなりません。それ以外については、フォールトトレランスは Flink が内部で自動的に管理するため、通常の変数と同じように扱うことができます。以下に、フラグ状態を使って潜在的な不正トランザクションを追跡する方法の例を示します。
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
// 現在のキーに対する現在の状態を取得
Boolean lastTransactionWasSmall = flagState.value();
// フラグが立っているか確認
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
// 下流へアラートを出力
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// 状態をクリーンアップ
flagState.clear();
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// フラグを true に設定
flagState.update(true);
}
}
すべてのトランザクションについて、不正検知器はそのアカウントのフラグ状態を確認します。ValueState は常に現在のキー(=アカウント)のスコープであることを思い出してください。フラグが null でなければ、そのアカウントにとって直前のトランザクションは小額であり、したがって今回のトランザクションの金額が大額であれば、検知器は不正アラートを出力します。 
そのチェックの後、フラグ状態は無条件にクリアされます。現在のトランザクションが不正アラートを引き起こした場合はパターンが終わり、引き起こさなかった場合はパターンが崩れており再開が必要だからです。 
最後に、現在のトランザクションの金額が小額かどうかを確認します。小額であれば、次のイベントで確認できるようにフラグを設定します。ValueState はすべての ValueState が nullable であるため、未設定(null)、true、false の3つの状態をとり得ることに注意してください。このジョブでは、フラグが設定されているかどうかを確認する目的で、未設定(null)と true のみを使用します。
Fraud Detector v2: State + Time = ❤️
詐欺師は、テスト取引に気づかれる確率を下げるために、大きな購入を長く待ちません。たとえば、不正検知器に 1分のタイムアウト を設定したいとします。すなわち前の例では、トランザクション 3 と 4 が互いに1分以内に発生した場合にのみ不正と見なす、ということです。Flink の KeyedProcessFunction は、将来のある時点でコールバックメソッドを起動するタイマーを設定できるようにします。 
新しい要件に適合するようにジョブをどう変更できるか見ていきましょう。 
- フラグが true に設定されるたび、1分後のタイマーも設定
- タイマーが発火したら、状態をクリアしてフラグをリセット
- フラグがクリアしたら、そのタイマーはキャンセルされるべき
タイマーをキャンセルするには、それがどの時刻に設定されたかを覚えておく必要があります。記憶には状態が必要であるため、フラグ状態とあわせてタイマー用の状態も作るところから始めます。
private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
KeyedProcessFunction#processElement は、タイマーサービスを含む Context とともに呼び出されます。タイマーサービスは、現在時刻の問い合わせ、タイマーの登録、タイマーの削除に使用できます。これを使って、フラグが設定されるたびに1分後のタイマーを設定し、そのタイムスタンプを timerState に保存できます。
if (transaction.getAmount() < SMALL_AMOUNT) {
// フラグを true に設定
flagState.update(true);
// タイマーとその状態を設定
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
処理時間(processing time) は壁時計の時刻であり、オペレータを実行しているマシンのシステムクロックによって決まります。タイマーが発火すると、KeyedProcessFunction#onTimer が呼び出されます。フラグをリセットするためのコールバックは、このメソッドをオーバーライドして実装します。
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
// 1分後にフラグを除去
timerState.clear();
flagState.clear();
}
最後に、タイマーをキャンセルするには、登録済みのタイマーを削除し、タイマー状態を削除する必要があります。これをヘルパーメソッドに包み、flagState.clear()の代わりにこのメソッドを呼び出すことができます。
private void cleanUp(Context ctx) throws Exception {
// タイマーを削除
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// すべての状態をクリーンアップ
timerState.clear();
flagState.clear();
}
以上です。完全に機能する、ステートフルで分散なストリーミングアプリケーションができました!
Final Application
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
public void open(OpenContext openContext) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
// Check if the flag is set
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
//Output an alert downstream
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// Clean up our state
cleanUp(context);
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// set the flag to true
flagState.update(true);
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
// remove flag after 1 minute
timerState.clear();
flagState.clear();
}
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
}
Expected Output
2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
ライセンス表記
- 原文:Fraud Detection with the DataStream API の非公式翻訳。
原文は Apache Software Foundation により提供されています。
