0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pulsar Flexを使ってみた

Last updated at Posted at 2022-09-07

はじめに

こんにちは。

本稿では、PulsarのサードパーティのNode.jsクライアントライブラリである、Pulsar Flex(以下、pulsar-flex)を使って、簡単なメッセージの送受信をしていきます。

内部でC++クライアントライブラリを使用する公式のNode.jsクライアントライブラリとは異なり、pulsar-flexはC++クライアントライブラリに依存しないライブラリとなっています。

今回は、MacOS 1台の上に以下の環境を構築して試していきます。

  • サーバ

    • OpenJDK 1.8
    • Pulsar(standaloneモード) v2.10.1
  • クライアント

    • Node.js v16.15.1
    • pulsar-flex 1.1.1

サーバの準備

ここでは、メッセージの送受信を行うためのサーバを準備していきます。

まず、Pulsarのstandaloneモードを動かすために、Javaが必要になります。
Javaのインストール後、Pulsarのバイナリファイルが入ったアーカイブファイルをダウンロードします。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz

ダウンロード完了後、アーカイブファイルを展開します。

$ tar xzf apache-pulsar-2.10.1-bin.tar.gz
$ cd apache-pulsar-2.10.1
$ ls
LICENSE    NOTICE     README     bin/       conf/      examples/  instances/ lib/       licenses/

これでサーバの準備は完了です。

pulsar-flexのインストール

下記のコマンドを実行し、pulsar-flexをインストールします。

$ npm i pulsar-flex

メッセージの送受信

サーバとクライアントの準備ができたので、実際にメッセージの送受信をしていきます。

サーバの起動

まず、Pulsar standaloneモードでサーバを起動させます。

# サーバ側のターミナル
$ ./bin/pulsar standalone

Consumerの起動

次に、メッセージを受け取る側(以下、Consumer)を起動させ、サーバに接続させ、メッセージを受け取る状態にさせます。
Consumerに使用するコードは以下になります。

consumer.js
const { Consumer } = require('pulsar-flex')

const consumer = new Consumer({
  topic: "persistent://public/default/my-topic",
  subscription: "my-subscription",
  discoveryServers: ['localhost:6650'],
  subType: Consumer.SUB_TYPES.EXCLUSIVE,
})

const run = async () => {
  await consumer.subscribe()
  await consumer.run({
    onMessage: async ({ ack, message, properties, redeliveryCount }) => {
      await ack();
      console.log(message.toString())
    }, autoAck: false,
  })
}

run().catch(console.error)

下記のコマンドを実行するとConsumerが起動されます。

# Consumer側のターミナル
$ node consumer.js

Producerの起動

次に、メッセージを送る側(以下、Producer)を起動させ、サーバに接続させ、メッセージを送ります。
Producerに使用するコードは以下になります。

producer.js
const { Producer } = require('pulsar-flex')

const producer = new Producer({
  topic: "persistent://public/default/my-topic",
  subscription: "my-subscription",
  discoveryServers: ['localhost:6650'],
})

const run = async () => {
  await producer.create()
  await producer.sendMessage({
    payload: "Hello World!"
  })
}

run().catch(console.error)

下記のコマンドを実行すると、Producerが起動されます。

# Producer側のターミナル
$ node producer.js

上記が実行されると、Producerから送信されたメッセージがConsumerに流れ、Consumer側のターミナルで以下のように表示されます。

# Consumer側のターミナル
{"level":"INFO","timestamp":"2022-09-07T10:09:10.419Z","logger":"pulsar-flex","message":"Changing consumer state -> consumer: undefined(0) STATE: INACTIVE"}
{"level":"INFO","timestamp":"2022-09-07T10:09:10.420Z","logger":"pulsar-flex","message":"Changing consumer state -> consumer: undefined(0) STATE: ACTIVE"}
Hello World!

おわりに

本稿では、pulsar-flexを使ったメッセージの送受信を行いました。
今回、シンプルなProducer、Consumerを作成して試しましたが、他にも様々な機能が実装されています。
下記のドキュメントが参考になるかと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?