はじめに
こんにちは。
本稿では、PulsarのサードパーティのNode.jsクライアントライブラリである、Pulsar Flex(以下、pulsar-flex)を使って、簡単なメッセージの送受信をしていきます。
内部でC++クライアントライブラリを使用する公式のNode.jsクライアントライブラリとは異なり、pulsar-flexはC++クライアントライブラリに依存しないライブラリとなっています。
今回は、MacOS 1台の上に以下の環境を構築して試していきます。
-
サーバ
- OpenJDK 1.8
- Pulsar(standaloneモード) v2.10.1
-
クライアント
- Node.js v16.15.1
- pulsar-flex 1.1.1
サーバの準備
ここでは、メッセージの送受信を行うためのサーバを準備していきます。
まず、Pulsarのstandaloneモードを動かすために、Javaが必要になります。
Javaのインストール後、Pulsarのバイナリファイルが入ったアーカイブファイルをダウンロードします。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
ダウンロード完了後、アーカイブファイルを展開します。
$ tar xzf apache-pulsar-2.10.1-bin.tar.gz
$ cd apache-pulsar-2.10.1
$ ls
LICENSE NOTICE README bin/ conf/ examples/ instances/ lib/ licenses/
これでサーバの準備は完了です。
pulsar-flexのインストール
下記のコマンドを実行し、pulsar-flexをインストールします。
$ npm i pulsar-flex
メッセージの送受信
サーバとクライアントの準備ができたので、実際にメッセージの送受信をしていきます。
サーバの起動
まず、Pulsar standaloneモードでサーバを起動させます。
# サーバ側のターミナル
$ ./bin/pulsar standalone
Consumerの起動
次に、メッセージを受け取る側(以下、Consumer)を起動させ、サーバに接続させ、メッセージを受け取る状態にさせます。
Consumerに使用するコードは以下になります。
const { Consumer } = require('pulsar-flex')
const consumer = new Consumer({
topic: "persistent://public/default/my-topic",
subscription: "my-subscription",
discoveryServers: ['localhost:6650'],
subType: Consumer.SUB_TYPES.EXCLUSIVE,
})
const run = async () => {
await consumer.subscribe()
await consumer.run({
onMessage: async ({ ack, message, properties, redeliveryCount }) => {
await ack();
console.log(message.toString())
}, autoAck: false,
})
}
run().catch(console.error)
下記のコマンドを実行するとConsumerが起動されます。
# Consumer側のターミナル
$ node consumer.js
Producerの起動
次に、メッセージを送る側(以下、Producer)を起動させ、サーバに接続させ、メッセージを送ります。
Producerに使用するコードは以下になります。
const { Producer } = require('pulsar-flex')
const producer = new Producer({
topic: "persistent://public/default/my-topic",
subscription: "my-subscription",
discoveryServers: ['localhost:6650'],
})
const run = async () => {
await producer.create()
await producer.sendMessage({
payload: "Hello World!"
})
}
run().catch(console.error)
下記のコマンドを実行すると、Producerが起動されます。
# Producer側のターミナル
$ node producer.js
上記が実行されると、Producerから送信されたメッセージがConsumerに流れ、Consumer側のターミナルで以下のように表示されます。
# Consumer側のターミナル
{"level":"INFO","timestamp":"2022-09-07T10:09:10.419Z","logger":"pulsar-flex","message":"Changing consumer state -> consumer: undefined(0) STATE: INACTIVE"}
{"level":"INFO","timestamp":"2022-09-07T10:09:10.420Z","logger":"pulsar-flex","message":"Changing consumer state -> consumer: undefined(0) STATE: ACTIVE"}
Hello World!
おわりに
本稿では、pulsar-flexを使ったメッセージの送受信を行いました。
今回、シンプルなProducer、Consumerを作成して試しましたが、他にも様々な機能が実装されています。
下記のドキュメントが参考になるかと思います。