20
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RubyでCloud Pub/Subをローカルで触ってみる

Last updated at Posted at 2016-07-06

Cloud Pub/Subで遊ぶ。emulatorがあってローカルで遊ぶことができる。

ミドルウェアはあまり詳しくないがApache KafkaやAWSのKinesis Streamあたりに近いものだと認識している。
実際には、使い方的にもコスト的にもSQSのほうが似てる気がしているがKafkaもKinesisも使ったことないのでしらない。

Cloud Pub/Subはローカル環境で動作確認するためのエミュレータがある。
サーバを起動するにはgcloud beta emulators pubsub startで実行できる。

クライアント側は環境変数PUBSUB_EMULATOR_HOSTを設定しておけば、接続をエミュレータにながせる。eval $(gcloud beta emulators pubsub env-init)とすると簡単に設定できる。

pubsubは、topicへメッセージををpublishすると、topicをsubscribeしているsubcriberにメッセージを渡すことができる。
subscriberがメッセージを受け取るには二通り用意されていて、pull型とpush型がある。
pull型の場合は定期的にメッセージがないかサーバに取りに行くことになる。
さらに、取得したメッセージの処理したことをサーバに通知する必要がある。ackを送れば良い。ackを送らなければ一定時間経過すると再度イベントが取得できる。

push型の場合はpubsubに登録しておいたURL(endponit)にメッセージをPOSTしてくれる。
Web hookを叩いてくれる感じである。
POSTしたときのステータスコード次第で、ackの代わりになる模様。

試してみる

エミュレータを起動しておく。

gem install gcloudしておく。

適当にreplで作業していることを想定する。
replを起動する前に、

eval $(gcloud beta emulators pubsub env-init)で環境変数を設定する。

irb -r gcloudとかpry -r gcloudとかする。

作業しやすいようにPubSub::Serviceのインスタンスを用意しておく。

gcloud = Gcloud.new "project-id"
pubsub = gcloud.pubsub

エミュレータであれば "project-id"は存在していないプロジェクトIDでも大丈夫だった。

topicをつくる

my-topicという名前のtopicをつくってみる。

topic = pubsub.create_topic "my-topic"

pull型でメッセージを受け取る

まずsubscriptionをつくる。subscribeすると作れる。
名前はpull-subscriber-1としておく。

subscription1 = topic.subscribe "pull-subscriber-1"

せっかくなのでふたつ作ってみる。

subscription2 = topic.subscribe "pull-subscriber-2"

メッセージが受け取れる状態になった。
メッセージをpullしてみる。

subscription1.pull.each do |message| 
    p message.message.data
    message.acknowledge!
end  # => []

メッセージを送ってみる。

topic.publish "hello"

再取得してみる。

subscription1.pull.each do |message| 
    p message.message.data
    message.acknowledge!
end  # => [ReciveMessage的なの]

"hello"と出力されるはずである。

subscription2でも受け取ってみる。

subscription2.pull.each do |message| 
    p message.message.data
end  # => [ReciveMessage的なの]

"hello"と出力される。ackを返していないので、しばらくおいて実行すると再出力される。subscription1のほうはもうメッセージは送られて来ない。

push型でメッセージを受け取る

受け取るためのアプリケーションを用意する必要がある。
別のshellを起動して、以下のrubyコードを実行しておこう。

app.rb
require "sinatra"
require "json"
require "base64"

post "/push" do
  message = JSON.parse request.body.read
  data = Base64.decode64 message["message"]["data"]
  logger.info data
  response.status = 204
end
$ ruby app.rb

sinatraはデフォルトポートは 4567 なのでPOSTしてもらうURLはhttp://localhost:4567/pushとなる。

準備ができたらREPLにもどってsubscriptionをつくる。

subscription1 = topic.subscribe "push-subscriber-1"
subscription1.endpoint = "http://localhost:4567/push"

endpointを指定するとpushになる。
(subscribeの引数にendpoint: "http://localhost:4567/push"としても動作しなかった。バグっぽい。)

あとはメッセージをpublishする。

topic.publish "hoge"
topic.publish "goro"

webアプリケーション側に"hoge""goro"が表示される。

ふたつ目のpush型のsubscriptionを用意すれば、2回メッセージがとんでくる。

subscription2 = topic.subscribe "push-subscriber-2"
subscription2.endpoint = "http://localhost:4567/push"
topic.publish "hoge"
topic.publish "goro"

hogegoroが2度づつ出力される。

まとめ

ローカルで試せるとTLSの設定が不要なので、push型の動きを確認しやすい。
実際に使う場合は、topicやsubscriptionはGCPのウェブコンソールからも作成できる。

20
17
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?