1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FlinkとDroolsでストリーム処理

Last updated at Posted at 2024-01-16

はじめに

本記事ではApache FlinkとDroolsとを組み合わせ、ストリーム処理を行う方法について書いていきます。
ストリーム処理とは、システムにおいて継続的に発生するデータを、リアルタイムで収集・変換・解析することで、ビジネスにとって有用な結果を導きだすことを指します。

金融商品の価格データをもとにした自動売買の仕組みや、工場のセンサーデータをモニタリングすることによる異常検知など、様々なシーンで応用されています。

Flink(Apache Flink)について

Javaで利用できるOSSで、ストリーム処理を実装するための豊富なAPIが提供されています。
データを受け取り、特定の操作をしたうえで後続の処理に流す事が可能です。
Flinkでは、下図のようにData Source・DataStream Transformation(Operator)・Data Sinkという3段階の手続きを記述することでストリーム処理を実装することができます。

  • Data Source
    • データを収集する手続きです
    • ファイル・ソケット通信などからデータを受け取ることが可能です
    • Apache Kafka等との連携も可能なようです
  • DataStream Transformation(Operator)
    • Data Sourceの手続きで収集したデータに対して、変換・集約・フィルターなどの処理が可能です
  • Data Sink
    • データを出力する手続きです
    • ファイル・ソケット・外部システムなどにデータを送信することが可能です

Droolsについて

Javaで利用できるOSSのルールエンジンです。
以下のようなDRL (Drools Rule Language)記法で定義したルールをもとに、入力データを操作することができます。

rule "ルール名"
    when
        {データに対する条件}
    then
        {データが条件をみたしたときに実行する処理}
end

Droolsを使用することで、データを操作するルールをDRLとして記述し、プログラムコードから分離管理することができます。
複雑な業務ロジックを実装する際には、DRLとして記述するほうが理解しやすいものとなり、結果保守性の向上が期待されます。

FlinkとDroolsとの連携

今回は、FlinkアプリのOperator部分にDroolsを組み込むことで、ストリーム処理における一部タスクをルールエンジンで実行することを考えます。
(下イメージ図)

単にストリーム処理をするだけであれば、Flinkのみでも実現できます。
しかし、このような構成をとることで、データ変換処理をJavaコードから分離管理でき、Droolsを使用した際のメリットを享受できると思われます。
次章では、上記構成を実現するサンプルアプリを紹介していきます。

サンプルアプリ

サンプルとして、センサーデータをもとに施設内の火災を検知するアプリを紹介します。
「センサーが感知した温度が50度を超えているなら、火災と判断する」というルールをルールエンジンに定義して、Flinkアプリ上で動かしてみます。

アプリのコードは一部のみ載せているため、全体構成はGithubをご参照ください。

環境、ライブラリバージョン

  • windows11 (WSL2)
  • openjdk 17
  • Apache Flink 1.18.0
  • Drools 8.44.0.Final

データ

最初に、アプリケーションで扱うデータオブジェクトについて紹介します。

まずはセンサーデータのオブジェクトです。
ルールでの判定に用いる温度やタイムスタンプなどの情報をもっています。

SensorData.java
public class SensorData {
    
    public int id;

    // yyyy-MM-ddTHH:mm:ss.SSS
    public Date date;

    public int temperature;
...

Flinkアプリ実行の結果、生成される警報オブジェクトです。

FireAlarm.java
public class FireAlarm {

    public int id;

    // yyyy-MM-ddTHH:mm:ss.SSS
    public Date alarmDate;

    public int temperature;
...

メインロジック

Flinkアプリのメインロジックです。

FireDetectJob.java
public class FireDetectJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data Sources
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);

        // DataStream Transformations(Operators)
        DataStream<FireAlarm> alarms = dataStream
            .flatMap(new Splitter())
            .keyBy(value -> value.id)
            .process(new FireDetector())
            .name("fire-detector");

        // Data Sinks
        alarms.print();

        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, SensorData> {
        @Override
        public void flatMap(String sentence, Collector<SensorData> out) throws Exception {
            String[] words = sentence.split(",");
            
            out.collect(new SensorData(Integer.parseInt(words[0]), Date.from(Instant.now()), Integer.parseInt(words[1])));
        }
    }
}

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
の箇所では、Flinkでストリーム処理を実施するために実行環境を取得しています。

DataStream<String> dataStream = env.socketTextStream("localhost", 9999);の部分では、localhostの9999ポートに届いたデータを文字列型として保持することを宣言しています。

続く、

        // DataStream Transformations(Operators)
        DataStream<FireAlarm> alarms = dataStream
            .flatMap(new Splitter())
            .keyBy(value -> value.id)
            .process(new FireDetector())
            .name("fire-detector");

の箇所でデータの変換や火災の判定処理を行っています。

.flatMap(new Splitter())は、ソケットから受け取ったデータをSensorDataオブジェクトに変換するための処理となります。
今回、ソケットからは{id},{temperature}というカンマ区切り文字列のデータを受け取るので、カンマの位置で分割し、SensorDataオブジェクトにマッピングしています。

.process(new FireDetector())は、FireDetectorクラスのロジックをもとに火災判定を行うロジックです。(後ほど詳しく見ていきます。)

最後に、

        // Data Sinks
        alarms.print();

と宣言して実行結果をコンソール出力することとしています。

Oeprator部分

火災判定を行うロジックであるFireDetectorクラスについてみていきます。

FireDetector.java
public class FireDetector extends KeyedProcessFunction<Integer, SensorData, FireAlarm> {

    KieSession session;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // rule resource
        KieServices kieServices = KieServices.Factory.get();
        KieContainer kContainer = kieServices.getKieClasspathContainer();
        KieBase kieBase = kContainer.getKieBase("fireDetect");
        this.session = kieBase.newKieSession();
        System.out.println("Rule Init Completed!");
    }

    @Override
    public void processElement(
            SensorData sensorData,
            Context context,
            Collector<FireAlarm> collector) throws Exception {

        // execute rule
        session.insert(sensorData);
        session.fireAllRules();

        // query result
        var queryResult = session.getQueryResults("FindAlarm", sensorData.id);
        if (queryResult.size() == 1) {
            FireAlarm fireAlarm = (FireAlarm) queryResult.toList().get(0).get("$f");
            collector.collect(fireAlarm);
        }

        // debug sensor data
        var sensorDataQueryResult = session.getQueryResults("FindSensorData");
        if (sensorDataQueryResult.size() > 0) {
            var debugSensorDatas = sensorDataQueryResult.toList();
            System.out.println(debugSensorDatas);
        }
    }  
}

コードのうちopen関数はFlinkアプリが起動したときに呼ばれる処理を記載しています。
ここではDroolsでルールを実行できるように、ルールエンジンの初期化処理を記載しています。

processElement関数で、SensorDataオブジェクトがFlinkアプリに渡されたときに逐次実行される処理を定義しております。
ここでは、ルールエンジンとのインタフェースであるsessionオブジェクトにデータを渡し、ルールを実行しています。

ルールの実行の結果fireAlarmが生成されていれば、Flinkの機能であるcollector.collectを呼んで、後続の処理に流すようにしています。

大まかな処理の流れについては以上ですが、最後にDrools側の設定について紹介します。

Droolsの設定

Droolsでルールを実行する方法は色々ありますが、今回は以下を作成する方法をとっています。

  • kmodule.xml
    • ルールエンジン初期化のための設定ファイルです
    • KieContainerを生成するときに参照されます
  • DRLファイル
    • 実行するルールの実態です

実際に作成したファイル

kmodule.xml
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools.org/xsd/kmodule">
  <kbase name="fireDetect" eventProcessingMode="stream" packages="org.example.rulebased.streaming.flinkdroolspattern">
    <ksession name="default" type="stateful" />
  </kbase>
</kmodule>
Sample.drl
declare SensorData
    @role( event )
    @timestamp( date )
    @expires( 10s )
end

declare FireAlarm
    @role( event )
    @timestamp( alarmDate )
    @expires( 5s )
end


rule "fire detect"
    when
        $s: SensorData(temperature > 50)
    then
        FireAlarm alarm = new FireAlarm($s.getId(), $s.getDate(), $s.getTemperature());
        insert(alarm);
end
....

ルールとしては、SensorDataのtemperatureフィールド(温度)の値を固定値と比較する単純なものとなっています。

kmodule.xmlにある、eventProcessingMode="stream"という設定について補足しておきます。
実はDroolsには、ルールによるストリーム処理をするための便利機能がいくつか用意されており、上記のように指定することで、当該機能を利用できるようになります。

DRLファイルで使用しているアノテーションが当該機能です。
各々の意味合いとしては、以下のようになっています。

  • @role( event ):アノテーションを付与したデータをイベントデータとする(ストリーム処理機能の対象とする)こと
  • @timestamp( {field名} ):指定したフィールドをイベントのタイムスタンプとすること
  • @expires( {時間} ): アノテーションを付与したデータが、指定した時間以降はGCの対象になってよいこと

特に@expiresは、ルールエンジンの中にイベントデータがたまり続けてOutOfMemoryとならないために重要な設定です。

※ Droolsのストリーム処理について詳細は、公式ドキュメントを参照

アプリ実行

長くなりましたが、、ここまでがコードの大まかな説明になります。
それではアプリを実行してみます。
事前にnetcatコマンドを実行して、9999 portでデータの入力を待ち受けるようにします。

nc -lk 9999

その後Mainクラスを実行します。(ばらばらとFlink関連のログがでてきます)

最後にncコマンドを実行した側のターミナルで、以下の値を入力しEnterを押下します。

1,70

するとFlinkアプリ側のターミナルにログが出力されます。

14> FireAlarm(id=1,alarmDate=Sun Jan 14 01:04:06 JST 2024,temperature=70)
[{$s=SensorData(id=1,date=Sun Jan 14 01:04:06 JST 2024,temperature=70)}]

温度が閾値を超えていたので、FireAlarmオブジェクトが生成されたことがわかります。

参考

Apache Flink 公式ドキュメント

Drools 公式ドキュメント

Drools 公式ブログ(今回使用したストリーム処理に関する機能が紹介されています。)

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?