GoogleからCloud PubSub用Javaライブラリが提供されているが、その代替として AlpakkaのCloud Pubsubコネクタがあったので使ってみることにする。
ちなみに先にJavaライブラリを簡単に確認しておくと、、
Javaライブラリ
publisher
以下のような感じ。
Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topic).build();
ByteString data = ByteString.copyFromUtf8("my-message");
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
} finally {
if (publisher != null) {
publisher.shutdown();
}
}
subscriber
以下のような感じ。
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("got message: " + message.getData().toStringUtf8());
consumer.ack();
}
};
MessageReceiverインタフェースを実装する。
メッセージ受信するたびに、receiveMessageメソッドが呼び出されメッセージが渡ってくる。
メッセージが正しく処理されたら最終的にackを返す。
Javaライブラリを使ったSubscriberのプログラムは、メッセージ1件をどう処理するかというスタイルのコーディングとなる。
続いてAlpakka。
Alpakka
公式ドキュメント
Publisher
提供コネクタ
Akka StreamのFlowとしてCloud Pub/SubにPublishする部品が提供されている。
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(projectId, apiKey, email, privateKey, topic)
使うためには、5つの引数に適切な値が設定される必要がある。
- projectId・・・プロジェクトのID
- apiKey・・・GCPコンソールより[APIとサービス] - [認証情報] - [認証情報を作成] - [APIキー]で作成される「キー」情報を使用する
- email・・・プロジェクトを使用するIAMのメンバーのメールアドレス
- privateKey・・・GCPコンソールより[IAMと管理] - [サービスアカウント] - [サービスアカウントを作成] - [新しい秘密鍵の提供] - [JSON]でダウンロードされるjsonのprivate_key項目の値を使用する
- topic・・・Cloud PubSubのトピック名
使ってみる
プログラムは以下のような感じになる。
10件のメッセージをpublishするサンプル。メッセージIDが標準出力される。
val source: Source[PublishRequest, NotUsed] = {
val msgs = (1 to 10).map { i =>
PubSubMessage(messageId = i.toString, data = new String(Base64.getEncoder.encode(s"Hello${i}".getBytes)))
}
Source.single(PublishRequest(msgs))
}
val done: Future[Done] =
source.via(publishFlow).runForeach(println)
Subscriber
提供コネクタ
メッセージ源泉としてのSource部品。
val subscriptionSource: Source[ReceivedMessage, NotUsed] =
GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)
Ackを返すSink部品。
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)
なお、各種引数はpublisherのときと同様。ただし、5番目はサブスクリプション名を指定する。
使ってみる
3件分をまとめてAckを返すプログラム。
publishされたメッセージ内容見てないの微妙ですが。。
val done: Future[Done] =
subscriptionSource
.map(_.ackId)
.grouped(3)
.map(AcknowledgeRequest.apply)
.runWith(ackSink)
ちなみにCloud PubSubからはどうやってメッセージを受信しているのかな?
// ・・・略・・・
val uri: Uri = s"$PubSubGoogleApisHost/v1/projects/$project/subscriptions/$subscription:pull"
val request = HttpApi.PullRequest(returnImmediately = true, maxMessages = 1000)
// ・・・略・・・
Cloud PubSubのWEB APIを叩いていて最大1000件がメッセージSourceとなるようだ。