Cloud Pub/Subで遊ぶ。emulatorがあってローカルで遊ぶことができる。
ミドルウェアはあまり詳しくないがApache KafkaやAWSのKinesis Streamあたりに近いものだと認識している。
実際には、使い方的にもコスト的にもSQSのほうが似てる気がしているがKafkaもKinesisも使ったことないのでしらない。
Cloud Pub/Subはローカル環境で動作確認するためのエミュレータがある。
サーバを起動するにはgcloud beta emulators pubsub start
で実行できる。
クライアント側は環境変数PUBSUB_EMULATOR_HOST
を設定しておけば、接続をエミュレータにながせる。eval $(gcloud beta emulators pubsub env-init)
とすると簡単に設定できる。
pubsubは、topicへメッセージををpublishすると、topicをsubscribeしているsubcriberにメッセージを渡すことができる。
subscriberがメッセージを受け取るには二通り用意されていて、pull型とpush型がある。
pull型の場合は定期的にメッセージがないかサーバに取りに行くことになる。
さらに、取得したメッセージの処理したことをサーバに通知する必要がある。ackを送れば良い。ackを送らなければ一定時間経過すると再度イベントが取得できる。
push型の場合はpubsubに登録しておいたURL(endponit)にメッセージをPOSTしてくれる。
Web hookを叩いてくれる感じである。
POSTしたときのステータスコード次第で、ackの代わりになる模様。
試してみる
エミュレータを起動しておく。
gem install gcloud
しておく。
適当にreplで作業していることを想定する。
replを起動する前に、
eval $(gcloud beta emulators pubsub env-init)
で環境変数を設定する。
irb -r gcloud
とかpry -r gcloud
とかする。
作業しやすいようにPubSub::Service
のインスタンスを用意しておく。
gcloud = Gcloud.new "project-id"
pubsub = gcloud.pubsub
エミュレータであれば "project-id"
は存在していないプロジェクトIDでも大丈夫だった。
topicをつくる
my-topic
という名前のtopicをつくってみる。
topic = pubsub.create_topic "my-topic"
pull型でメッセージを受け取る
まずsubscriptionをつくる。subscribeすると作れる。
名前はpull-subscriber-1
としておく。
subscription1 = topic.subscribe "pull-subscriber-1"
せっかくなのでふたつ作ってみる。
subscription2 = topic.subscribe "pull-subscriber-2"
メッセージが受け取れる状態になった。
メッセージをpullしてみる。
subscription1.pull.each do |message|
p message.message.data
message.acknowledge!
end # => []
メッセージを送ってみる。
topic.publish "hello"
再取得してみる。
subscription1.pull.each do |message|
p message.message.data
message.acknowledge!
end # => [ReciveMessage的なの]
"hello"
と出力されるはずである。
subscription2でも受け取ってみる。
subscription2.pull.each do |message|
p message.message.data
end # => [ReciveMessage的なの]
"hello"
と出力される。ackを返していないので、しばらくおいて実行すると再出力される。subscription1のほうはもうメッセージは送られて来ない。
push型でメッセージを受け取る
受け取るためのアプリケーションを用意する必要がある。
別のshellを起動して、以下のrubyコードを実行しておこう。
require "sinatra"
require "json"
require "base64"
post "/push" do
message = JSON.parse request.body.read
data = Base64.decode64 message["message"]["data"]
logger.info data
response.status = 204
end
$ ruby app.rb
sinatraはデフォルトポートは 4567 なのでPOSTしてもらうURLはhttp://localhost:4567/push
となる。
準備ができたらREPLにもどってsubscriptionをつくる。
subscription1 = topic.subscribe "push-subscriber-1"
subscription1.endpoint = "http://localhost:4567/push"
endpointを指定するとpushになる。
(subscribeの引数にendpoint: "http://localhost:4567/push"
としても動作しなかった。バグっぽい。)
あとはメッセージをpublishする。
topic.publish "hoge"
topic.publish "goro"
webアプリケーション側に"hoge"
や"goro"
が表示される。
ふたつ目のpush型のsubscriptionを用意すれば、2回メッセージがとんでくる。
subscription2 = topic.subscribe "push-subscriber-2"
subscription2.endpoint = "http://localhost:4567/push"
topic.publish "hoge"
topic.publish "goro"
hoge
とgoro
が2度づつ出力される。
まとめ
ローカルで試せるとTLSの設定が不要なので、push型の動きを確認しやすい。
実際に使う場合は、topicやsubscriptionはGCPのウェブコンソールからも作成できる。