Cloud Pub/Subで遊ぶ。emulatorがあってローカルで遊ぶことができる。
ミドルウェアはあまり詳しくないがApache KafkaやAWSのKinesis Streamあたりに近いものだと認識している。
実際には、使い方的にもコスト的にもSQSのほうが似てる気がしているがKafkaもKinesisも使ったことないのでしらない。
Cloud Pub/Subはローカル環境で動作確認するためのエミュレータがある。
サーバを起動するにはgcloud beta emulators pubsub startで実行できる。
クライアント側は環境変数PUBSUB_EMULATOR_HOSTを設定しておけば、接続をエミュレータにながせる。eval $(gcloud beta emulators pubsub env-init)とすると簡単に設定できる。
pubsubは、topicへメッセージををpublishすると、topicをsubscribeしているsubcriberにメッセージを渡すことができる。
subscriberがメッセージを受け取るには二通り用意されていて、pull型とpush型がある。
pull型の場合は定期的にメッセージがないかサーバに取りに行くことになる。
さらに、取得したメッセージの処理したことをサーバに通知する必要がある。ackを送れば良い。ackを送らなければ一定時間経過すると再度イベントが取得できる。
push型の場合はpubsubに登録しておいたURL(endponit)にメッセージをPOSTしてくれる。
Web hookを叩いてくれる感じである。
POSTしたときのステータスコード次第で、ackの代わりになる模様。
試してみる
エミュレータを起動しておく。
gem install gcloudしておく。
適当にreplで作業していることを想定する。
replを起動する前に、
eval $(gcloud beta emulators pubsub env-init)で環境変数を設定する。
irb -r gcloudとかpry -r gcloudとかする。
作業しやすいようにPubSub::Serviceのインスタンスを用意しておく。
gcloud = Gcloud.new "project-id"
pubsub = gcloud.pubsub
エミュレータであれば "project-id"は存在していないプロジェクトIDでも大丈夫だった。
topicをつくる
my-topicという名前のtopicをつくってみる。
topic = pubsub.create_topic "my-topic"
pull型でメッセージを受け取る
まずsubscriptionをつくる。subscribeすると作れる。
名前はpull-subscriber-1としておく。
subscription1 = topic.subscribe "pull-subscriber-1"
せっかくなのでふたつ作ってみる。
subscription2 = topic.subscribe "pull-subscriber-2"
メッセージが受け取れる状態になった。
メッセージをpullしてみる。
subscription1.pull.each do |message|
p message.message.data
message.acknowledge!
end # => []
メッセージを送ってみる。
topic.publish "hello"
再取得してみる。
subscription1.pull.each do |message|
p message.message.data
message.acknowledge!
end # => [ReciveMessage的なの]
"hello"と出力されるはずである。
subscription2でも受け取ってみる。
subscription2.pull.each do |message|
p message.message.data
end # => [ReciveMessage的なの]
"hello"と出力される。ackを返していないので、しばらくおいて実行すると再出力される。subscription1のほうはもうメッセージは送られて来ない。
push型でメッセージを受け取る
受け取るためのアプリケーションを用意する必要がある。
別のshellを起動して、以下のrubyコードを実行しておこう。
require "sinatra"
require "json"
require "base64"
post "/push" do
message = JSON.parse request.body.read
data = Base64.decode64 message["message"]["data"]
logger.info data
response.status = 204
end
$ ruby app.rb
sinatraはデフォルトポートは 4567 なのでPOSTしてもらうURLはhttp://localhost:4567/pushとなる。
準備ができたらREPLにもどってsubscriptionをつくる。
subscription1 = topic.subscribe "push-subscriber-1"
subscription1.endpoint = "http://localhost:4567/push"
endpointを指定するとpushになる。
(subscribeの引数にendpoint: "http://localhost:4567/push"としても動作しなかった。バグっぽい。)
あとはメッセージをpublishする。
topic.publish "hoge"
topic.publish "goro"
webアプリケーション側に"hoge"や"goro"が表示される。
ふたつ目のpush型のsubscriptionを用意すれば、2回メッセージがとんでくる。
subscription2 = topic.subscribe "push-subscriber-2"
subscription2.endpoint = "http://localhost:4567/push"
topic.publish "hoge"
topic.publish "goro"
hogeとgoroが2度づつ出力される。
まとめ
ローカルで試せるとTLSの設定が不要なので、push型の動きを確認しやすい。
実際に使う場合は、topicやsubscriptionはGCPのウェブコンソールからも作成できる。