LoginSignup
1
0

More than 5 years have passed since last update.

Scala で Google Cloud Pub/Sub の Subscription から Pull してみる

Last updated at Posted at 2018-03-28

はじめに

本来であれば Client Libraries を使用したかったのだが、既存システムが他の Google Cloud Platform サービスに対して REST/HTTP API を使用していたので、Pub/Sub もそちらに併せて実装することにした。
REST/HTTP API には Java 用の API が用意されているため、Scala から Java の API を呼び出す形となる。
REST/HTTP API だとポーリングが API そのものでは出来ないようなので、Client Libraries を使った非同期 pull も実装中。

使うのはこちら

REST/HTTP API
https://cloud.google.com/pubsub/docs/reference/service_apis_overview?hl=ja

使いたかったのはこちら

Cloud Pub/Sub Client Libraries
https://cloud.google.com/pubsub/docs/reference/libraries?hl=ja

sbt

マルチプロジェクトで作っているため詳細は割愛、dependenciesのみ記載。

project/Dependencies.scala
  "com.google.api-client" % "google-api-client" % "1.23.0",
  "com.google.apis" % "google-api-services-pubsub" % "v1-rev377-1.23.0"

com.google.api.services.pubsub.Pubsub の作成

Pubsub のインスタンスを作り、そのインスタンスに対して操作を行う。
なのでまずはPubsub のインスタンスを作るところから始めよう。

指定した GCP のアカウントファイルを読み込み、Pubsub インスタンスを作って返却するファクトリ。

GCPServiceFactory.scala
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.pubsub.{ Pubsub, PubsubScopes }

import scala.collection.JavaConverters.asJavaCollectionConverter
import scala.io.Source

object GCPServiceFactory {

  lazy private val transport = GoogleNetHttpTransport.newTrustedTransport
  lazy private val jsonFactory = JacksonFactory.getDefaultInstance

  // GCP の他のサービスにも対応するファクトリ

  // 複数の GCP プロジェクトに対応するため、Project を指定可能
  // Project はアカウントファイルの情報を保持する case object
  def pubSubService(project: Project): PubSubService = {
    val fileContents = Source
      .fromResource(project.accountFile)
      .getLines
      .mkString
    val is = new ByteArrayInputStream(fileContents.getBytes)
    val credential = GoogleCredential
      .fromStream(is, transport, jsonFactory)
      .createScoped(Seq(PubsubScopes.PUBSUB).asJavaCollection)
    val pubsub = new Pubsub.Builder(transport, jsonFactory, credential)
      .setApplicationName(GCPConfig().applicationName)
      .build

    new PubSubService(pubsub)
  }

}

Pubsub インスタンスへの操作

サブスクリプション名と取得件数を指定して pull を実施、pull したメッセージを処理、ack を呼び出す、という流れを想定。
pull した際のメッセージの構成は以下の通り。

  • data
  • message_id
  • attributes
  • ack_id

最初に述べたとおり、これ自体は pull が呼ばれる度にサブスクリプションを参照する。
サブスクリプションへの参照を作って、リクエストを作って、レスポンスを作って・・・、という一連の流れがどうもしっくりこないが、一通り問題なく動作は出来た。
うーん、Scala なのに Java みたいな実装に見える・・・。

PubSubService.scala
import com.google.api.services.pubsub.Pubsub
import com.google.api.services.pubsub.model.{ AcknowledgeRequest, PullRequest }

import scala.collection.JavaConverters._

case class PubsubMessage(messageId: String, data: Option[String], ackId: String)

class PubSubService(pubsub: Pubsub) {

  def pull(subscriptionName: String, maxMessages: Int): Seq[PubsubMessage] = {
    val subscriptions = pubsub
      .projects()
      .subscriptions()
    val pullRequest = new PullRequest()
      .setReturnImmediately(true)
      .setMaxMessages(maxMessages)
    val pullResponse = subscriptions
      .pull(subscriptionName, pullRequest)
      .execute
    val maybeMessages = Option(
      pullResponse
        .getReceivedMessages
        .asScala
    )
    val res = maybeMessages match {
      case Some(messages) =>
        messages.map { message =>
          val pubsubMessage = {
            if (message.getMessage != null && message.getMessage.decodeData != null) {
              Some(new String(message.getMessage.decodeData, "UTF-8"))
            }
            else {
              None
            }
          }
          PubsubMessage(
            message.getMessage.getMessageId,
            pubsubMessage,
            message.getAckId
          )
        }
      case _ => Seq.empty
    }
    res
  }

  def ack(subscriptionName: String, ackIds: Seq[String]): Unit = {
    if (ackIds.nonEmpty) {
      val subscriptions = pubsub
        .projects
        .subscriptions
      val ackRequest = new AcknowledgeRequest()
        .setAckIds(ackIds.asJava)
      subscriptions
        .acknowledge(subscriptionName, ackRequest)
        .execute
    }
  }
}

おまけ(というか備忘)

gcloud コマンドで pull

--auto-ack オプションは付けていないけれども、この場合に ack が送られていないのかまではわからなかった。

gcloud beta pubsub subscriptions pull --limit 1 <subscription>

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0