はじめに
こんにちは。
本稿では、PulsarのNode.jsクライアントライブラリ(以下、pulsar-client-node)を使って、簡単なメッセージの送受信をしていきます。
サーバには、standaloneモードのPulsarを起動させます。
今回は、CentOS 7.5.0 1台の上に以下の環境を構築して試していきます。
-
サーバ
- Java 11
- Pulsar(standaloneモード) v2.7.1
-
クライアント
- Node.js v12.22.1
- pulsar-client-node v1.3.0
- pulsar-client-cpp(後述) v2.7.1
サーバの準備
ここでは、メッセージの送受信を行うためのサーバを準備していきます。
まず、Pulsarのstandaloneモードを動かすために、Javaが必要になります。
Javaのインストール後、Pulsarのバイナリファイルが入ったアーカイブファイルをダウンロードします。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/apache-pulsar-2.7.1-bin.tar.gz
ダウンロード完了後、アーカイブファイルを展開します。
$ tar xzf apache-pulsar-2.7.1-bin.tar.gz
$ cd apache-pulsar-2.7.1
$ ls
bin conf examples instances lib LICENSE licenses NOTICE README
これでサーバの準備は完了です。
クライアントの準備
ここでは、pulsar-client-nodeをインストールして、メッセージの送受信を行うためのクライアントの準備をしていきます。
pulsar-client-nodeは、ライブラリ内部でApache PulsarのC++クライアントライブラリ(以下、pulsar-client-cpp)を使っているので、下記の順番でインストールする必要があります。
- pulsar-client-cpp
- pulsar-client-node
pulsar-client-cppのインストール
こちらを参考に、pulsar-client-cppをインストールしていきます。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/RPMS/apache-pulsar-client-2.7.1-1.x86_64.rpm
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/RPMS/apache-pulsar-client-devel-2.7.1-1.x86_64.rpm
$ sudo rpm -ivh apache-pulsar-client*.rpm
pulsar-client-nodeのインストール
pulsar-client-nodeを動かすためには、Node.jsが必要になります。
Node.jsのインストール後、pulsar-client-nodeをインストールしていきます。
まず、pulsar-client-nodeをビルドするために使用されているnode-gypを使うために、gcc-c++
をインストールします。
$ sudo yum install -y gcc-c++
次にnpmを使って、pulsar-client-nodeをインストールします。
$ npm install pulsar-client@1.3.0
これでクライアントの準備は完了です。
メッセージの送受信
サーバとクライアントの準備ができたので、実際にメッセージの送受信をしていきます。
まず、Pulsar standaloneモードでサーバを起動させます。
# サーバ側のターミナル
$ ./bin/pulsar standalone
次に、メッセージを受け取る側(以下、Consumer)を起動させ、サーバに接続させ、メッセージを受け取る状態にさせます。
Consumerに使用するコードは、pulsar-client-nodeのリポジトリにある、サンプルコードを使用します。
# Consumer側のターミナル
$ node consumer.js
次に、メッセージを送る側(以下、Producer)を起動させ、サーバに接続させ、メッセージを送ります。
Producerに使用するコードもConsumer同様、サンプルコードを使用します。
# Producer側のターミナル
$ node producer.js
すると、Producerから送信されたメッセージがConsumerに流れ、Consumer側のターミナルで以下のように表示されました。
# Consumer側のターミナル
2021-05-12 18:17:45.650 INFO [139877982844672] ConsumerImpl:216 | [persistent://public/default/my-topic, sub1, 0] Created consumer on broker [127.0.0.1:39562 -> 127.0.0.1:6650]
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9
TypeScriptからの利用
pulsar-client-nodeは、TypeScriptからでも利用できます。
TypeScriptで書かれたコードをコンパイルするために必要な型定義ファイルは、pulsar-client-nodeのリポジトリに用意されています。
まずは、TypeScriptとコンパイルに必要なパッケージをインストールします。
$ sudo npm install -g typescript
$ npm install --save-dev @types/node
次に、ProducerとConsumerのTypeScriptのコードを用意し、tsc
コマンドでコンパイルします。
import Pulsar = require('pulsar-client');
(async () => {
const client: Pulsar.Client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
const producer: Pulsar.Producer = await client.createProducer({
topic: 'persistent://public/default/my-topic',
});
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
await producer.send({
data: Buffer.from(msg),
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();
await producer.close();
await client.close();
})();
import Pulsar = require('pulsar-client');
(async () => {
const client: Pulsar.Client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
const consumer: Pulsar.Consumer = await client.subscribe({
topic: 'persistent://public/default/my-topic',
subscription: 'sub1',
});
for (let i = 0; i < 10; i += 1) {
const msg: Pulsar.Message = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);
}
await consumer.close();
await client.close();
})();
$ tsc producer_ts.ts consumer_ts.ts
コンパイルが完了すると、producer_ts.js
とconsumer_ts.js
が作成され実行可能な状態になります。
以下のコマンドで実行すると、先ほどと同様、Producerから送信されたメッセージがConsumerに流れました。
# Producer側のターミナル
$ node producer_ts.js
# Consumer側のターミナル
$ node consumer_ts.js
2021-05-14 09:24:53.913 INFO [140592544884480] ConsumerImpl:216 | [persistent://public/default/my-topic, sub1, 0] Created consumer on broker [127.0.0.1:41952 -> 127.0.0.1:6650]
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9
おわりに
本稿では、pulsar-client-nodeを使ったメッセージの送受信を行いました。
また、TypeScriptからpulsar-client-nodeを利用することもできました。
今回、シンプルなProducer、Consumerを作成して試しましたが、他にも様々な機能が実装されています。
公式ドキュメントが参考になるかと思います。