はじめに
本記事ではApache FlinkとDroolsとを組み合わせ、ストリーム処理を行う方法について書いていきます。
ストリーム処理とは、システムにおいて継続的に発生するデータを、リアルタイムで収集・変換・解析することで、ビジネスにとって有用な結果を導きだすことを指します。
金融商品の価格データをもとにした自動売買の仕組みや、工場のセンサーデータをモニタリングすることによる異常検知など、様々なシーンで応用されています。
Flink(Apache Flink)について
Javaで利用できるOSSで、ストリーム処理を実装するための豊富なAPIが提供されています。
データを受け取り、特定の操作をしたうえで後続の処理に流す事が可能です。
Flinkでは、下図のようにData Source・DataStream Transformation(Operator)・Data Sinkという3段階の手続きを記述することでストリーム処理を実装することができます。
- Data Source
- データを収集する手続きです
- ファイル・ソケット通信などからデータを受け取ることが可能です
- Apache Kafka等との連携も可能なようです
- DataStream Transformation(Operator)
- Data Sourceの手続きで収集したデータに対して、変換・集約・フィルターなどの処理が可能です
- Data Sink
- データを出力する手続きです
- ファイル・ソケット・外部システムなどにデータを送信することが可能です
Droolsについて
Javaで利用できるOSSのルールエンジンです。
以下のようなDRL (Drools Rule Language)記法で定義したルールをもとに、入力データを操作することができます。
rule "ルール名"
when
{データに対する条件}
then
{データが条件をみたしたときに実行する処理}
end
Droolsを使用することで、データを操作するルールをDRLとして記述し、プログラムコードから分離管理することができます。
複雑な業務ロジックを実装する際には、DRLとして記述するほうが理解しやすいものとなり、結果保守性の向上が期待されます。
FlinkとDroolsとの連携
今回は、FlinkアプリのOperator部分にDroolsを組み込むことで、ストリーム処理における一部タスクをルールエンジンで実行することを考えます。
(下イメージ図)
単にストリーム処理をするだけであれば、Flinkのみでも実現できます。
しかし、このような構成をとることで、データ変換処理をJavaコードから分離管理でき、Droolsを使用した際のメリットを享受できると思われます。
次章では、上記構成を実現するサンプルアプリを紹介していきます。
サンプルアプリ
サンプルとして、センサーデータをもとに施設内の火災を検知するアプリを紹介します。
「センサーが感知した温度が50度を超えているなら、火災と判断する」というルールをルールエンジンに定義して、Flinkアプリ上で動かしてみます。
アプリのコードは一部のみ載せているため、全体構成はGithubをご参照ください。
環境、ライブラリバージョン
- windows11 (WSL2)
- openjdk 17
- Apache Flink 1.18.0
- Drools 8.44.0.Final
データ
最初に、アプリケーションで扱うデータオブジェクトについて紹介します。
まずはセンサーデータのオブジェクトです。
ルールでの判定に用いる温度やタイムスタンプなどの情報をもっています。
public class SensorData {
public int id;
// yyyy-MM-ddTHH:mm:ss.SSS
public Date date;
public int temperature;
...
Flinkアプリ実行の結果、生成される警報オブジェクトです。
public class FireAlarm {
public int id;
// yyyy-MM-ddTHH:mm:ss.SSS
public Date alarmDate;
public int temperature;
...
メインロジック
Flinkアプリのメインロジックです。
public class FireDetectJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data Sources
DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
// DataStream Transformations(Operators)
DataStream<FireAlarm> alarms = dataStream
.flatMap(new Splitter())
.keyBy(value -> value.id)
.process(new FireDetector())
.name("fire-detector");
// Data Sinks
alarms.print();
env.execute();
}
public static class Splitter implements FlatMapFunction<String, SensorData> {
@Override
public void flatMap(String sentence, Collector<SensorData> out) throws Exception {
String[] words = sentence.split(",");
out.collect(new SensorData(Integer.parseInt(words[0]), Date.from(Instant.now()), Integer.parseInt(words[1])));
}
}
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
の箇所では、Flinkでストリーム処理を実施するために実行環境を取得しています。
DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
の部分では、localhostの9999ポートに届いたデータを文字列型として保持することを宣言しています。
続く、
// DataStream Transformations(Operators)
DataStream<FireAlarm> alarms = dataStream
.flatMap(new Splitter())
.keyBy(value -> value.id)
.process(new FireDetector())
.name("fire-detector");
の箇所でデータの変換や火災の判定処理を行っています。
.flatMap(new Splitter())
は、ソケットから受け取ったデータをSensorData
オブジェクトに変換するための処理となります。
今回、ソケットからは{id},{temperature}
というカンマ区切り文字列のデータを受け取るので、カンマの位置で分割し、SensorData
オブジェクトにマッピングしています。
.process(new FireDetector())
は、FireDetector
クラスのロジックをもとに火災判定を行うロジックです。(後ほど詳しく見ていきます。)
最後に、
// Data Sinks
alarms.print();
と宣言して実行結果をコンソール出力することとしています。
Oeprator部分
火災判定を行うロジックであるFireDetector
クラスについてみていきます。
public class FireDetector extends KeyedProcessFunction<Integer, SensorData, FireAlarm> {
KieSession session;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// rule resource
KieServices kieServices = KieServices.Factory.get();
KieContainer kContainer = kieServices.getKieClasspathContainer();
KieBase kieBase = kContainer.getKieBase("fireDetect");
this.session = kieBase.newKieSession();
System.out.println("Rule Init Completed!");
}
@Override
public void processElement(
SensorData sensorData,
Context context,
Collector<FireAlarm> collector) throws Exception {
// execute rule
session.insert(sensorData);
session.fireAllRules();
// query result
var queryResult = session.getQueryResults("FindAlarm", sensorData.id);
if (queryResult.size() == 1) {
FireAlarm fireAlarm = (FireAlarm) queryResult.toList().get(0).get("$f");
collector.collect(fireAlarm);
}
// debug sensor data
var sensorDataQueryResult = session.getQueryResults("FindSensorData");
if (sensorDataQueryResult.size() > 0) {
var debugSensorDatas = sensorDataQueryResult.toList();
System.out.println(debugSensorDatas);
}
}
}
コードのうちopen関数はFlinkアプリが起動したときに呼ばれる処理を記載しています。
ここではDroolsでルールを実行できるように、ルールエンジンの初期化処理を記載しています。
processElement
関数で、SensorData
オブジェクトがFlinkアプリに渡されたときに逐次実行される処理を定義しております。
ここでは、ルールエンジンとのインタフェースであるsession
オブジェクトにデータを渡し、ルールを実行しています。
ルールの実行の結果fireAlarm
が生成されていれば、Flinkの機能であるcollector.collect
を呼んで、後続の処理に流すようにしています。
大まかな処理の流れについては以上ですが、最後にDrools側の設定について紹介します。
Droolsの設定
Droolsでルールを実行する方法は色々ありますが、今回は以下を作成する方法をとっています。
-
kmodule.xml
- ルールエンジン初期化のための設定ファイルです
- KieContainerを生成するときに参照されます
- DRLファイル
- 実行するルールの実態です
実際に作成したファイル
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools.org/xsd/kmodule">
<kbase name="fireDetect" eventProcessingMode="stream" packages="org.example.rulebased.streaming.flinkdroolspattern">
<ksession name="default" type="stateful" />
</kbase>
</kmodule>
declare SensorData
@role( event )
@timestamp( date )
@expires( 10s )
end
declare FireAlarm
@role( event )
@timestamp( alarmDate )
@expires( 5s )
end
rule "fire detect"
when
$s: SensorData(temperature > 50)
then
FireAlarm alarm = new FireAlarm($s.getId(), $s.getDate(), $s.getTemperature());
insert(alarm);
end
....
ルールとしては、SensorDataのtemperatureフィールド(温度)の値を固定値と比較する単純なものとなっています。
kmodule.xml
にある、eventProcessingMode="stream"
という設定について補足しておきます。
実はDroolsには、ルールによるストリーム処理をするための便利機能がいくつか用意されており、上記のように指定することで、当該機能を利用できるようになります。
DRLファイルで使用しているアノテーションが当該機能です。
各々の意味合いとしては、以下のようになっています。
-
@role( event )
:アノテーションを付与したデータをイベントデータとする(ストリーム処理機能の対象とする)こと -
@timestamp( {field名} )
:指定したフィールドをイベントのタイムスタンプとすること -
@expires( {時間} )
: アノテーションを付与したデータが、指定した時間以降はGCの対象になってよいこと
特に@expires
は、ルールエンジンの中にイベントデータがたまり続けてOutOfMemoryとならないために重要な設定です。
※ Droolsのストリーム処理について詳細は、公式ドキュメントを参照
アプリ実行
長くなりましたが、、ここまでがコードの大まかな説明になります。
それではアプリを実行してみます。
事前にnetcatコマンドを実行して、9999 portでデータの入力を待ち受けるようにします。
nc -lk 9999
その後Mainクラスを実行します。(ばらばらとFlink関連のログがでてきます)
最後にncコマンドを実行した側のターミナルで、以下の値を入力しEnterを押下します。
1,70
するとFlinkアプリ側のターミナルにログが出力されます。
14> FireAlarm(id=1,alarmDate=Sun Jan 14 01:04:06 JST 2024,temperature=70)
[{$s=SensorData(id=1,date=Sun Jan 14 01:04:06 JST 2024,temperature=70)}]
温度が閾値を超えていたので、FireAlarmオブジェクトが生成されたことがわかります。
参考
Apache Flink 公式ドキュメント
Drools 公式ドキュメント
Drools 公式ブログ(今回使用したストリーム処理に関する機能が紹介されています。)