1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AlpakkaのGoogle Cloud Pub/Subコネクタを使う

1
Posted at

GoogleからCloud PubSub用Javaライブラリが提供されているが、その代替として AlpakkaのCloud Pubsubコネクタがあったので使ってみることにする。

ちなみに先にJavaライブラリを簡単に確認しておくと、、

Javaライブラリ

publisher

以下のような感じ。

Publisher publisher = null;
try {
  publisher = Publisher.newBuilder(topic).build();
  ByteString data = ByteString.copyFromUtf8("my-message");
  PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
  ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
} finally {
  if (publisher != null) {
    publisher.shutdown();
  }
}

subscriber

以下のような感じ。

MessageReceiver receiver =
  new MessageReceiver() {
    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      System.out.println("got message: " + message.getData().toStringUtf8());
      consumer.ack();
    }
  };

MessageReceiverインタフェースを実装する。
メッセージ受信するたびに、receiveMessageメソッドが呼び出されメッセージが渡ってくる。
メッセージが正しく処理されたら最終的にackを返す。

Javaライブラリを使ったSubscriberのプログラムは、メッセージ1件をどう処理するかというスタイルのコーディングとなる。

続いてAlpakka。

Alpakka

公式ドキュメント

Publisher

提供コネクタ

Akka StreamのFlowとしてCloud Pub/SubにPublishする部品が提供されている。

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(projectId, apiKey, email, privateKey, topic)

使うためには、5つの引数に適切な値が設定される必要がある。

  • projectId・・・プロジェクトのID
  • apiKey・・・GCPコンソールより[APIとサービス] - [認証情報] - [認証情報を作成] - [APIキー]で作成される「キー」情報を使用する
  • email・・・プロジェクトを使用するIAMのメンバーのメールアドレス
  • privateKey・・・GCPコンソールより[IAMと管理] - [サービスアカウント] - [サービスアカウントを作成] - [新しい秘密鍵の提供] - [JSON]でダウンロードされるjsonのprivate_key項目の値を使用する
  • topic・・・Cloud PubSubのトピック名

使ってみる

プログラムは以下のような感じになる。
10件のメッセージをpublishするサンプル。メッセージIDが標準出力される。

val source: Source[PublishRequest, NotUsed] = {
  val msgs = (1 to 10).map { i =>
    PubSubMessage(messageId = i.toString, data = new String(Base64.getEncoder.encode(s"Hello${i}".getBytes)))
  }
  Source.single(PublishRequest(msgs))
}

val done: Future[Done] =
  source.via(publishFlow).runForeach(println)

Subscriber

提供コネクタ

メッセージ源泉としてのSource部品。

val subscriptionSource: Source[ReceivedMessage, NotUsed] =
  GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

Ackを返すSink部品。

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

なお、各種引数はpublisherのときと同様。ただし、5番目はサブスクリプション名を指定する。

使ってみる

3件分をまとめてAckを返すプログラム。
publishされたメッセージ内容見てないの微妙ですが。。

val done: Future[Done] =
  subscriptionSource
  .map(_.ackId)
  .grouped(3)
  .map(AcknowledgeRequest.apply)
  .runWith(ackSink)

ちなみにCloud PubSubからはどうやってメッセージを受信しているのかな?

alpakka/googlecloud/pubsub/HttpApi.scala
// ・・・略・・・

val uri: Uri = s"$PubSubGoogleApisHost/v1/projects/$project/subscriptions/$subscription:pull"

val request = HttpApi.PullRequest(returnImmediately = true, maxMessages = 1000)

// ・・・略・・・

Cloud PubSubのWEB APIを叩いていて最大1000件がメッセージSourceとなるようだ。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?