はじめに
最近、AWS ブログで次の記事が出ていました。
Build a dynamic rules engine with Amazon Managed Service for Apache Flink
Amazon Managed Service for Apache Flink を用いて、ストリーミングデータを処理する動的なルールエンジンを作る方法が紹介されています。
ここで "動的" といっているのは、アプリの再ビルド・デプロイをすることなく、データ処理のロジック(ビジネスルール)を切り替えることができるからだそうです。
ビジネスルール自体もストリーミングデータとして受信することで、動的なロジック変更を可能にしています。
今回は、OSS の Flink と Drools を用いて、似たようなことができないか検証したいと思います。
サンプルアプリ紹介
題材
題材としては、過去記事で紹介した火災判定アプリを採用します。
Flink と Drools の組み合わせについては、過去記事もご参照ください。
構成
前回記事では、「センサーで感知した温度が50度を超えているなら、火災と判断する」というルールをもとに、センサーデータを処理していました。
ビジネスルールはDRL(Drools rule Language) ファイルに記載されており、ルールを変えたければ DRL の編集と再ビルドが必要です。
rule "fire detect"
when
$s: SensorData(temperature > 50)
then
FireAlarm alarm = new FireAlarm($s.getId(), $s.getDate(), $s.getTemperature());
insert(alarm);
end
今回は、センサーデータに加え「ルール更新に必要なデータ」※も Flink アプリのインプットとして受け取るようにします。
※ 以降、単に「ルールデータ」と呼びます
具体的には、「◯◯度を超えたら火災と判定するか」の境界値を受信し、火災判定ルールを動的に更新できるようにします。
以降は、サンプルアプリのコードを一部抜粋して紹介していきます。
※ 全量はGithubにアップしています
データクラス
センサーデータを保持するSensorData.java
、火災検知時に生成するFireAlarm.java
については、過去記事と同じ構造です。
public class SensorData {
public int id;
// yyyy-MM-ddTHH:mm:ss.SSS
public Date date;
public int temperature;
public class FireAlarm {
public int id;
// yyyy-MM-ddTHH:mm:ss.SSS
public Date alarmDate;
public int temperature;
RuleString.java
は、受信したルールデータから作られるオブジェクトです。
火災判定の境界値である温度情報から、DRL を組み立てて記録しています。
public class RuleString {
private String value;
public RuleString() {
}
public RuleString(int temperature) {
this.value = createRuleStr(temperature);
}
private String createRuleStr(int temperature) {
StringBuilder sb = new StringBuilder();
sb.append("package org.example.rulebased.streaming.flinkdroolsdynamic; \n");
sb.append("import org.example.rulebased.streaming.flinkdroolsdynamic.dto.SensorData; \n");
sb.append("import org.example.rulebased.streaming.flinkdroolsdynamic.dto.FireAlarm; \n");
sb.append("rule \"fire detect\" \n");
sb.append(" when \n");
sb.append(" $s: SensorData(temperature > " + temperature + " ) \n");
sb.append(" then \n");
sb.append(" FireAlarm alarm = new FireAlarm($s.getId(), $s.getDate(), $s.getTemperature()); \n");
sb.append(" insert(alarm); \n");
sb.append("end \n");
sb.append("query FindAlarm(String sensorDataId) \n");
sb.append(" $f: FireAlarm(id == sensorDataId) \n");
sb.append("end \n");
return sb.toString();
}
ルール更新処理
ルール更新は、KieBase を動的に生成することで実現します。
public class KieBaseCreater {
public static KieBase createFromRuleString(RuleString ruleString) {
KieServices ks = KieServices.Factory.get();
KieRepository kr = ks.getRepository();
KieFileSystem kfs = ks.newKieFileSystem();
kfs.write("src/main/resources/org/example/rulebased/streaming/flinkdroolsdynamic/FireDetectorRule.drl", ruleString.getValue());
KieBuilder kb = ks.newKieBuilder(kfs);
kb.buildAll();
KieContainer kContainer = ks.newKieContainer(kr.getDefaultReleaseId());
return kContainer.getKieBase();
}
}
KieBaseCreater.java
には、KieFileSystem という Drools の仕組みを用いて、ルールをランタイムで作成するための処理を記載しています。
※ KieFileSystemについては、Drools公式ドキュメント参照
火災判定ロジック
ルールを用いて火災判定を行うクラスとなります。
public class FireDetector extends KeyedBroadcastProcessFunction<Long, SensorData, RuleString, FireAlarm> {
// KieBase Descriptor
MapStateDescriptor<Void, KieBase> kieBaseDesc;
Logger logger = LoggerFactory.getLogger(FireDetector.class);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// init Descriptor
this.kieBaseDesc = new MapStateDescriptor<>("kieBaseDesc", Types.VOID, Types.GENERIC(KieBase.class));
}
@Override
public void processElement(
SensorData sensorData,
ReadOnlyContext ctx,
Collector<FireAlarm> collector) throws Exception {
// get kiebase from broadcast state
KieBase kieBase = ctx.getBroadcastState(this.kieBaseDesc).get(null);
if (kieBase == null) {
logger.info("KieBase do not exist.");
logger.info("Stopped evaluating sensor data.");
return;
}
// execute rule
var session = kieBase.newKieSession();
session.insert(sensorData);
session.fireAllRules();
logger.info("Rule was fired.");
var queryResult = session.getQueryResults("FindAlarm", sensorData.id);
if (queryResult.size() == 1) {
logger.info("FireAlarm was created.");
logger.info("sensor data:" + sensorData.toString());
FireAlarm fireAlarm = (FireAlarm) queryResult.toList().get(0).get("$f");
collector.collect(fireAlarm);
}
session.dispose();
logger.info("Query was executed.");
}
@Override
public void processBroadcastElement(RuleString ruleString, Context ctx, Collector<FireAlarm> collector) throws Exception {
BroadcastState<Void, KieBase> state = ctx.getBroadcastState(this.kieBaseDesc);
state.put(null, KieBaseCreater.createFromRuleString(ruleString));
}
}
Flinkアプリで使用するルール(KieBase)の情報は、kieBaseDesc
フィールドのなかに保持されます。
processElement
には、センサーデータを受信したときの処理が定義されます。
kieBaseDesc
から KieBase を抜き出し、KieBase内の火災判定ルールをもとにセンサーデータを評価しています。
温度がルール定義の境界値を超過していた場合、FireAlarm オブジェクトを生成します。
processBroadcastElement
は、ルールデータの受信時に KieBase を更新する処理となります。
先に紹介したRuleString.java
・KieBaseCreater.java
の機能を使って、KieBase を動的に生成し、Flink に保持している KieBase を上書きするようになっています。
アプリ内で KieBase を記録するために、Flink の Broadcast State という機能を使用しています。
Broadcast State を使うことで、メインとなるストリーム処理(今回の場合は火災判定)に必要なオブジェクトを保持するとともに、データ受信時に当該オブジェクトを更新することもできます。
※ Flink の Broadcast Stateについては、この公式ガイドがわかりやすかったです。
サンプルのようにKeyedBroadcastProcessFunction
を継承したうえで、必要なメソッドをオーバーライドすれば本機能が使えるようになります。
メインロジック
FireDetectJob.java
には、以下の一連の流れを実装しています。
- Flink の実行環境定義
- データ受信
-
FireDetector.java
によるルールデータ・センサーデータの処理 - 処理結果の標準出力
public class FireDetectJob {
public void run() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define parallelism 2
env.setParallelism(2);
MapStateDescriptor<Void, RuleString> bcStateDescriptor =
new MapStateDescriptor<>("kieBaseDesc", Types.VOID, Types.POJO(RuleString.class));
// Rule Stream
BroadcastStream<RuleString> ruleStream = createRuleStream(env).broadcast(bcStateDescriptor);
// FireAlarm Stream
DataStream<FireAlarm> alarms = createAlarmStream(env, ruleStream);
// Data Sinks
alarms.print();
env.execute();
}
private DataStream<RuleString> createRuleStream(StreamExecutionEnvironment env) {
// Data Sources
DataStream<String> ruleDatas = env.socketTextStream("localhost", 9998);
var rules = ruleDatas
.flatMap(new RuleDataSplitter())
.name("rule-updater");
return rules;
}
private DataStream<FireAlarm> createAlarmStream(StreamExecutionEnvironment env, BroadcastStream<RuleString> ruleStream) {
// Data Sources
DataStream<String> sensorDatas = env.socketTextStream("localhost", 9999);
// DataStream Transformations(Operators)
var alarms = sensorDatas
.flatMap(new SensorDataSplitter())
.keyBy(value -> value.id)
.connect(ruleStream)
.process(new FireDetector())
.name("fire-detector");
return alarms;
}
}
ルールデータ・センサーデータはソケットで受信するようにしています。
(カンマ区切り文字列として受け取ります)
createRuleStream
メソッドは、ルールデータをカンマ区切りで分割し、RuleString
オブジェクトにマッピングしているだけです。
createAlarmStream
メソッドが火災判定を行っている箇所で、こちらも文字列データをSensorData
オブジェクトにマッピングしたうえで、FireDetector.java
に流しています。
ここでruleStream
をconnect
メソッドで渡すことで、ルールデータを受け取ったときに KieBase の更新をすることができます。
Main メソッドは、このJobクラスを呼んでいるのみです。
public class Main {
public static void main(String[] args) throws Exception {
var fireDetectJob = new FireDetectJob();
fireDetectJob.run();
}
}
アプリ実行
では、アプリを実行してみます。
WSL 環境で netstat コマンドを実行し、ルールデータ・センサーデータを待ち受けます。
// ルールデータ用
nc -lk 9998
// センサーデータ用
nc -lk 9999
その後、メイン関数を実行すると、Flink のログがばらばらと表示され待機状態に入ります。
ルールなしの場合
9999 番ポートにセンサーデータを投入しましょう。
// port 9999
1,51,Sensor
この時点では KieBase が作られていないため、デバッグログが出力されるのみで何も処理はされません。
INFO KieBase do not exist.
INFO Stopped evaluating sensor data.
ルールの新規作成
次に、ルールデータを送信します。
// port 9998
1,50,Rule
下記ログが出力され、KieBaseが作られたとわかります。
INFO Start creation of KieBase: defaultKieBase
INFO End creation of KieBase: defaultKieBase
再度センサーデータを投入すると、今度はアラーム情報がログに出るため、ルールにもとづいて火災判定が行われているようです。
// port 9999
2,51,Sensor
// log
2> FireAlarm(id=2,alarmDate=Sun Nov 03 20:16:01 JST 2024,temperature=51)
ルールの動的更新
最後に、ルールを更新してみましょう。
ルールデータを送信して、火災を判定する境界値を50度 → 60度にかえてみます。
// port 9998
2,60,Rule
今度はセンサーデータとして51度のデータを送っても、アラームは作成されません。
// port 9999
3,51,Sensor
しかし、61度のデータを送るとアラームが作成されるため、ルールが更新されているとわかりました。
// port 9999
4,61,Sensor
// log
1> FireAlarm(id=4,alarmDate=Sun Nov 03 20:16:57 JST 2024,temperature=61)
おわりに
Flink と Drools を使って、ストリーミングデータを処理するルールの実行・動的更新ができました。
今回は判定に使う境界値だけを更新しましたが、工夫次第で DRL ファイルを Input & ルールを動的生成するということもできそうです。
参考
Apache Flink
Drools
紹介したサンプルアプリ