0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

杭州銀行における動的Flink CEPの適用

Last updated at Posted at 2025-03-12

本記事はこちらのブログを参考にしています。
翻訳にはアリババクラウドのModelStudio(Qwen)を使用しております。

抽象:

この記事は、杭州銀行のビッグデータエンジニアである唐占峰と欧阳武林によって執筆されました。この記事では、動的Flink CEP(Complex Event Processing)とは何か、その主要な概念、ユースケース、技術原理、および使用方法について説明します。記事は5つのセクションに分かれています。

  1. 動的Flink CEPとは
  2. 金融業界でのユースケース
  3. 基盤技術
  4. 動的Flink CEPの使用方法
  5. 杭州銀行における動的Flink CEPの適用

金融業界では、ビッグデータ技術が成熟段階に入っています。リアルタイムトランザクション監視や分析、不正検出(マネーロンダリングなど)、および規制遵守において、データの鮮度は不可欠です。急速に進化するビジネス環境では、静的なルールはサービスの再起動を必要とするため、対応が遅れ、サービスが中断されます。この課題に対処するために、杭州銀行は動的Flink CEPを活用して、サービス停止なしでルールを更新し、急速に変化するビジネス要件に効果的に適応しました。FlinkCEPは、Flink上に実装された複雑イベント処理(CEP)ライブラリです。これは、データストリーム内のイベントパターンを検出し、一致したイベントに基づいてアクションを起こすために使用されます。動的Flink CEPは、FlinkCEPの機能を拡張し、リアルタイムでのルール更新をサポートすることで、システムの柔軟性と反応速度を向上させ、運用オーバーヘッドと複雑さを大幅に削減します。

動的Flink CEPとは?

1. 定義と主要な概念

動的Flink CEPは、Apache Flinkの高度なストリーム処理機能です。これにより、DataStreamプログラム内でリアルタイムにルールを変更し、最新のルールを迅速に適用して、データストリームをキャプチャ、クリーンアップ、分析できます。動的Flink CEPは、ユーザーが重要なデータパターンをリアルタイムで認識することを支援します。

主要な概念

パターン: パターンはルールとその定義方法を指します。パターンにはシングルトンパターンとループパターンがあります。シングルトンパターンは単一のイベントを受け入れる一方で、ループパターンは複数のイベントを受け入れることができます。複数のシンプルなパターンが組み合わさって複雑なパターンシーケンスとなり、これをPatternProcessorと呼びます。

イベントストリーム: イベントストリームは、Kafkaやデータベース(例:リアルタイムトランザクションデータストリーム)などの異種の上流システムから来る可能性があります。動的Flink CEPジョブが開始されると、Flinkは入力イベントストリーム内で定義されたPatternProcessorを検出し、処理結果を生成しようとします。

動的マッチング: 動的Flink CEPは、イベントストリームの変化をリアルタイムで識別し、継続的に下流の演算子に送信します。下流の演算子は受信したイベントを解析および逆シリアル化し、実際に使用するPatternProcessorを生成し、その後、最新のPatternProcessorで定義された更新されたパターンに従って一致するパターンを見つけます。

2. 動的Flink CEPの台頭

Flink CEPは、事前定義されたパターンに一致するイベントを検出するために使用されます。しかし、取引や簿記など、急速に進化するビジネスシーンでは、パターンの修正または追加が必要になる場合があります。例えば、リスクのある資金移動の典型的なしきい値は1分間に3回ですが、特別な状況では適切なしきい値はもっと高く設定されるべきです。残念ながら、従来のFlink CEPは動的なルール変更をサポートしていません。更新されたルールを実装するには、Javaコードを書き直し、Flink CEPジョブを再起動する必要があります。銀行のリスク管理のような遅延に敏感なビジネスでは、コードの再開発、パッケージ化、再デプロイを許容できません。さらに、Flink CEPジョブは通常複数のパターンを含むため、1つのパターンを更新すると他のユーザーに影響を与える大規模なメンテナンスが必要となり、運用上の課題が生じます。動的Flink CEPはこれらの課題に対応し、ダウンタイムなしでパターンの更新をサポートします。以下はその特長です:

動的なルール更新: 従来、パターンを更新するにはFlink CEPジョブを再デプロイして再起動する必要があり、これによりサービスが停止し、システムの即時性と可用性に影響を与えます。動的Flink CEPは、Flink CEPジョブを再起動することなくリアルタイムでのパターン更新をサポートします。

複数のルールのサポート: 従来のFlink CEPは、複数のパターンに対して複数のcep演算子を作成します。これにより余分なデータコピーが発生し、データ処理のオーバーヘッドが増加します。動的Flink CEPは、1つのcep演算子に複数のパターンを持つことをサポートし、データコピーを削減し、処理効率を向上させます。

条件のパラメータ化のサポート: 動的Flink CEPは、JSON形式のパターン記述で条件をパラメータ化することを許可します。これにより、カスタム条件の拡張性が向上し、ユーザーは新しい条件クラスを動的に追加できます。

金融業界でのユースケース

動的Flink CEPは、インテリジェントな監視システムの構築、オンラインリスク検出(マネーロンダリングや詐欺など)のサポート、リアルタイムマーケティングによるビジネス成長の促進に役立ちます。動的Flink CEPの金融業界でのユースケースは以下の通りです。

1. 反マネーロンダリング

動的Flink CEPは、銀行口座のトランザクション活動を監視し、潜在的なマネーロンダリング活動を特定するのに役立ちます。例えば、短時間内に頻繁な入金や出金を検出するパターンを追加したり、マネーロンダリングスキームに関与している口座間の資金フローを追跡することができます。さらに、動的Flink CEPはビッグデータや機械学習技術と統合され、リスク監視モデルを構築し、不審なトランザクションの検出精度を向上させ、マネーロンダリングに関連する可能性のある顧客を特定します。また、動的Flink CEPはエンドツーエンドのトランザクション処理にも役立ちます。ナレッジグラフやリアルタイムインテリジェンスと組み合わせることで、銀行の顧客関係グラフを構築し、トランザクションデータ内の不審なパターンを統合して資金フローの全体像を提供できます。

2. 反詐欺

通信詐欺が蔓延している国々では、金融における有効な反詐欺システムは資金の動きを止めることが可能であり、被害者の財政損失を大幅に減少させます。このようなシステムは分散型で、リアルタイムかつ柔軟なルール更新や複雑なルールマッチングをサポートする必要があります。動的Flink CEPは理想的なソリューションを提供します。分散ストリーム処理エンジンであるApache Flink上に構築されており、複雑なパターンのマッチングやダウンタイムなしでのパターン変更の機能を提供します。

3. リアルタイムマーケティング

クレジットカードの申請時には、通常、基本情報の入力や身元確認など複数のステップを経ます。ユーザーはさまざまな理由で途中でプロセスを終了したり、失敗やタイムアウトを経験することがあります。ユーザの行動ログを入力として利用することで、動的Flink CEPジョブはデータ内のさまざまなパターンを識別し、計算を行い、出力を生成できます。銀行はその洞察を使用して、クーポンの発

PatternProcessorjava

public interface PatternProcessor extends Serializable, Versioned {
String getId();

default Long getTimestamp() {
    return Long.MIN_VALUE;
}

Pattern<IN, ?> getPattern(ClassLoader classLoader);

PatternProcessFunction<IN, ?> getPatternProcessFunction();

}

PatternProcessorインターフェースは、動的なFlink CEPにおけるルールを定義するために使用されます。その実装クラスには、イベントのマッチング方法を記述する特定のパターンと、マッチしたイベントを処理する方法を指定するPatternProcessFunctionが含まれています。さらに、idやオプションのバージョンなどの識別プロパティも含まれます。java
@PublicEvolving
public class DefaultPatternProcessor implements PatternProcessor {

/** パターンプロセッサのID */
private final String id;

/** パターンプロセッサのバージョン */
private final Integer version;

/** パターンプロセッサのパターン */
private final String patternStr;

private final @Nullable PatternProcessFunction<T, ?> patternProcessFunction;

public DefaultPatternProcessor(
        final String id,
        final Integer version,
        final String pattern,
        final @Nullable PatternProcessFunction<T, ?> patternProcessFunction,
        final ClassLoader userCodeClassLoader) {
    this.id = checkNotNull(id);
    this.version = checkNotNull(version);
    this.patternStr = checkNotNull(pattern);
    this.patternProcessFunction = patternProcessFunction;
}

@Override
public String toString() {
    return DefaultPatternProcessor{
            + id=
            + id
            + \
            + , version=
            + version
            + , pattern=
            + patternStr
            + , patternProcessFunction=
            + patternProcessFunction
            + };
}

@Override
public String getId() {
    return id;
}

@Override
public int getVersion() {
    return version;
}

@Override
public Pattern<T, ?> getPattern(ClassLoader classLoader) {
    try {
        return (Pattern<T, ?>) CepJsonUtils.convertJSONStringToPattern(patternStr, classLoader);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

@Override
public PatternProcessFunction<T, ?> getPatternProcessFunction() {
    return patternProcessFunction;
}

}

DefaultPatternProcessorクラスは、PatternProcessorのデフォルト実装として機能します。このクラスは、idversion、パターン文字列、PatternProcessFunction、およびClassLoaderなどのパラメータを受け取ります。checkNotNullを使用して、patternProcessFunction以外のすべてのパラメータが提供されていることを確認します。getPattern()メソッドでは、convertJSONStringToPattern()メソッドを使用してJSON文字列をFlink CEPで認識可能なパターンに変換します。次のコードスニペットは、指定されたクラスローダーを入力パラメータとして受け取るようconvertJSONStringToPattern()メソッドをオーバーライドしています。java
public static Pattern, ?> convertJSONStringToPattern(
String jsonString, ClassLoader userCodeClassLoader) throws Exception {
if (userCodeClassLoader == null) {
LOG.warn(
"The given userCodeClassLoader is null. Will try to use ContextClassLoader of current thread.");
return convertJSONStringToPattern(jsonString);
}
GraphSpec deserializedGraphSpec = objectMapper.readValue(jsonString, GraphSpec.class);
return deserializedGraphSpec.toPattern(userCodeClassLoader);
}

toPattern()は、PatternProcessorのコアメソッドの1つであり、パターンのシリアライズとデシリアライズを行うためのツールであるGraphSpecクラスに関連しています。toPattern()メソッドは、ノードとエッジのグラフを処理します。ノードは個々のパターンまたは埋め込みのGraphSpecであり、エッジはノード間の関係とデータフローの方向を定義します。このグラフは、データベースに保存されているルールDAGと密接に関連しています。以下はtoPattern()メソッドの実装です。java
public Pattern, ?> toPattern(final ClassLoader classLoader) throws Exception {
// 後で使用するためにノードとエッジのキャッシュを構築
final Map nodeCache = new HashMap<>();
for (NodeSpec node : nodes) {
nodeCache.put(node.getName(), node);
}
final Map edgeCache = new HashMap<>();
for (EdgeSpec edgeSpec : edges) {
edgeCache.put(edgeSpec.getSource(), edgeSpec);
}
String currentNodeName = findBeginPatternName();
Pattern, ?> prevPattern = null;
String prevNodeName = null;
while (currentNodeName != null) {
NodeSpec currentNodeSpec = nodeCache.get(currentNodeName);
EdgeSpec edgeToCurrentNode = edgeCache.get(prevNodeName);
Pattern, ?> currentPattern =
currentNodeSpec.toPattern(
prevPattern,
afterMatchStrategy.toAfterMatchSkipStrategy(),
prevNodeName == null
? ConsumingStrategy.STRICT
: edgeToCurrentNode.getType(),
classLoader);
if (currentNodeSpec instanceof GraphSpec) {
ConsumingStrategy strategy =
prevNodeName == null
? ConsumingStrategy.STRICT
: edgeToCurrentNode.getType();
prevPattern =
buildGroupPattern(
strategy, currentPattern, prevPattern, prevNodeName == null);
} else {
prevPattern = currentPattern;
}
prevNodeName = currentNodeName;
currentNodeName =
edgeCache.get(currentNodeName) == null
? null
: edgeCache.get(currentNodeName).getTarget();
}
// ウィンドウセマンティクスを追加
if (window != null && prevPattern != null) {
prevPattern.within(this.window.getTime(), this.window.getType());
}

return prevPattern;

}

toPattern()メソッドは、GraphSpecクラスにとって重要なメソッドであり、シリアライズされたGraphSpecオブジェクトをパターンにデシリアライズします。内部ロジックは以下の通りです:

ノードとエッジのキャッシュ構築nodeCacheedgeCacheというマップを作成し、それぞれNodeSpecEdgeSpecインスタンスを保持します。これにより、後続のノードまたはエッジの効率的な取得が可能になります。

開始ノードの検索findBeginPatternName()メソッドを使用してcurrentNodeName変数を初期化し、グラフ処理が開始ノードから始まるようにします。

パターンの反復作成
すべてのノードをループで処理します。開始ノードから始め、エッジ情報に基づいてパターンを前方に構築します。各反復では、nodeCacheから現在のノードのNodeSpecを取得し、前のノードから現在のノードへのEdgeSpecedgeCacheから取得します(EdgeSpecが存在する場合)。その後、NodeSpecEdgeSpecを使用して新しいパターンを組み立てたり、現在のパターンを更新したりします。消費戦略は、Pattern.begin()Pattern.next()Pattern.followedBy()、またはPattern.followedByAny()といったパターン結合方法の選択に影響を与えます。次に、次の反復に備えてprevPatternprevNodeNameを更新します。最後に、構築されたパターンオブジェクトを返します。

このセクションでは、PatternProcessorインターフェースの実装方法、そのコアメソッド、およびパターン構築プロセスについて詳しく紹介しました。次のセクションでは、PatternProcessorDiscovererインターフェースとその実装を紹介します。

2. PatternProcessorDiscovererjava

public abstract interface PatternProcessorDiscoverer extends Closeable {
public abstract void discoverPatternProcessorUpdates(PatternProcessorManager paramPatternProcessorManager);
}

PatternProcessorDiscovererインターフェースは、パターンプロセッサの更新を検出します。アリババクラウドが提供する外部ストレージを定期的にスキャンするPeriodicPatternProcessorDiscovererクラスに基づき、JDBCデータベースから最新のルールを取得するJDBCPeriodicPatternProcessorDiscovererクラスを定義しました。java
public class JDBCPeriodicPatternProcessorDiscoverer
extends PeriodicPatternProcessorDiscoverer {

private static final Logger LOG =
        LoggerFactory.getLogger(JDBCPeriodicPatternProcessorDiscoverer.class);

private final String tableName;
private final String userName;
private final String password;
private final String

if (statement == null) {
try {
this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);
this.statement = this.connection.createStatement();
} catch (SQLException e) {
LOG.error(データベースへの接続エラー!, e);
throw e;
}
}
try {
String sql = buildQuerySql();
LOG.info(ステートメント実行SQLは次のとおりです: {}, sql);
resultSet = statement.executeQuery(sql);
Map<String, Tuple4<String, Integer, String, String>> currentPatternProcessors = new ConcurrentHashMap<>();
while (resultSet.next()) {
LOG.debug(check getLatestPatternProcessors start :{}, resultSet.getString(1));
String id = resultSet.getString(id);
if (currentPatternProcessors.containsKey(id)
&& currentPatternProcessors.get(id).f1 >= resultSet.getInt(version)) {
continue;
}
currentPatternProcessors.put(
id,
new Tuple4<>(
requireNonNull(resultSet.getString(id)),
resultSet.getInt(version),
requireNonNull(resultSet.getString(pattern)),
resultSet.getString(function)));
}
if (latestPatternProcessors == null
|| isPatternProcessorUpdated(currentPatternProcessors)) {
LOG.debug(最新のパターンプロセッサのサイズは次のとおりです: {}, currentPatternProcessors.size());
latestPatternProcessors = currentPatternProcessors;
return true;
} else {
return false;
}
} catch (SQLException e) {
LOG.error(
パターンプロセッサの発見に失敗しました。ルール変更を確認できませんでした。接続を再作成します。, e);
try {
statement.close();
connection.close();
connection = DriverManager.getConnection(requireNonNull(this.jdbcUrl), this.userName, this.password);
statement = connection.createStatement();
} catch (SQLException ex) {
LOG.error(パターンプロセッサの発見用データベースへの接続エラー。, ex);
throw new RuntimeException(データベースへの接続を再作成できません。);
}
}
return false;
}
arePatternProcessorsUpdated()メソッドは、データベース内のPatternProcessorが更新されたかどうかを確認します。以下のように動作します。
まず、未処理のPatternProcessorがあるかどうかをinitialPatternProcessorsリストで確認します。見つかった場合、trueを返します。その後、このメソッドはデータベース接続を確立し、buildQuerySql()メソッドを呼び出して、tableNameで指定されたテーブルからすべてのテナントまたは特定のテナントのPatternProcessorを取得するSQLクエリを生成して実行します。SQLが実行された後、結果を処理します。各currentPatternProcessorについて、すでに存在するか、以前のバージョンがあるかどうかを確認します。以前のバージョンが既に存在する場合、currentPatternProcessorは無視されます。そうでない場合、currentPatternProcessorsマップが更新されます。latestPatternProcessorsがnullであるか更新されている場合、currentPatternProcessorsを使用してlatestPatternProcessorsを更新し、更新があったことを示すためにtrueを返します。
@Override
public List<PatternProcessor<T>> getLatestPatternProcessors() throws Exception {
LOG.debug(パターンプロセッサをデフォルトのパターンプロセッサに変換を開始します。);
return latestPatternProcessors.values().stream()
.map(
patternProcessor -> {
try {
String patternStr = patternProcessor.f2;
GraphSpec graphSpec =
CepJsonUtils.convertJSONStringToGraphSpec(patternStr);
LOG.debug(最新のパターンプロセッサは次のとおりです: {},
CepJsonUtils.convertGraphSpecToJSONString(graphSpec));
PatternProcessFunction<T, ?> patternProcessFunction = null;
String id = patternProcessor.f0;
int version = patternProcessor.f1;
if (!StringUtils.isNullOrWhitespaceOnly(patternProcessor.f3)) {
patternProcessFunction =
(PatternProcessFunction<T, ?>)
this.userCodeClassLoader
.loadClass(patternProcessor.f3)
.getConstructor(String.class, int.class, String.class)
.newInstance(id, version, tenant);
}
return new DefaultPatternProcessor<>(
patternProcessor.f0,
patternProcessor.f1,
patternStr,
patternProcessFunction,
this.userCodeClassLoader);
} catch (Exception e) {
LOG.error(
発見者の最新のパターンプロセッサの取得に失敗しました。- , e);
e.printStackTrace();
}
return null;
}).filter(pre -> pre != null).collect(Collectors.toList());
}
getLatestPatternProcessors()メソッドは、データベースから最新のPatternProcessorを取得します。ストリームAPIを使用して、ConcurrentHashMapに格納されたPatternProcessorデータをPatternProcessorオブジェクトのリストに変換します。主な手順は次のとおりです。PatternProcessorのクラス名(patternProcessor.f3)に基づいて、classLoaderによってPatternProcessFunctionがロードされ、インスタンス化されます。クラス名がnullまたは空でない場合、Javaクラスに変換され、getConstructor()メソッドが呼び出されて、PatternProcessorのID、バージョン、およびテナント情報が提供されます。この情報は、パターン文字列、パターンプロセッサ関数、およびclassLoaderを含むDefaultPatternProcessorインスタンスを構築するために使用されます。最後に、Flink CEPで一致するイベントを識別するために使用できる最新のPatternProcessorインスタンスのリストが返されます。
3. PatternProcessorDiscoverer
DynamicCepOperatorCoordinatorクラスは、PatternProcessorDiscovererインターフェースを呼び出して、データベースから最新のシリアライズされたPatternProcessorを取得し、関連するDynamicCEPOpタスクに送信します。

public class DynamicCepOperatorCoordinator<T> implements OperatorCoordinator {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicCepOperatorCoordinator.class);

private final DynamicCepOperatorCoordinatorContext cepCoordinatorContext;
private final PatternProcessorDiscovererFactory discovererFactory;
private final String operatorName;
private boolean started;
private volatile boolean closed;

public DynamicCepOperatorCoordinator(String operatorName, PatternProcessorDiscovererFactory discovererFactory, DynamicCepOperatorCoordinatorContext context) {
this.cepCoordinatorContext = context;
this.discovererFactory = discovererFactory;
this.operatorName = operatorName;
this.started = false;
this.closed = false;
}

@Override
public void start() throws Exception {
Preconditions.checkState(!started, 動的Cepオペレータコーディネーターが起動しました!);
LOG.info( {}:{} のコーディネーターを起動しています, this.getClass().getSimpleName(), operatorName);
cepCoordinatorContext.runInCoordinatorThreadWithFixedRate(()->{
if (discovererFactory instanceof PeriodicPatternProcessorDiscovererFactory) {
try {
PeriodicPatternProcessorDiscoverer patternProcessorDiscoverer =
(PeriodicPatternProcessorDiscoverer) discovererFactory
.createPatternProcessorDiscoverer(cepCoordinatorContext.getUserCodeClassloader());
boolean updated = patternProcessorDiscoverer.arePatternProcessorsUpdated();
if (updated && started) {
Set<Integer> subtasks = cepCoordinatorContext.getSubtasks();
if (!patternProcessorDiscoverer.getLatestPatternProcessors().isEmpty()) {
UpdatePatternProcessorEvent updatePatternProcessorEvent =
new UpdatePatternProcessorEvent(patternProcessorDiscoverer.getLatestPatternProcessors());
subtasks.forEach(subtaskId -> {
cepCoordinatorContext.sendEventToOperator(subtaskId, updatePatternProcessorEvent);
});
}
}
} catch (Exception e) {
LOG.error(コーディネーターの起動に失敗しました, e);
}
}
});
started = true;
}

@Override
public void close() throws Exception {
closed = true;
cepCoordinatorContext.close();
}

@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
LOG.info(オペレーター {} からイベント {} を受信しました。, event, subtask);
}

@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
// cepCoordinatorContext.runInCoordinatorThread(() -> {
LOG.debug(チェックポイント {} のためのオペレーター {} の状態スナップショットを取得しています, operatorName, checkpointId);
try {
resultFuture.complete(Dynamic cep.getBytes(StandardCharsets.UTF_8));
} catch (Throwable e) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
resultFuture.completeExceptionally(
new CompletionException(
String.format(
動的CEP %s のチェックポイントに失敗しました,
operatorName),
e));
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {

}

@Override
public void subtaskReset(int subtask, long checkpointId) {
}

@Override
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
cepCoordinatorContext.subtaskNotReady(subtask);
}

@Override
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
cepCoordinatorContext.subtaskReady(gateway);
}
}
DynamicCepOperatorCoordinatorクラスの中心となるのはstart()メソッドであり、これによりコーディネーターが初期化され、アクティブになります。start()メソッドの動作は以下の通りです。
cepCoordinatorContext.runInCoordinatorThreadWithFixedRateを使用
public static void main(String[] args) throws Exception {
// 実行環境の設定
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// クラスローダーの初期化
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// 引数の処理
// FLIP-27に基づく新しいSource APIを使用してKafkaソースを構築
Properties prop =new Properties();
prop.setProperty(security.protocol,SASL_PLAINTEXT);
prop.setProperty(sasl.mechanism,SCRAM-SHA-256);
prop.setProperty(sasl.jaas.config,
org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule +
required username=\100670\ password=\000000000;);
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
.setBootstrapServers(123.4.50.105:9292,123.4.60.106:9292,123.4.50.107:9292)
.setTopics(cep_test1).setGroupId(test).setStartingOffsets(OffsetsInitializer.earliest())
.setProperties(prop).setValueOnlyDeserializer((new KafkaJsonDeserializer())).build();
env.setParallelism(1);
DataStream<Event> input = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), source);
// userIdとproductionIdでkeyByを行う
// 注記:同じキーを持つイベントのみが一致するかどうか確認するために処理されます
KeyedStream<Event, Tuple2<String, String>> keyedStream =
input.keyBy(
new KeySelector<Event, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(Event value) throws Exception {
return Tuple2.of(value.getName(), value.getName());
}
});
①実行環境を設定します。 ②Kafkaソースを設定し、KeyBy演算子を使用して名前キーによってイベントストリームを分割します。 2. 動的なFlink CEPプログラムを設定します。
long time = 1000;
SingleOutputStreamOperator<String> output = CEP.dynamicPatterns(
keyedStream,
new JDBCPeriodicPatternProcessorDiscovererFactory<>(
jdbc:mysql//123.45.6.789:3306/cep_demo_db,
com.mysql.cj.jdbc.Driver,
rds_demo,
riskcollateral,
riskcollateral,
null,
null,
timer),
TimeBehaviour.ProcessingTime,
TypeInformation.of(new TypeHint<String>()){
}));
output.addSink(new PrintSinkFunction<>().name(cep));
env.excute(CEPDemo);
}
}
3. Flinkプログラムのデプロイと実行
Apache StreamParkは動的Flink CEPプログラムの管理および運用プラットフォームとして使用されます。以下の手順に従ってFlinkジョブを作成します:
①JARパッケージをアップロードします。 ! img
②ジョブを追加します。 ! img
③必要な設定を完了します。 ! img
④アプリケーションを公開して開始します。 ! img
! img
4. ルールの挿入
①CEPルールを保存するためのrds_demoテーブルを作成します。 ! [img](https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5dda13218d8d08755faf17eefddc5f9af58e70b814913bc360a414d3de9277d871abf3af1cbd75249222c7055fbabc23d40d21c8c12239700e997a8710282c42f4d0d146bd8133c0

イベントシーケンスのパターンマッチングとDynamic Flink CEPの適用

このパターンは、開始ノードがアクションが0であるイベントに一致し、終了ノードがxxxpackage.dynamic.cep.core.EndConditionクラスによって定義されたイベントに一致するイベントシーケンスを照合するために使用されます:java
public class EndCondition extends SimpleCondition {
@Override
public boolean filter(Event value) throws Exception {
return value.getAction() != 1;
}
}

上記のコードでは、EndConditionはイベントのaction属性が1ではないかどうかを確認します。もしactionが1でない場合、filterメソッドはtrueを返し、そのイベントが終了ノードの条件を満たしていることを示します。要するに、このパターンシーケンスは、最初のイベントのactionが0であり、最後のイベントのactionが1ではないイベントシーケンスに一致します。終了ノードの条件が満たされると、パターンマッチングが終了します。functionフィールドは、DemoPatternProcessFunctionクラスの完全修飾名によって指定されます。このフィールドは、以下のコードスニペットで定義されているように、一致したイベントをどのように処理するかを指定します:java
public class DemoPatternProcessFunction extends PatternProcessFunction {
String id;
int version;
String tenant;

public DemoPatternProcessFunction(String id, int version, String tenant) {
    this.id = id;
    this.version = version;
    this.tenant = tenant;
}

@Override
public void processMatch(
        final Map<String, List<IN>> match, final Context ctx, final Collector<String> out) {
    StringBuilder sb = new StringBuilder();
    sb.append("A match for Pattern of (id, version): (")
            .append(id)
            .append(", ")
            .append(version)
            .append(") is found. The event sequence: ").append("\n");
    for (Map.Entry<String, List<IN>> entry : match.entrySet()) {
        sb.append(entry.getKey()).append(": ").append(entry.getValue().get(0).toString()).append("\n");
    }
    out.collect(sb.toString());
}

}

PatternProcessorが事前に定義されたパターンに一致するイベントシーケンスを検出すると、processMatch()メソッドは一致を説明する文字列を構築します。その後、その文字列はCollectorを通じて下流のオペレーターによって出力されます。

5. イベントストリームの入力

次のイベントシーケンスがFlinkに流入すると仮定します:java
private static void sendEvents(Producer producer, String topic) {
ObjectMapper objectMapper = new ObjectMapper();

Event[] events = {
        new Event("ken", 1, 1, 0, 1662022777000L),
        new Event("ken", 2, 1, 0, 1662022778000L),
        new Event("ken", 3, 1, 1, 1662022779000L),
        new Event("ken", 4, 1, 2, 1662022780000L),
        new Event("ken", 5, 1, 1, 1662022780000L)
};
while (true) {
    try {
        for (Event event : events) {
            String json = objectMapper.writeValueAsString(event);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    LOG.error("Failed to send data to Kafka: ", exception);
                } else {
                    System.out.println(metadata.topic());
                    LOG.info("Data sent successfully to topic {} at offset {}",
                            metadata.topic(), metadata.offset());
                }
            });
        }
    } catch (Exception e) {
        LOG.error("Error while sending events to Kafka: ", e);
    }
}

}

上記のイベントをKafkaトピックに挿入します。開始ノードは、action属性が0である最初の2つのイベントに一致します。終了ノードは、actionが1ではない4番目のイベントに一致し、これによりパターンマッチングが終了します。5番目のイベントは完了済みのマッチに影響を与えません。

杭州銀行におけるDynamic Flink CEPの適用

杭州銀行は、イベントセンターの行動シーケンスモジュールに動的なFlink CEPを適用しています。イベントセンターは、イベントトラッキングデータを処理・分析して情報に基づいた意思決定を行うためのプラットフォームです。行動シーケンスモジュールは、杭州銀行が独自に最適化したバージョンの動的Flink CEPを使用しています。

img

行動シーケンスを追加します。基本情報を入力し、イベントの有効期限を設定した後、ユーザーはイベントまたはイベントセットを追加できます。

以下は行動シーケンステンプレートを示しています:
img

下図に示すように、イベント1から5は原子イベントであり、追跡されたユーザクリックを表しており、順次Flinkにストリーミングされます。事前定義された有効期限内(この例では20分)に完了したイベントが一致します。例えば、最初の4つのイベントのみが20分以内に完了した場合、イベント1から4が一致します。結果は、ユーザー名やエラーの原因、ユーザーの電話番号などの情報と共にメッセージキュー(Kafka、RocketMQなど)に出力され、ビジネスに関する洞察を提供したり、パーソナライズされた推奨を促進します。
![img](https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5dda13218d8d08755faf17eefddc5f9af58e70b814913bc360a414d3de9277d871abf3af1cbd752496a07a270e41e0fca6fd9f21c0c50d5952de6aad9730632a66a240b58ce84531d0e7993a7b25a8b0cfc653b69905bac42?tmpCode=bbb71cb1-85bc-49c7-89b2-3a6500a6

FlinkCEP – Flink向けの複雑イベント処理

FlinkCEP は、Flink における複雑イベント処理 (Complex Event Processing; CEP) のためのライブラリです。詳細なドキュメントはこちらをご覧ください: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/cep/

また、サンプルデモおよび関連リソースは以下のリンクからアクセスできます: https://github.com/RealtimeCompute/ververica-cep-demohttps://github.com/RealtimeCompute/ververica-cep-demo

注意点:

  • マークダウン形式で出力しました。
  • URLリンクはそのまま保持しています。
  • 専門用語やサービス名(例: FlinkCEP, Flink)は翻訳せず、原文のまま表記しています。
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?