pentaho
Kafka
PDI
ETL

Kafkaを使ってPDIでリアルタイム処理を実装する

はじめに

PDIでなるべくリアルタイムに近いタイミングで処理を行ないたいというリクエストがあったので、Pentaho8から実装された「Kafka Consumer」ステップを検証してみました。
これにより、通常バッチ的に利用することを想定しているPDIでもリアルタイムに親しい処理が可能となり、PDIの利用用途が広がるのではないかと思いますので、似たような問題をお抱えの方の役に立てば幸いです。

Kafkaの準備

Kafkaについては、有益な記事がネット上にたくさんあるのでそれらを活用させていただきました。
今回はMAC上で検証しているのでこちらを参考にkafkaの環境を構築しました。

環境構築後、サービスを起動した状態で、トピックを作成しておきます。

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

PDIの準備

こちらを参考にPDIをインストールし、PDIのETL処理をデザインするクライアントツール(Spoon)を起動します。

Kafka Consumerステップの設定を行う

Kafka Consumerステップを配置する

下図のようにKafka Consumerステップを配置して、Transformationを保存します。
名前は、"kafka_test.ktr"としておきます。
kafkatest1.png

Kafka Consumerでメッセージを受け取った後に実行するTransformationを作成する

今回は、プロデューサから書き込んだメッセージを受け取った後、そのメッセージをファイルに出力するところまでの検証としたいので、下図のようにTransformationを設定しておきます。
名前は、"kafka_test_sub.ktr"としておきます。
kafka_test_sub_mod.png

Kafka Consumerステップの設定を行う

Kafka Consumerステップをダブルクリックすると下図のような画面が表示されるので、設定を行ないます。
それぞれ、左から「Setupタブ」、「Batchタブ」、「Fieldsタブ」の設定となります。
「Batchタブ」では実行タイミングを設定しており、今回は1件でもメッセージを受け取った時点でTransformationを実行するように設定しています。
kafkatest5_mod.png

動かしてみる

Spoon上で"kafka_test.ktr"を実行します。
実行すると、下図のように実行状態で待機します。
kafka_test_run1.png

この状態で、プロデューサーからメッセージをいくつか投入してみます。

$ kafka-console-producer --broker-list localhost:9092 -topic test
> Hello!
> HelloHello!
> HelloHelloHello!

すると、以下のようにTransformationが実行されます。
kafka_test_run2.png

出力したファイルの内容を確認してみると、、

$ cat output.txt
Hello!
HelloHello!
HelloHelloHello!

のようになっており、メッセージを受け取って、Transformationを実行し、ファイルに出力していることが分かるかと思います。
実際に手元で動かすと、ほぼほぼリアルタイムにデータが書き込まれることが確認できると思います。

まとめ

今回の検証としては以上ですが、動かすだけであれば難しい設定も必要なく実装することができました。
また、パーティションを増やして、今回作成した"kafka_test.ktr"をPan.shで多数起動しておくことで、分散処理も可能になります。