概要
- Apache Pulsar は MQ (メッセージキュー) プラットフォーム
- Apache Pulsar を使う Java のサンプルコードを動かして挙動を見る
- 環境: Apache Pulsar 2.7.0 (スタンドアローンモード) + Apache Pulsar Java Client 2.7.0 + Java 11 (AdoptOpenJDK 11.0.9.1+1) + Gradle 6.7.1 + macOS Catalina
Apache Pulsar をインストールしてスタンドアローンモードで起動
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
$ tar xfz apache-pulsar-2.7.0-bin.tar.gz
$ cd apache-pulsar-2.7.0
$ bin/pulsar standalone
テナントとネームスペースを作成
別のターミナルにて pulsar-admin コマンドでテナントとネームスペースを作成する。
$ bin/pulsar-admin tenants create my-tenant
$ bin/pulsar-admin namespaces create my-tenant/my-namespace
サンプルプログラム
サンプルプログラムは3つ
Producer が送信したメッセージを2つの Consumer に分散して処理する。
- トピックにメッセージを送信する Producer が1つ
- HelloProducer
- トピックからメッセージを受信する Consumer が2つ
- HelloConsumerFirst
- HelloConsumerSecond
HelloProducer (トピックにメッセージを送信するプログラム)
ソースコード一覧
├── build.gradle
└── src
└── main
└── java
└── HelloProducer.java
build.gradle
apply plugin: ApplicationPlugin
application.mainClassName = 'HelloProducer' // 起動クラス名
repositories.mavenCentral()
dependencies.implementation 'org.apache.pulsar:pulsar-client:2.7.0'
src/main/java/HelloProducer.java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
class HelloProducer {
public static void main(String[] args) throws Exception {
// PulsarClient を生成
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650") // 接続先の Broker サーバーを指定
.build();
// Producer を生成
// 永続化ポリシー persistent とメッセージの送信先となるトピックを指定
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.create();
for (int i = 0; i < 5; i++) { // メッセージを5個送信
// トピックにメッセージを送信
producer.send(String.format("こんにちは世界-%d", i).getBytes());
}
// Broker サーバーとのコネクションを切断
pulsarClient.close();
}
}
HelloConsumerFirst (トピックからメッセージを受信するプログラム)
ソースコード一覧
├── build.gradle
└── src
└── main
└── java
└── HelloConsumerFirst.java
build.gradle
apply plugin: ApplicationPlugin
application.mainClassName = 'HelloConsumerFirst' // 起動クラス名
repositories.mavenCentral()
dependencies.implementation 'org.apache.pulsar:pulsar-client:2.7.0'
src/main/java/HelloConsumerFirst.java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
class HelloConsumerFirst {
public static void main(String[] args) throws Exception {
// PulsarClient を生成
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650") // 接続先の Broker サーバーを指定
.build();
// Consumer を生成
// 永続化ポリシー persistent とメッセージの送信元となるトピックを指定
// 複数の Consumer で分散処理するためサブスクリプションタイプに Shared を指定
// 1つのサブスクリプションを複数の Consumer で処理するため同じサブスクリプション名を指定
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscribtion-name")
.subscribe();
for (int i = 0; i < 100; i++) { // メッセージを100個まで受信
// トピックからメッセージを受信
Message msg = consumer.receive();
System.out.println("First がメッセージを受信: " + new String(msg.getData()));
// Broker がメッセージを削除できるように確認応答 ACK を送信
consumer.acknowledge(msg);
}
// Broker サーバーとのコネクションを切断
pulsarClient.close();
}
}
HelloConsumerSecond (トピックからメッセージを受信するプログラム)
ソースコード一覧
├── build.gradle
└── src
└── main
└── java
└── HelloConsumerSecond.java
build.gradle
apply plugin: ApplicationPlugin
application.mainClassName = 'HelloConsumerSecond' // 起動クラス名
repositories.mavenCentral()
dependencies.implementation 'org.apache.pulsar:pulsar-client:2.7.0'
src/main/java/HelloConsumerSecond.java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
class HelloConsumerSecond {
public static void main(String[] args) throws Exception {
// PulsarClient を生成
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650") // 接続先の Broker サーバーを指定
.build();
// Consumer を生成
// 永続化ポリシー persistent とメッセージの送信元となるトピックを指定
// 複数の Consumer で分散処理するためサブスクリプションタイプに Shared を指定
// 1つのサブスクリプションを複数の Consumer で処理するため同じサブスクリプション名を指定
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscribtion-name")
.subscribe();
for (int i = 0; i < 100; i++) { // メッセージを100個まで受信
// トピックからメッセージを受信
Message msg = consumer.receive();
System.out.println("Second がメッセージを受信: " + new String(msg.getData()));
// Broker がメッセージを削除できるように確認応答 ACK を送信
consumer.acknowledge(msg);
}
// Broker サーバーとのコネクションを切断
pulsarClient.close();
}
}
サンプルプログラムを動かして挙動を見る
HelloProducer を gradle run で実行して、トピックにメッセージを5個送信する。
$ gradle run
別のターミナルで HelloConsumerFirst を gradle run で実行すると、トピックからメッセージ5個を受信する。
$ gradle run
(中略)
First がメッセージを受信: こんにちは世界-0
First がメッセージを受信: こんにちは世界-1
First がメッセージを受信: こんにちは世界-2
First がメッセージを受信: こんにちは世界-3
First がメッセージを受信: こんにちは世界-4
また別のターミナルで HelloConsumerSecond を gradle run で実行すると、この時点ではトピックにメッセージが残っていないので何も受信しない。
$ gradle run
HelloConsumerFirst と HelloConsumerSecond が起動している状態で、HelloProducer を実行する。
HelloConsumerFirst と HelloConsumerSecond に分散してメッセージが受信される。
今回は HelloConsumerFirst に2つのメッセージを受信し、
First がメッセージを受信: こんにちは世界-1
First がメッセージを受信: こんにちは世界-3
HelloConsumerSecond に3つのメッセージを受信した。
Second がメッセージを受信: こんにちは世界-0
Second がメッセージを受信: こんにちは世界-2
Second がメッセージを受信: こんにちは世界-4
参考資料
- Pulsarのコンセプトとアーキテクチャ
- Set up a standalone Pulsar locally · Apache Pulsar
- Pulsar Javaクライアント
- Pulsar admin CLI · Apache Pulsar
- メッセージングPF「Apache Pulsar」の使い方(入門編) - Yahoo! JAPAN Tech Blog
- メッセージングPF「Apache Pulsar」の使い方(クライアント編) - Yahoo! JAPAN Tech Blog
- メッセージングPF「Apache Pulsar」の使い方(クライアント編2) - Yahoo! JAPAN Tech Blog