3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Pulsar Node.jsクライアントライブラリを使ってみた

Last updated at Posted at 2021-05-18

はじめに

こんにちは。

本稿では、PulsarのNode.jsクライアントライブラリ(以下、pulsar-client-node)を使って、簡単なメッセージの送受信をしていきます。
サーバには、standaloneモードのPulsarを起動させます。

今回は、CentOS 7.5.0 1台の上に以下の環境を構築して試していきます。

  • サーバ

    • Java 11
    • Pulsar(standaloneモード) v2.7.1
  • クライアント

    • Node.js v12.22.1
    • pulsar-client-node v1.3.0
    • pulsar-client-cpp(後述) v2.7.1

サーバの準備

ここでは、メッセージの送受信を行うためのサーバを準備していきます。

まず、Pulsarのstandaloneモードを動かすために、Javaが必要になります。
Javaのインストール後、Pulsarのバイナリファイルが入ったアーカイブファイルをダウンロードします。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/apache-pulsar-2.7.1-bin.tar.gz

ダウンロード完了後、アーカイブファイルを展開します。

$ tar xzf apache-pulsar-2.7.1-bin.tar.gz
$ cd apache-pulsar-2.7.1
$ ls
bin  conf  examples  instances  lib  LICENSE  licenses  NOTICE  README

これでサーバの準備は完了です。

クライアントの準備

ここでは、pulsar-client-nodeをインストールして、メッセージの送受信を行うためのクライアントの準備をしていきます。

pulsar-client-nodeは、ライブラリ内部でApache PulsarのC++クライアントライブラリ(以下、pulsar-client-cpp)を使っているので、下記の順番でインストールする必要があります。

  1. pulsar-client-cpp
  2. pulsar-client-node

pulsar-client-cppのインストール

こちらを参考に、pulsar-client-cppをインストールしていきます。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/RPMS/apache-pulsar-client-2.7.1-1.x86_64.rpm
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/RPMS/apache-pulsar-client-devel-2.7.1-1.x86_64.rpm
$ sudo rpm -ivh apache-pulsar-client*.rpm

pulsar-client-nodeのインストール

pulsar-client-nodeを動かすためには、Node.jsが必要になります。
Node.jsのインストール後、pulsar-client-nodeをインストールしていきます。

まず、pulsar-client-nodeをビルドするために使用されているnode-gypを使うために、gcc-c++をインストールします。

$ sudo yum install -y gcc-c++

次にnpmを使って、pulsar-client-nodeをインストールします。

$ npm install pulsar-client@1.3.0

これでクライアントの準備は完了です。

メッセージの送受信

サーバとクライアントの準備ができたので、実際にメッセージの送受信をしていきます。

まず、Pulsar standaloneモードでサーバを起動させます。

# サーバ側のターミナル
$ ./bin/pulsar standalone

次に、メッセージを受け取る側(以下、Consumer)を起動させ、サーバに接続させ、メッセージを受け取る状態にさせます。
Consumerに使用するコードは、pulsar-client-nodeのリポジトリにある、サンプルコードを使用します。

# Consumer側のターミナル
$ node consumer.js

次に、メッセージを送る側(以下、Producer)を起動させ、サーバに接続させ、メッセージを送ります。
Producerに使用するコードもConsumer同様、サンプルコードを使用します。

# Producer側のターミナル
$ node producer.js

すると、Producerから送信されたメッセージがConsumerに流れ、Consumer側のターミナルで以下のように表示されました。

# Consumer側のターミナル
2021-05-12 18:17:45.650 INFO  [139877982844672] ConsumerImpl:216 | [persistent://public/default/my-topic, sub1, 0] Created consumer on broker [127.0.0.1:39562 -> 127.0.0.1:6650]
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9

TypeScriptからの利用

pulsar-client-nodeは、TypeScriptからでも利用できます。
TypeScriptで書かれたコードをコンパイルするために必要な型定義ファイルは、pulsar-client-nodeのリポジトリに用意されています。

まずは、TypeScriptとコンパイルに必要なパッケージをインストールします。

$ sudo npm install -g typescript
$ npm install --save-dev @types/node

次に、ProducerとConsumerのTypeScriptのコードを用意し、tscコマンドでコンパイルします。

producer_ts.ts
import Pulsar = require('pulsar-client');

(async () => {

  const client: Pulsar.Client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  const producer: Pulsar.Producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
  });

  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`;
    await producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
  }
  await producer.flush();

  await producer.close();
  await client.close();
})();
consumer_ts.ts
import Pulsar = require('pulsar-client');

(async () => {

  const client: Pulsar.Client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  const consumer: Pulsar.Consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
  });

  for (let i = 0; i < 10; i += 1) {
    const msg: Pulsar.Message = await consumer.receive();
    console.log(msg.getData().toString());
    consumer.acknowledge(msg);
  }

  await consumer.close();
  await client.close();
})();
コンパイル
$ tsc producer_ts.ts consumer_ts.ts

コンパイルが完了すると、producer_ts.jsconsumer_ts.jsが作成され実行可能な状態になります。
以下のコマンドで実行すると、先ほどと同様、Producerから送信されたメッセージがConsumerに流れました。

# Producer側のターミナル
$ node producer_ts.js

# Consumer側のターミナル
$ node consumer_ts.js
2021-05-14 09:24:53.913 INFO  [140592544884480] ConsumerImpl:216 | [persistent://public/default/my-topic, sub1, 0] Created consumer on broker [127.0.0.1:41952 -> 127.0.0.1:6650]
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9

おわりに

本稿では、pulsar-client-nodeを使ったメッセージの送受信を行いました。
また、TypeScriptからpulsar-client-nodeを利用することもできました。

今回、シンプルなProducer、Consumerを作成して試しましたが、他にも様々な機能が実装されています。
公式ドキュメントが参考になるかと思います。

参考URL

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?