Yahoo! JAPAN Tech Blog向けに寄稿した記事を、会社の許可を得てこちらにも転載しています。
メッセージングPF「Apache Pulsar」の使い方(サーバー編)
こんにちは。ヤフー株式会社システム統括本部の坂本です。現在、私はオープンソースのメッセージングミドルウェアであるApache Pulsarを社内向けの共通プラットフォームとして提供・運用するチームに所属しています。
私たちのチームでは、Pulsarを紹介する記事を過去4回にわたって連載してきました。過去の記事については以下をご覧ください。
- メッセージングPF「Apache Pulsar」の使い方(入門編)
- メッセージングPF「Apache Pulsar」の使い方(クライアント編)
- メッセージングPF「Apache Pulsar」の使い方(クライアント編2)
- Pulsar Summitのセッションと、ヤフーの発表内容紹介
さて、5回目となる今回は、Pulsarのサーバーサイドに関する記事です。
Pulsarには、複数のコンポーネントを1つのJVMプロセスとして手軽に起動できるstandaloneモードが存在しており、これまでの記事ではこのstandaloneモードを用いてPulsarの基本的な使い方を説明してきました。しかし、standaloneモードはあくまでデモンストレーション用といった位置付けであり、プロダクション環境でPulsarをこのモードで稼働させる事はあまりありません。
可用性やパフォーマンスの向上のために、プロダクション環境ではPulsarを構成する各コンポーネントを複数のホスト上で個別のJVMプロセスとして起動し、Pulsarのクラスターを構築する必要があります。本稿では、その具体的な方法をご紹介していきたいと思います。
なお、本稿ではPulsarのバージョン2.6.1を基に解説を行います。
ヤフーとPulsarの関わり
私の所属するチームはPulsarがOSS化される2016年9月頃から携わっており、Pulsarのコミッターも複数名在籍しています。このチームではヤフーの各プロダクト向けにPulsarを運用しつつ、社内における需要・事例をもとに機能拡張・バグ修正などの開発を通じたOSSへの貢献を行っています。
詳細については別記事を投稿しているのでこちらもご覧ください。
Pulsarを構成するコンポーネント
第1回の記事で解説したように、Pulsarのサーバーサイドには複数のコンポーネントが存在します。Pulsarのデプロイ方法を説明する前に、Pulsarを構成する各コンポーネントについて簡単におさらいしておきます。
Broker
ProducerとConsumerの間のメッセージのやりとりを仲介する役割を持つサーバーです。それに加えて、テナントやネームスペースの作成・設定が可能なREST APIも提供しています。
Bookie
オープンソースのストレージシステムであるApache BookKeeperのサーバーです。Brokerは、Producerからトピックに送信されたメッセージをBookieに書き込んで永続化します。永続化されたメッセージは、そのトピックを購読しているConsumerによって受信されるまでBookieに保存されます。
ZooKeeper
BrokerとBookieは、共にメタデータの管理のためにオープンソースのメタデータストアであるApache ZooKeeperを使用しています。Pulsarでは、ZooKeeperを次の2通りの用途で使用します。
- Local ZooKeeper
- それぞれのPulsarクラスターで独立した情報(Broker/Bookieのメタデータやトピックの統計情報など)を管理します。
- Configuration Store(Global ZooKeeper)
- 複数のPulsarクラスターで共有する必要のある情報(テナントやネームスペースの設定情報など)を管理します。
各コンポーネントのデプロイ方法
それでは、ここからは各コンポーネントのデプロイ方法を解説していきます。それに併せて、それぞれのコンポーネントの主要な設定項目もご紹介しようと思います。
前章で説明したように、Brokerが動作するにはBookieとZooKeeperが必要であり、Bookieが動作するにはZooKeeperが必要です。つまり、コンポーネントをデプロイする順番は、
- ZooKeeper
- Bookie
- Broker
でなければなりません。
なお、今回の解説はそれぞれのコンポーネントのサーバー1つを個別の物理マシンまたは仮想マシンにデプロイする事を想定したものです。
準備
以下の準備を、Pulsarのコンポーネントをインストールする全ての環境(ホスト)で実施してください。
Java 11のインストール
Pulsarを構成する全てのコンポーネントはJavaで実装されており、動作環境にはあらかじめJava 11をインストールしておく必要があります。
JDKのバイナリは任意のものを使っていただいて構いませんが、例としてOracleが提供しているOpenJDKのリファレンス実装は次のページからダウンロードできます。
Pulsarのバイナリパッケージのダウンロード
下記のコマンドでPulsar 2.6.1のバイナリパッケージをダウンロードしてください。このパッケージにはPulsarの全てのコンポーネントのバイナリが含まれます。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/apache-pulsar-2.6.1-bin.tar.gz
パッケージのダウンロードが完了したら、下記のコマンドでパッケージを解凍・展開します。
$ tar -xvzf apache-pulsar-2.6.1-bin.tar.gz
展開されたパッケージには、下記のディレクトリが含まれています(または、コンポーネントが稼働を開始するタイミングで作成されます)。
ディレクトリ | 含まれるファイル |
---|---|
bin |
pulsar や pulsar-admin といったコマンドラインツール |
conf |
各コンポーネントの設定ファイル |
data |
BookKeeperとZooKeeperがデータを保存するディレクトリ |
lib |
Pulsarによって使用されるJARファイル |
logs |
ログファイル |
ZooKeeperのデプロイ
必要なサーバー数
ZooKeeperはサーバー単独で稼働させる事も可能ですが(standaloneモード)、特にプロダクション環境では複数のサーバーを稼働させ、それらを協調させてサービスを提供するのが一般的です(replicatedモード)。こうしたZooKeeperのサーバーのグループはアンサンブルと呼ばれ、全てのサーバーが同じデータの複製を持っています。
ZooKeeperをreplicatedモードで稼働させる場合、サーバーの数は奇数とする事が推奨されています。つまり、最低でも3台のサーバーを起動する必要があります。
なぜ2台や4台といった偶数ではいけないのでしょうか。これは、ZooKeeperのアンサンブルはその構成サーバーの過半数が稼働していないとサービスを提供できないためです。ZooKeeperのアンサンブルでは、1台のサーバーがリーダー、それ以外のサーバーがフォロワーとなり、リーダーはクライアントから送られてくる更新リクエストを取りまとめてデータの整合性を保つ役割を果たします。こうしたリーダー権限を行使するには、アンサンブルの過半数のサーバーからの支持を得なければなりません。この過半数のサーバー群はクォーラムと呼ばれます。
重要なのは、必要なサーバーの数は「半分以上」ではなく「過半数」である事です。例えば2台のサーバーでアンサンブルを構成した場合、クォーラムは2台となり、ダウンが許容されるサーバーの数は0台でstandaloneモードと変わりません。それどころか、単一障害点が1つから2つに増えている分、standaloneモードよりも可用性は低下してしまいます。
サーバーの起動方法
ZooKeeperのサーバーを起動するには、アンサンブルを構成する全てのサーバーのホスト名を設定ファイル conf/zookeeper.conf
に記述する必要があります。以下は3台のサーバーでアンサンブルを構成する場合の記述例です。
server.1=zk1.pulsar.yahoo.co.jp:2888:3888
server.2=zk2.pulsar.yahoo.co.jp:2888:3888
server.3=zk3.pulsar.yahoo.co.jp:2888:3888
server.N
のNの部分は各サーバーに割り当てるIDです。ホスト名の後についている 2888
と 3888
はTCPのポート番号です。2888
はリーダーとフォロワーのトランザクションのやりとりに使用され、3888
は新しいリーダーを選出する際に使用されます。
続いて、各ZooKeeperサーバーで data/zookeeper
ディレクトリを作成し、その中の myid
ファイルに自身に対応するID(server.N
のNの部分)を記述してください。以下はIDが1である zk1.pulsar.yahoo.co.jp
におけるコマンドの例です。
$ mkdir -p data/zookeeper
$ echo 1 > data/zookeeper/myid
最後に、次のコマンドでZooKeeperサーバーのプロセスをデーモンとして起動します。全てのホストでサーバーを起動できたらZooKeeperのデプロイは完了です。
$ bin/pulsar-daemon start zookeeper
設定項目
server.N
以外にもZooKeeperにはさまざまな設定項目が存在しますが、ほとんどはデフォルトのままで問題ありません。
項目名 | 説明 | デフォルト値 |
---|---|---|
tickTime | ZooKeeperにおける基本的な時間の単位をミリ秒で指定 | 2000 |
initLimit | フォロワーがリーダーに接続して初回の同期を行う際のタイムアウト時間をtickTimeの個数で指定 | 10 |
syncLimit | フォロワーがリーダーに同期する際のタイムアウト時間をtickTimeの個数で指定 | 5 |
dataDir | インメモリのデータベースのスナップショットや更新のトランザクションログが保存されるディレクトリ | data/zookeeper |
clientPort | ZooKeeperのサーバーがクライアントからの接続をリッスンするポート | 2181 |
autopurge.snapRetainCount | dataDirに保持するスナップショットとトランザクションログの個数 | 3 |
autopurge.purgeInterval | 古いスナップショットとトランザクションログを削除する間隔を時間単位で指定 | 1 |
maxClientCnxns | 同時に接続可能なクライアントの最大ソケット数 | 60 |
Configuration Storeについて
さて、既に述べたように、PulsarではZooKeeperをLocal ZooKeeperとConfiguration Storeという2通りの用途で使用します。複数のPulsarクラスターを構築してそれらを統合したい場合(地理的に離れた複数のデータセンターが存在する場合、1つのデータセンターを1つのクラスターとするのが一般的です)、Local ZooKeeperとConfiguration Storeは別々のアンサンブルとする必要があります。つまり、2種類のZooKeeperサーバーのプロセスを起動させなければなりません。
しかし、今回は説明を簡単にするため構築するクラスターは1つだけとし、Local ZooKeeperとConfiguration Storeを同じアンサンブルとします。Pulsarをマルチクラスター構成にしたい場合には、下記の公式ドキュメントを参考にしてください。
クラスターメタデータの初期化
ZooKeeperのデプロイが完了したら、PulsarクラスターのメタデータをZooKeeperに書き込んでおく必要があります。この作業はクラスターの新規構築時に一度だけ実行すれば大丈夫です。
メタデータの初期化に使用するコマンドもPulsarのバイナリパッケージに含まれています。次のようなコマンドを任意のZooKeeperサーバー1台で実行してください。
$ bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-1 \
--zookeeper zk1.pulsar.yahoo.co.jp:2181 \
--configuration-store zk1.pulsar.yahoo.co.jp:2181 \
--web-service-url http://broker.pulsar.yahoo.co.jp:8080 \
--web-service-url-tls https://broker.pulsar.yahoo.co.jp:8443 \
--broker-service-url pulsar://broker.pulsar.yahoo.co.jp:6650 \
--broker-service-url-tls pulsar+ssl://broker.pulsar.yahoo.co.jp:6651
それぞれのコマンドラインオプションの意味は次の通りです。
オプション | 説明 |
---|---|
--cluster |
クラスターの名前 |
--zookeeper |
任意のLocal ZooKeeperサーバー1台のホスト名とポート番号 |
--configuration-store |
任意のConfiguration Storeサーバー1台のホスト名とポート番号 |
--web-service-url |
クラスターのHTTPサービスのURL |
--web-service-url-tls |
クラスターのHTTPSサービスのURL(HTTPSを使用しない場合は不要) |
--broker-service-url |
クラスターのBrokerサービスのURL。URLのスキームは http ではなく pulsar を指定 |
--broker-service-url-tls |
クラスターのTLS BrokerサービスのURL。URLのスキームは https ではなく pulsar+ssl を指定(TLSを使用しない場合は不要) |
なお、上記のコマンド例はクラスター内に存在する複数のBrokerサーバーに到達できる共通のドメイン(ここでは broker.pulsar.yahoo.co.jp
がそれに当たります)を用意できる事を前提としたものです。そうしたドメインの用意が困難な場合には、次のような指定方法で複数のBrokerサーバーのホスト名を列挙する事も可能です。
--web-service-url http://host1:8080,host2:8080,host3:8080 \
--web-service-url-tls https://host1:8443,host2:8443,host3:8443 \
--broker-service-url pulsar://host1:6650,host2:6650,host3:6650 \
--broker-service-url-tls pulsar+ssl://host1:6651,host2:6651,host3:6651
Bookieのデプロイ
必要なサーバー数
ZooKeeperと同じく、BookKeeperも複数のBookieサーバーを構築して冗長化を行います。Bookieサーバーの台数を決定するためには、BookKeeperにおけるデータの保存の仕組みを理解しておく必要があります。
BookKeeperは、ログのようなシーケンシャルなデータをストレージに永続化する機能を提供するミドルウェアです。そうしたデータ1つ1つは、BookKeeperではEntryと呼ばれます。また、連続したEntryのまとまりはLedgerと呼ばれます。BookKeeperはこのLedgerを複数保存できます。言い換えれば、複数の独立したデータストリームを保存可能です。
Pulsarにおいては、トピックのメッセージがBookKeeperのEntryに当たります。あるトピックはオープン状態のLedger 1つとひもづいており、Producerからトピックに送信されたメッセージはそのLedgerにEntryとして追加されていきます。そしてLedgerのEntry数がある程度の数に達したら、そのLedgerはクローズされて新しいLedgerが作成されます。
さて、各Ledgerはメタデータとして次の3つのパラメーターを持っています。
パラメーター | 記号 | 説明 |
---|---|---|
アンサンブルサイズ | E | Ledgerを保存するのに使用されるBookieの数 |
Writeクォーラムサイズ | Qw | 各Entryが書き込まれるBookieの数 |
Ackクォーラムサイズ | Qa | 各Entryの書き込みが完了した事が保証されるBookieの数 |
3つの数値の大小関係はE >= Qw >= Qaとなっています。以降は、これら3つの数値の意味を具体例を挙げつつ見ていこうと思います。
例として、E = 4、Qw = 3、Qa = 2のケースを考えます。新しいLedgerが作成されると、クラスターに存在するBookieの中から4つが選択されます。ここでは、B1・B2・B3・B4という4つのBookieが選択されたとします。しかし、全てのEntryが4つのBookie全てに書き込まれるわけではありません。Qw = 3の場合、各Entryは4つのBookieの内の3つに書き込まれます。具体的には、書き込み先のBookieは次の表のようになります。
Entryの番号 | 書き込み先のBookie |
---|---|
0 | B1, B2, B3 |
1 | B2, B3, B4 |
2 | B3, B4, B1 |
3 | B4, B1, B2 |
4 | B1, B2, B3 |
ただし、これは3つのBookieにEntryの複製が確実に存在する事を意味してはいません。Qa = 2の場合、2つのBookieへの書き込みが完了した時点でそのEntryの永続化は成功したと見なされるためです。2つのBookieにデータが保存されていれば、その内1つのBookieが障害でダウンしたりデータが消えたりしても、もう一方のBookieから同じデータを読み込む事ができます。
クラスター内のBookieの総数がEより少ないとLedgerの作成ができないため、Bookieの総数はE以上でなければなりません。また、クラスターはQa - 1台までのBookieの障害に耐える事ができます。したがって、Bookieの総数はE + Qa - 1以上にするのがいいでしょう。
デフォルトの設定では、E・Qw・Qaの値は全て2であるため、Bookieは少なくとも3台あればよい事になります。
ハードウェア構成
BookKeeperは、ログ先行書き込み(WAL)という仕組みを採用しています。Producerから送信されたメッセージは、その「書き込み操作」の内容が最初にジャーナルと呼ばれるディスク領域に書き込まれます。ジャーナルへのデータの永続化が完了した時点で、BrokerはProducerにメッセージの送信が成功した事を通知します。その後、ジャーナルに記録された書き込み操作はLedgerストレージと呼ばれる別のディスク領域に非同期に反映されます。Ledgerストレージに永続化されたメッセージは、トピックの全てのConsumerによって受信されるまで保存されます。Ledgerストレージへの書き込み中にサーバーの電源が落ちたりした場合でも、ジャーナルからデータを復旧する事が可能です。
BookKeeperが高いパフォーマンスを発揮するには、ジャーナルとLedgerストレージを別々のデバイスとするのが望ましいとされています。ジャーナルには容量は少なくても速度の速いSSDを使用し、Ledgerストレージには速度は遅くても容量の多いHDDを使用するのが一般的です。ただ、こうしたハードウェア構成でなければBookKeeperはデプロイできない、というわけではありません。
サーバーの起動方法
それでは、Bookieサーバーを起動してみましょう。Bookieの設定ファイルは conf/bookkeeper.conf
です。多数の設定項目が存在しますが、最低限、次の設定は変更する必要があります。
# ジャーナルとして使用するディレクトリのパス
journalDirectories=data/bookkeeper/journal
# Ledgerストレージとして使用するディレクトリのパス
ledgerDirectories=data/bookkeeper/ledgers
# Local ZooKeeperのホスト名とポートをコンマ区切りで列挙
# BookieはLedgerのメタデータや自分自身が稼働状態にあるかどうかといった情報をLocal ZooKeeperに保存します
zkServers=zk1.pulsar.yahoo.co.jp:2181,zk2.pulsar.yahoo.co.jp:2181,zk3.pulsar.yahoo.co.jp:2181
設定ファイルの編集が完了したら、次のコマンドでBookieサーバーのプロセスをデーモンとして起動します。
$ bin/pulsar-daemon start bookie
Bookieサーバーが正常に起動できたかどうかは、次のコマンドでサニティテストを実行する事で確認できます。サニティテストでは、ローカルのBookieに対してLedger(アンサンブルサイズは1)の作成、書き込み、読み込み、削除を行います。
$ bin/bookkeeper shell bookiesanity
サニティテストが成功したら、他のBookieサーバーも同様に起動していきましょう。
設定項目
Bookieの設定項目は多岐にわたるため、その全てをご紹介する事はできません。以下にその中でも比較的重要と思われるものを挙げておきます。
設定項目 | 説明 | デフォルト値 |
---|---|---|
bookiePort | Bookieサーバーがリッスンするポート | 3181 |
useHostNameAsBookieID | Bookieが自分自身のIDとしてホスト名を使用するかどうか。falseの場合はIPアドレスを使用 | false |
numAddWorkerThreads | 書き込みリクエストを処理するワーカースレッドの数。0の場合はNettyのスレッドが直接処理 | 0 |
numReadWorkerThreads | 読み込みリクエストを処理するワーカースレッドの数。0の場合はNettyのスレッドが直接処理 | 8 |
numHighPriorityWorkerThreads | 優先度の高い特殊なリクエストを処理するワーカースレッドの数 | 8 |
autoRecoveryDaemonEnabled | あるBookieがダウンした際にそのデータを他のBookieに再複製するデーモンを起動するかどうか | true |
journalMaxSizeMB | ジャーナルファイル1つあたりの最大サイズ(メガバイト) | 2048 |
journalMaxBackups | 古いジャーナルファイルのバックアップ数 | 5 |
journalSyncData | ジャーナルへの書き込みをディスクにフラッシュしてからBrokerに確認応答するかどうか | true |
prometheusStatsHttpPort | Prometheusのメトリクスのエクスポーターが使用するポート | 8000 |
readOnlyModeEnabled | ディスク使用量がしきい値に達した際にBookieをReadOnlyモードに移行させるかどうか | true |
forceReadOnlyBookie | Bookieを強制的にReadOnlyモードに変更するためのフラグ | false |
diskUsageThreshold | BookieがReadOnlyモードに移行するディスク使用量のしきい値 | 0.95 |
httpServerEnabled | 管理用のAPIを提供するHTTPサーバーを有効にするかどうか | false |
httpServerPort | HTTPサーバーがリッスンするポート | 8000 |
dbStorage_writeCacheMaxSizeMb | 書き込みキャッシュとして使用するダイレクトメモリのサイズ(メガバイト) | ダイレクトメモリの1/4 |
dbStorage_readAheadCacheMaxSizeMb | 読み込みキャッシュとして使用するダイレクトメモリのサイズ(メガバイト) | ダイレクトメモリの1/4 |
Brokerのデプロイ
必要なサーバー数
ZooKeeperとBookieのデプロイが完了したら、いよいよPulsarの核となるコンポーネントであるBrokerをデプロイしていきます。BrokerにはZooKeeperやBookieと違って「少なくとも〇〇台以上起動しなければならない」といった制限は存在しません。単一障害点を作らないように、2台以上起動するのがいいでしょう。
サーバーの起動方法
Brokerの設定ファイルは conf/broker.conf
です。以下の設定項目に適切な値を入れてください。
# Local ZooKeeperのホスト名とポートをコンマ区切りで列挙
zookeeperServers=zk1.pulsar.yahoo.co.jp:2181,zk2.pulsar.yahoo.co.jp:2181,zk3.pulsar.yahoo.co.jp:2181
# Configuration Storeのホスト名とポートをコンマ区切りで列挙(今回はLocal ZooKeeperと同じ)
configurationStoreServers=zk1.pulsar.yahoo.co.jp:2181,zk2.pulsar.yahoo.co.jp:2181,zk3.pulsar.yahoo.co.jp:2181
# 「クラスターメタデータの初期化」で登録したクラスター名を指定
clusterName=pulsar-cluster-1
# Ledgerのアンサンブルサイズ
managedLedgerDefaultEnsembleSize=2
# LedgerのWriteクォーラムサイズ
managedLedgerDefaultWriteQuorum=2
# LedgerのAckクォーラムサイズ
managedLedgerDefaultAckQuorum=2
設定ファイルの編集が完了したら、次のコマンドでBrokerサーバーのプロセスをデーモンとして起動します。
$ bin/pulsar-daemon start broker
全てのホストでBrokerサーバーが起動できたら、Pulsarクラスターの構築は完了です。
設定項目
Brokerにもまた、多数の設定項目が存在しています。ここではその一部をご紹介します。
設定項目 | 説明 | デフォルト値 |
---|---|---|
brokerServicePort | Pulsarプロトコルのポート | 6650 |
brokerServicePortTls | TLS用のPulsarプロトコルのポート。空でない場合はTLSが有効になります | |
webServicePort | HTTPサーバーのポート | 8080 |
webServicePortTls | HTTPSサーバーのポート。空でない場合はTLSが有効になります | |
numIOThreads | NettyのI/Oスレッドの数 | CPUのコア数の2倍 |
numHttpServerThreads | HTTPリクエストを処理するスレッドの数 | CPUのコア数の2倍 |
backlogQuotaCheckEnabled | トピックごとのストレージ使用量のチェックを有効にするかどうか | true |
backlogQuotaDefaultLimitGB | 1トピックあたりのストレージの割り当て量(ギガバイト)。0未満の場合は無制限 | -1 |
backlogQuotaDefaultRetentionPolicy | あるトピックのストレージの使用量が割り当て量を超過した場合の挙動(※1) | producer_request_hold |
ttlDurationDefaultInSeconds | Producerから送信されたメッセージの生存時間。0の場合は無限 | 0 |
subscriptionExpirationTimeMinutes | サブスクリプションの生存時間。0の場合は無限 | 0 |
dispatchThrottlingRatePerTopicInMsg | 1トピックあたりのConsumerへのメッセージの配信速度(メッセージ/秒)。0の場合は無制限 | 0 |
dispatchThrottlingRatePerTopicInByte | 1トピックあたりのConsumerへのメッセージの配信速度(バイト/秒)。0の場合は無制限 | 0 |
dispatchThrottlingRatePerSubscriptionInMsg | 1サブスクリプションあたりのConsumerへのメッセージの配信速度(メッセージ/秒)。0の場合は無制限 | 0 |
dispatchThrottlingRatePerSubscriptionInByte | 1サブスクリプションあたりのConsumerへのメッセージの配信速度(バイト/秒)。0の場合は無制限 | 0 |
maxProducersPerTopic | 1トピックに接続可能なProducer数の上限。0の場合は無制限 | 0 |
maxConsumersPerTopic | 1トピックに接続可能なConsumer数の上限。0の場合は無制限 | 0 |
maxConsumersPerSubscription | 1サブスクリプションに接続可能なConsumer数の上限。0の場合は無制限 | 0 |
maxMessageSize | メッセージサイズの上限(バイト) | 5242880 |
maxNumPartitionsPerPartitionedTopic | パーティションドトピックのパーティション数の上限。0の場合は無制限 | 0 |
tlsCertificateFilePath | TLS証明書のファイルパス | |
tlsKeyFilePath | TLS秘密鍵のファイルパス | |
tlsTrustCertsFilePath | Brokerが信頼するTLS証明書のファイルパス | |
tlsAllowInsecureConnection | 安全でないTLSクライアント証明書の使用を許可するかどうか | false |
tlsRequireTrustedClientCertOnConnect | クライアント接続時にTLSクライアント証明書を求めるかどうか | false |
authenticationEnabled | 認証を有効にするかどうか | false |
authenticationProviders | 認証方式を決定するプラグインのクラス名を指定(※2) | |
authorizationEnabled | 認可を有効にするかどうか | false |
superUserRoles | 全ての権限を与える「スーパーユーザー」のロール名を指定 | |
managedLedgerCacheSizeMB | Brokerが保持するLedgerのEntryのキャッシュサイズ(メガバイト) | ダイレクトメモリの1/5 |
defaultRetentionTimeInMinutes | 全てのConsumerに受信されたメッセージを何分間まで保存しておくか | 0 |
defaultRetentionSizeInMB | 全てのConsumerに受信されたメッセージを何メガバイトまで保存しておくか | 0 |
webSocketServiceEnabled | WebSocket APIを有効にするかどうか | false |
functionsWorkerEnabled | Pulsar Functionsのワーカーを起動するかどうか | false |
※1:次の3つのポリシーのいずれかを選択できます。
ポリシー | ストレージの割り当て量を超過した場合の挙動 |
---|---|
producer_request_hold | Producerから送信されたメッセージの永続化を保留します。 |
producer_exception | Producer側で例外がスローされます。 |
consumer_backlog_eviction | バックログのメッセージが古いものから削除されます。 |
※2:Pulsar 2.6.1の時点で実装されている認証プラグインは以下の通りです。
クラス名 | 認証方式 |
---|---|
org.apache.pulsar.broker.authentication.AuthenticationProviderBasic | Basic認証 |
org.apache.pulsar.broker.authentication.AuthenticationProviderTls | TLSクライアント認証 |
org.apache.pulsar.broker.authentication.AuthenticationProviderToken | JSON Web Token認証 |
org.apache.pulsar.broker.authentication.AuthenticationProviderSasl | SASL(Kerberos)認証 |
org.apache.pulsar.broker.authentication.AuthenticationProviderAthenz | Athenz認証 |
動作確認
クラスターの構築が完了したら、実際にPulsarクライアントを使用してメッセージの送受信を試してみましょう。動作確認には、バイナリパッケージに含まれているCLIツール pulsar-admin
および pulsar-client
を使用します。
最初にクライアントの設定ファイル conf/client.conf
を編集する必要があります。次のようにクラスターのURLを指定してください。
# クラスターのHTTPサービスのURL
webServiceUrl=http://broker.pulsar.yahoo.co.jp:8080
# クラスターのBrokerサービスのURL
brokerServiceUrl=pulsar://broker.pulsar.yahoo.co.jp:6650
もし上記のように複数のBrokerサーバーに到達できる共通のドメイン broker.pulsar.yahoo.co.jp
が存在しない場合には、次のように複数のBrokerサーバーのホスト名を列挙してください。
webServiceUrl=http://host1:8080,host2:8080,host3:8080
brokerServiceUrl=pulsar://host1:6650,host2:6650,host3:6650
設定ファイルの編集が完了したら、pulsar-admin
コマンドを使って自分のトピックが所属するテナントおよびネームスペースを作成します。
# 「my-tenant」というテナントを作成
$ bin/pulsar-admin tenants create my-tenant
# 「my-tenant/my-ns」というネームスペースを作成
$ bin/pulsar-admin namespaces create my-tenant/my-ns
続いて、pulsar-client
コマンドを使ってConsumerを起動します。この際、トピックとサブスクリプションは自動的に作成されます。
# トピック「persistent://my-tenant/my-ns/my-topic」にサブスクリプション「my-sub」を作成し、メッセージを5つ受信
$ bin/pulsar-client consume -s my-sub -n 5 persistent://my-tenant/my-ns/my-topic
Consumerを起動したままもう1つターミナルを開き、Producerを起動してメッセージを送信してみます。
# トピック「persistent://my-tenant/my-ns/my-topic」に「my-msg」というメッセージを5つ送信
$ bin/pulsar-client produce -m my-msg -n 5 persistent://my-tenant/my-ns/my-topic
正常に動作していれば、Consumer側で5つのメッセージが受信できるはずです。
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
おわりに
第5回の内容は以上です。今回はPulsarのサーバーサイドのデプロイ方法と各コンポーネントの設定項目の一部をご紹介いたしました。
前述の通り、今回の解説はデプロイ対象の環境が(オンプレミスの)物理マシンまたは仮想マシンである事を前提としたものでした。しかし、Pulsarの公式ドキュメントではAmazon Web ServicesやKubernetesといった環境へのデプロイ方法も紹介されています。詳細は下記のページをご参照ください。
- Deploying a Pulsar cluster on AWS using Terraform and Ansible(外部サイト)
- Deploy Pulsar on Kubernetes(外部サイト)
さて、Pulsarについて紹介する記事の連載は今回でいったん一区切りとなります。日本国内での認知度はまだまだ高いとは言えないPulsarですが、今回の連載で少しでも多くの方に興味を持っていただけたら幸いです。