2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FlinkとDroolsで動的ルールエンジンをつくってみる

Last updated at Posted at 2024-11-03

はじめに

最近、AWS ブログで次の記事が出ていました。

Build a dynamic rules engine with Amazon Managed Service for Apache Flink

Amazon Managed Service for Apache Flink を用いて、ストリーミングデータを処理する動的なルールエンジンを作る方法が紹介されています。

ここで "動的" といっているのは、アプリの再ビルド・デプロイをすることなく、データ処理のロジック(ビジネスルール)を切り替えることができるからだそうです。

ビジネスルール自体もストリーミングデータとして受信することで、動的なロジック変更を可能にしています。

今回は、OSS の Flink と Drools を用いて、似たようなことができないか検証したいと思います。

サンプルアプリ紹介

題材

題材としては、過去記事で紹介した火災判定アプリを採用します。
Flink と Drools の組み合わせについては、過去記事もご参照ください。

構成

前回記事では、「センサーで感知した温度が50度を超えているなら、火災と判断する」というルールをもとに、センサーデータを処理していました。

ビジネスルールはDRL(Drools rule Language) ファイルに記載されており、ルールを変えたければ DRL の編集と再ビルドが必要です。

Sample.drl
rule "fire detect"
    when
        $s: SensorData(temperature > 50)
    then
        FireAlarm alarm = new FireAlarm($s.getId(), $s.getDate(), $s.getTemperature());
        insert(alarm);
end

今回は、センサーデータに加え「ルール更新に必要なデータ」※も Flink アプリのインプットとして受け取るようにします。
※ 以降、単に「ルールデータ」と呼びます

具体的には、「◯◯度を超えたら火災と判定するか」の境界値を受信し、火災判定ルールを動的に更新できるようにします。

以降は、サンプルアプリのコードを一部抜粋して紹介していきます。
※ 全量はGithubにアップしています

データクラス

センサーデータを保持するSensorData.java、火災検知時に生成するFireAlarm.javaについては、過去記事と同じ構造です。

SensorData.java
public class SensorData {
    
    public int id;

    // yyyy-MM-ddTHH:mm:ss.SSS
    public Date date;

    public int temperature;
FireAlarm.java
public class FireAlarm {

    public int id;

    // yyyy-MM-ddTHH:mm:ss.SSS
    public Date alarmDate;

    public int temperature;

RuleString.javaは、受信したルールデータから作られるオブジェクトです。
火災判定の境界値である温度情報から、DRL を組み立てて記録しています。

RuleString.java
public class RuleString {

    private String value;

    public RuleString() {
    }

    public RuleString(int temperature) {
        this.value = createRuleStr(temperature);
    }

    private String createRuleStr(int temperature) {
        StringBuilder sb = new StringBuilder();
        sb.append("package org.example.rulebased.streaming.flinkdroolsdynamic; \n");
        sb.append("import org.example.rulebased.streaming.flinkdroolsdynamic.dto.SensorData; \n");
        sb.append("import org.example.rulebased.streaming.flinkdroolsdynamic.dto.FireAlarm; \n");
        sb.append("rule \"fire detect\" \n");
        sb.append("    when \n");
        sb.append("        $s: SensorData(temperature > " + temperature + " ) \n");
        sb.append("    then \n");
        sb.append("        FireAlarm alarm = new FireAlarm($s.getId(), $s.getDate(), $s.getTemperature()); \n");
        sb.append("        insert(alarm); \n");
        sb.append("end \n");

        sb.append("query FindAlarm(String sensorDataId) \n");
        sb.append("    $f: FireAlarm(id == sensorDataId) \n");
        sb.append("end \n");

        return sb.toString();
    }

ルール更新処理

ルール更新は、KieBase を動的に生成することで実現します。

KieBaseCreater.java
public class KieBaseCreater {

    public static KieBase createFromRuleString(RuleString ruleString) {
        KieServices ks = KieServices.Factory.get();
        KieRepository kr = ks.getRepository();
        KieFileSystem kfs = ks.newKieFileSystem();

        kfs.write("src/main/resources/org/example/rulebased/streaming/flinkdroolsdynamic/FireDetectorRule.drl", ruleString.getValue());
        KieBuilder kb = ks.newKieBuilder(kfs);
        kb.buildAll();
        KieContainer kContainer = ks.newKieContainer(kr.getDefaultReleaseId());
        return kContainer.getKieBase();
    }
}

KieBaseCreater.javaには、KieFileSystem という Drools の仕組みを用いて、ルールをランタイムで作成するための処理を記載しています。

※ KieFileSystemについては、Drools公式ドキュメント参照

火災判定ロジック

ルールを用いて火災判定を行うクラスとなります。

FireDetector.java
public class FireDetector extends KeyedBroadcastProcessFunction<Long, SensorData, RuleString, FireAlarm> {

    // KieBase Descriptor
    MapStateDescriptor<Void, KieBase> kieBaseDesc;

    Logger logger = LoggerFactory.getLogger(FireDetector.class);

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // init Descriptor
        this.kieBaseDesc = new MapStateDescriptor<>("kieBaseDesc", Types.VOID, Types.GENERIC(KieBase.class));
    }

    @Override
    public void processElement(
            SensorData sensorData,
            ReadOnlyContext ctx,
            Collector<FireAlarm> collector) throws Exception {
        // get kiebase from broadcast state
        KieBase kieBase = ctx.getBroadcastState(this.kieBaseDesc).get(null);
        if (kieBase == null) {
            logger.info("KieBase do not exist.");
            logger.info("Stopped evaluating sensor data.");
            return;
        }

        // execute rule
        var session = kieBase.newKieSession();
        session.insert(sensorData);
        session.fireAllRules();
        logger.info("Rule was fired.");

        var queryResult = session.getQueryResults("FindAlarm", sensorData.id);
        if (queryResult.size() == 1) {
            logger.info("FireAlarm was created.");
            logger.info("sensor data:" + sensorData.toString());
            FireAlarm fireAlarm = (FireAlarm) queryResult.toList().get(0).get("$f");
            collector.collect(fireAlarm);
        }
        session.dispose();

        logger.info("Query was executed.");
    }

    @Override
    public void processBroadcastElement(RuleString ruleString, Context ctx, Collector<FireAlarm> collector) throws Exception {

        BroadcastState<Void, KieBase> state = ctx.getBroadcastState(this.kieBaseDesc);
        state.put(null, KieBaseCreater.createFromRuleString(ruleString));
    }   
}

Flinkアプリで使用するルール(KieBase)の情報は、kieBaseDescフィールドのなかに保持されます。

processElementには、センサーデータを受信したときの処理が定義されます。
kieBaseDesc から KieBase を抜き出し、KieBase内の火災判定ルールをもとにセンサーデータを評価しています。
温度がルール定義の境界値を超過していた場合、FireAlarm オブジェクトを生成します。

processBroadcastElementは、ルールデータの受信時に KieBase を更新する処理となります。
先に紹介したRuleString.javaKieBaseCreater.javaの機能を使って、KieBase を動的に生成し、Flink に保持している KieBase を上書きするようになっています。

アプリ内で KieBase を記録するために、Flink の Broadcast State という機能を使用しています。
Broadcast State を使うことで、メインとなるストリーム処理(今回の場合は火災判定)に必要なオブジェクトを保持するとともに、データ受信時に当該オブジェクトを更新することもできます。

※ Flink の Broadcast Stateについては、この公式ガイドがわかりやすかったです。
  サンプルのようにKeyedBroadcastProcessFunctionを継承したうえで、必要なメソッドをオーバーライドすれば本機能が使えるようになります。

メインロジック

FireDetectJob.javaには、以下の一連の流れを実装しています。

  • Flink の実行環境定義
  • データ受信
  • FireDetector.javaによるルールデータ・センサーデータの処理
  • 処理結果の標準出力
FireDetectJob.java
public class FireDetectJob {

    public void run() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define parallelism 2
        env.setParallelism(2);

        MapStateDescriptor<Void, RuleString> bcStateDescriptor = 
         new MapStateDescriptor<>("kieBaseDesc", Types.VOID, Types.POJO(RuleString.class));

        // Rule Stream
        BroadcastStream<RuleString> ruleStream = createRuleStream(env).broadcast(bcStateDescriptor);
        
        // FireAlarm Stream
        DataStream<FireAlarm> alarms = createAlarmStream(env, ruleStream);

        // Data Sinks
        alarms.print();

        env.execute();
    }

    private DataStream<RuleString> createRuleStream(StreamExecutionEnvironment env) {
        // Data Sources
        DataStream<String> ruleDatas = env.socketTextStream("localhost", 9998);

        var rules = ruleDatas
            .flatMap(new RuleDataSplitter())
            .name("rule-updater");
        return rules;
        
    }

    private DataStream<FireAlarm> createAlarmStream(StreamExecutionEnvironment env, BroadcastStream<RuleString> ruleStream) {
        // Data Sources
        DataStream<String> sensorDatas = env.socketTextStream("localhost", 9999);

        // DataStream Transformations(Operators)
        var alarms = sensorDatas
            .flatMap(new SensorDataSplitter())
            .keyBy(value -> value.id)
            .connect(ruleStream)
            .process(new FireDetector())
            .name("fire-detector");

        return alarms;
    }
}

ルールデータ・センサーデータはソケットで受信するようにしています。
(カンマ区切り文字列として受け取ります)

createRuleStreamメソッドは、ルールデータをカンマ区切りで分割し、RuleStringオブジェクトにマッピングしているだけです。
createAlarmStreamメソッドが火災判定を行っている箇所で、こちらも文字列データをSensorDataオブジェクトにマッピングしたうえで、FireDetector.javaに流しています。

ここでruleStreamconnectメソッドで渡すことで、ルールデータを受け取ったときに KieBase の更新をすることができます。

Main メソッドは、このJobクラスを呼んでいるのみです。

Main.java
public class Main {
    
    public static void main(String[] args) throws Exception {

        var fireDetectJob = new FireDetectJob();
        fireDetectJob.run();
    }
}

アプリ実行

では、アプリを実行してみます。
WSL 環境で netstat コマンドを実行し、ルールデータ・センサーデータを待ち受けます。

// ルールデータ用
nc -lk 9998

// センサーデータ用
nc -lk 9999

その後、メイン関数を実行すると、Flink のログがばらばらと表示され待機状態に入ります。

ルールなしの場合

9999 番ポートにセンサーデータを投入しましょう。

// port 9999
1,51,Sensor

この時点では KieBase が作られていないため、デバッグログが出力されるのみで何も処理はされません。

INFO  KieBase do not exist.
INFO  Stopped evaluating sensor data.

ルールの新規作成

次に、ルールデータを送信します。

// port 9998
1,50,Rule

下記ログが出力され、KieBaseが作られたとわかります。

INFO  Start creation of KieBase: defaultKieBase
INFO  End creation of KieBase: defaultKieBase

再度センサーデータを投入すると、今度はアラーム情報がログに出るため、ルールにもとづいて火災判定が行われているようです。

// port 9999
2,51,Sensor

// log
2> FireAlarm(id=2,alarmDate=Sun Nov 03 20:16:01 JST 2024,temperature=51)

ルールの動的更新

最後に、ルールを更新してみましょう。
ルールデータを送信して、火災を判定する境界値を50度 → 60度にかえてみます。

// port 9998
2,60,Rule

今度はセンサーデータとして51度のデータを送っても、アラームは作成されません。

// port 9999
3,51,Sensor

しかし、61度のデータを送るとアラームが作成されるため、ルールが更新されているとわかりました。

// port 9999
4,61,Sensor

// log
1> FireAlarm(id=4,alarmDate=Sun Nov 03 20:16:57 JST 2024,temperature=61)

おわりに

Flink と Drools を使って、ストリーミングデータを処理するルールの実行・動的更新ができました。
今回は判定に使う境界値だけを更新しましたが、工夫次第で DRL ファイルを Input & ルールを動的生成するということもできそうです。

参考

Apache Flink

Drools

紹介したサンプルアプリ

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?