LoginSignup
0
0

More than 3 years have passed since last update.

Apache Pulsar (MQ) を使う Java のサンプルコード

Last updated at Posted at 2020-12-04

概要

  • Apache Pulsar は MQ (メッセージキュー) プラットフォーム
  • Apache Pulsar を使う Java のサンプルコードを動かして挙動を見る
  • 環境: Apache Pulsar 2.7.0 (スタンドアローンモード) + Apache Pulsar Java Client 2.7.0 + Java 11 (AdoptOpenJDK 11.0.9.1+1) + Gradle 6.7.1 + macOS Catalina

Apache Pulsar をインストールしてスタンドアローンモードで起動

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
$ tar xfz apache-pulsar-2.7.0-bin.tar.gz
$ cd apache-pulsar-2.7.0
$ bin/pulsar standalone

テナントとネームスペースを作成

別のターミナルにて pulsar-admin コマンドでテナントとネームスペースを作成する。

$ bin/pulsar-admin tenants create my-tenant
$ bin/pulsar-admin namespaces create my-tenant/my-namespace

サンプルプログラム

サンプルプログラムは3つ

Producer が送信したメッセージを2つの Consumer に分散して処理する。

  • トピックにメッセージを送信する Producer が1つ
    • HelloProducer
  • トピックからメッセージを受信する Consumer が2つ
    • HelloConsumerFirst
    • HelloConsumerSecond

HelloProducer (トピックにメッセージを送信するプログラム)

ソースコード一覧

├── build.gradle
└── src
    └── main
        └── java
            └── HelloProducer.java

build.gradle

apply plugin: ApplicationPlugin
application.mainClassName = 'HelloProducer' // 起動クラス名
repositories.mavenCentral()
dependencies.implementation 'org.apache.pulsar:pulsar-client:2.7.0'

src/main/java/HelloProducer.java

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

class HelloProducer {

  public static void main(String[] args) throws Exception {

    // PulsarClient を生成
    PulsarClient pulsarClient = PulsarClient.builder()
      .serviceUrl("pulsar://localhost:6650") // 接続先の Broker サーバーを指定
      .build();

    // Producer を生成
    // 永続化ポリシー persistent とメッセージの送信先となるトピックを指定
    Producer<byte[]> producer = pulsarClient.newProducer()
      .topic("persistent://my-tenant/my-namespace/my-topic")
      .create();

    for (int i = 0; i < 5; i++) { // メッセージを5個送信
      // トピックにメッセージを送信
      producer.send(String.format("こんにちは世界-%d", i).getBytes());
    }

    // Broker サーバーとのコネクションを切断
    pulsarClient.close();
  }
}

HelloConsumerFirst (トピックからメッセージを受信するプログラム)

ソースコード一覧

├── build.gradle
└── src
    └── main
        └── java
            └── HelloConsumerFirst.java

build.gradle

apply plugin: ApplicationPlugin
application.mainClassName = 'HelloConsumerFirst' // 起動クラス名
repositories.mavenCentral()
dependencies.implementation 'org.apache.pulsar:pulsar-client:2.7.0'

src/main/java/HelloConsumerFirst.java

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

class HelloConsumerFirst {

  public static void main(String[] args) throws Exception {

    // PulsarClient を生成
    PulsarClient pulsarClient = PulsarClient.builder()
      .serviceUrl("pulsar://localhost:6650") // 接続先の Broker サーバーを指定
      .build();

    // Consumer を生成
    // 永続化ポリシー persistent とメッセージの送信元となるトピックを指定
    // 複数の Consumer で分散処理するためサブスクリプションタイプに Shared を指定
    // 1つのサブスクリプションを複数の Consumer で処理するため同じサブスクリプション名を指定
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
      .topic("persistent://my-tenant/my-namespace/my-topic")
      .subscriptionType(SubscriptionType.Shared)
      .subscriptionName("my-subscribtion-name")
      .subscribe();

    for (int i = 0; i < 100; i++) { // メッセージを100個まで受信
      // トピックからメッセージを受信
      Message msg = consumer.receive();
      System.out.println("First がメッセージを受信: " + new String(msg.getData()));
      // Broker がメッセージを削除できるように確認応答 ACK を送信
      consumer.acknowledge(msg);
    }

    // Broker サーバーとのコネクションを切断
    pulsarClient.close();
  }
}

HelloConsumerSecond (トピックからメッセージを受信するプログラム)

ソースコード一覧

├── build.gradle
└── src
    └── main
        └── java
            └── HelloConsumerSecond.java

build.gradle

apply plugin: ApplicationPlugin
application.mainClassName = 'HelloConsumerSecond' // 起動クラス名
repositories.mavenCentral()
dependencies.implementation 'org.apache.pulsar:pulsar-client:2.7.0'

src/main/java/HelloConsumerSecond.java

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

class HelloConsumerSecond {

  public static void main(String[] args) throws Exception {

    // PulsarClient を生成
    PulsarClient pulsarClient = PulsarClient.builder()
      .serviceUrl("pulsar://localhost:6650") // 接続先の Broker サーバーを指定
      .build();

    // Consumer を生成
    // 永続化ポリシー persistent とメッセージの送信元となるトピックを指定
    // 複数の Consumer で分散処理するためサブスクリプションタイプに Shared を指定
    // 1つのサブスクリプションを複数の Consumer で処理するため同じサブスクリプション名を指定
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
      .topic("persistent://my-tenant/my-namespace/my-topic")
      .subscriptionType(SubscriptionType.Shared)
      .subscriptionName("my-subscribtion-name")
      .subscribe();

    for (int i = 0; i < 100; i++) { // メッセージを100個まで受信
      // トピックからメッセージを受信
      Message msg = consumer.receive();
      System.out.println("Second がメッセージを受信: " + new String(msg.getData()));
      // Broker がメッセージを削除できるように確認応答 ACK を送信
      consumer.acknowledge(msg);
    }

    // Broker サーバーとのコネクションを切断
    pulsarClient.close();
  }
}

サンプルプログラムを動かして挙動を見る

HelloProducer を gradle run で実行して、トピックにメッセージを5個送信する。

$ gradle run

別のターミナルで HelloConsumerFirst を gradle run で実行すると、トピックからメッセージ5個を受信する。

$ gradle run
(中略)
First がメッセージを受信: こんにちは世界-0
First がメッセージを受信: こんにちは世界-1
First がメッセージを受信: こんにちは世界-2
First がメッセージを受信: こんにちは世界-3
First がメッセージを受信: こんにちは世界-4

また別のターミナルで HelloConsumerSecond を gradle run で実行すると、この時点ではトピックにメッセージが残っていないので何も受信しない。

$ gradle run

HelloConsumerFirst と HelloConsumerSecond が起動している状態で、HelloProducer を実行する。

HelloConsumerFirst と HelloConsumerSecond に分散してメッセージが受信される。

今回は HelloConsumerFirst に2つのメッセージを受信し、

First がメッセージを受信: こんにちは世界-1
First がメッセージを受信: こんにちは世界-3

HelloConsumerSecond に3つのメッセージを受信した。

Second がメッセージを受信: こんにちは世界-0
Second がメッセージを受信: こんにちは世界-2
Second がメッセージを受信: こんにちは世界-4

参考資料

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0