はじめに
本来であれば Client Libraries を使用したかったのだが、既存システムが他の Google Cloud Platform サービスに対して REST/HTTP API を使用していたので、Pub/Sub もそちらに併せて実装することにした。
REST/HTTP API には Java 用の API が用意されているため、Scala から Java の API を呼び出す形となる。
REST/HTTP API だとポーリングが API そのものでは出来ないようなので、Client Libraries を使った非同期 pull も実装中。
使うのはこちら
REST/HTTP API
https://cloud.google.com/pubsub/docs/reference/service_apis_overview?hl=ja
使いたかったのはこちら
Cloud Pub/Sub Client Libraries
https://cloud.google.com/pubsub/docs/reference/libraries?hl=ja
sbt
マルチプロジェクトで作っているため詳細は割愛、dependenciesのみ記載。
"com.google.api-client" % "google-api-client" % "1.23.0",
"com.google.apis" % "google-api-services-pubsub" % "v1-rev377-1.23.0"
com.google.api.services.pubsub.Pubsub の作成
Pubsub のインスタンスを作り、そのインスタンスに対して操作を行う。
なのでまずはPubsub のインスタンスを作るところから始めよう。
指定した GCP のアカウントファイルを読み込み、Pubsub インスタンスを作って返却するファクトリ。
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.pubsub.{ Pubsub, PubsubScopes }
import scala.collection.JavaConverters.asJavaCollectionConverter
import scala.io.Source
object GCPServiceFactory {
lazy private val transport = GoogleNetHttpTransport.newTrustedTransport
lazy private val jsonFactory = JacksonFactory.getDefaultInstance
// GCP の他のサービスにも対応するファクトリ
// 複数の GCP プロジェクトに対応するため、Project を指定可能
// Project はアカウントファイルの情報を保持する case object
def pubSubService(project: Project): PubSubService = {
val fileContents = Source
.fromResource(project.accountFile)
.getLines
.mkString
val is = new ByteArrayInputStream(fileContents.getBytes)
val credential = GoogleCredential
.fromStream(is, transport, jsonFactory)
.createScoped(Seq(PubsubScopes.PUBSUB).asJavaCollection)
val pubsub = new Pubsub.Builder(transport, jsonFactory, credential)
.setApplicationName(GCPConfig().applicationName)
.build
new PubSubService(pubsub)
}
}
Pubsub インスタンスへの操作
サブスクリプション名と取得件数を指定して pull を実施、pull したメッセージを処理、ack を呼び出す、という流れを想定。
pull した際のメッセージの構成は以下の通り。
- data
- message_id
- attributes
- ack_id
最初に述べたとおり、これ自体は pull が呼ばれる度にサブスクリプションを参照する。
サブスクリプションへの参照を作って、リクエストを作って、レスポンスを作って・・・、という一連の流れがどうもしっくりこないが、一通り問題なく動作は出来た。
うーん、Scala なのに Java みたいな実装に見える・・・。
import com.google.api.services.pubsub.Pubsub
import com.google.api.services.pubsub.model.{ AcknowledgeRequest, PullRequest }
import scala.collection.JavaConverters._
case class PubsubMessage(messageId: String, data: Option[String], ackId: String)
class PubSubService(pubsub: Pubsub) {
def pull(subscriptionName: String, maxMessages: Int): Seq[PubsubMessage] = {
val subscriptions = pubsub
.projects()
.subscriptions()
val pullRequest = new PullRequest()
.setReturnImmediately(true)
.setMaxMessages(maxMessages)
val pullResponse = subscriptions
.pull(subscriptionName, pullRequest)
.execute
val maybeMessages = Option(
pullResponse
.getReceivedMessages
.asScala
)
val res = maybeMessages match {
case Some(messages) =>
messages.map { message =>
val pubsubMessage = {
if (message.getMessage != null && message.getMessage.decodeData != null) {
Some(new String(message.getMessage.decodeData, "UTF-8"))
}
else {
None
}
}
PubsubMessage(
message.getMessage.getMessageId,
pubsubMessage,
message.getAckId
)
}
case _ => Seq.empty
}
res
}
def ack(subscriptionName: String, ackIds: Seq[String]): Unit = {
if (ackIds.nonEmpty) {
val subscriptions = pubsub
.projects
.subscriptions
val ackRequest = new AcknowledgeRequest()
.setAckIds(ackIds.asJava)
subscriptions
.acknowledge(subscriptionName, ackRequest)
.execute
}
}
}
おまけ(というか備忘)
gcloud コマンドで pull
--auto-ack オプションは付けていないけれども、この場合に ack が送られていないのかまではわからなかった。
gcloud beta pubsub subscriptions pull --limit 1 <subscription>