Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
3
Help us understand the problem. What is going on with this article?
@jobhacker

RethinkDBを使ったPubSubを試す

More than 5 years have passed since last update.

サンプル

公開されているpubsubのサンプルを使って説明したいと思います。
https://github.com/rethinkdb/example-pubsub/tree/master/ruby

ソース

# publish.rb

require 'rethinkdb'
require_relative 'repubsub'

include RethinkDB::Shortcuts

exchange = Repubsub::Exchange.new(:topics, db: :repubsub, host: 'localhost', port: 28015)
exchange.topic("topics.subtopics").publish({ notice: "topic changed" })

# subscribe.rb

require 'rethinkdb'
require_relative 'repubsub'

include RethinkDB::Shortcuts

exchange = Repubsub::Exchange.new(:topics, db: :repubsub, host: 'localhost', port: 28015)
queue = exchange.queue{|topic| topic.match('topics.subtopics.*')}
queue.subscription.each do |topic,payload|
  puts "I got the topic: #{topic}"
  puts "With the message: #{payload}"
end

実行

rethinkdb
ruby subscribe.rb
ruby publish.rb

結果

I got the topic: topics.subtopics
With the message: {"notice"=>"topic changed"}

という感じで、簡単に実装できました。

どういう原理で動いてるのかソースをみたところ

subscriptionで最終的に以下のようになっていました。

@table.changes[:new_val].filter{|row| filter_func.call(row[:topic])}

@table.changesのところですが、結果的にはRethinkDBのchangefeedsという機能を使ってるようだったので、本当にそうなっているか確かめてみました。

ソース

# change.rb

require 'rethinkdb'
require 'eventmachine'
require_relative 'repubsub'

include RethinkDB::Shortcuts

conn = r.connect(db: 'repubsub')
EventMachine.run do
  r.table('topics').changes().run(conn).each do |change|
    p change
  end
end

実行

ruby change.rb
ruby publish.rb

結果

updated_onが変更になって、changefeedsの機能でsubscriberに対して通知が行くという仕組みです

【注意】2回目の実行なのでold_valがありますが1回目の場合はnullです。

{ 
  "new_val" =>  {
    "id "=> "faa31cf7-9863-42ad-bca8-5781de2e2cd9", 
    "payload" => {
      "notice" => "topic changed"
    }, 
    "topic" => "topics.subtopics", 
    "updated_on" => 2016-02-11 02:25:03 +0000
  }, 
  "old_val" => {
    "id" => "faa31cf7-9863-42ad-bca8-5781de2e2cd9", 
    "payload" => {
      "notice" => "topic changed"
    }, 
    "topic" => "topics.subtopics", 
    "updated_on" => 2016-02-11 02:16:46 +0000
  }
}

まとめ

指定したDB, テーブルの作成・更新がRethinkDBのchangefeedsの機能でqueueに登録されたsubscriberに対して通知がいくというシンプルな作りです。

ちなみに複数のsubscriberのテストをすると

ruby subscribe.rb
ruby subscribe.rb
ruby publish.rb

とするとそれぞれのsubscriberに

I got the topic: topics.subtopics
With the message: {"notice"=>"topic changed"}

と表示されました。

今回作ったサンプル

3
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
jobhacker
スタートアップ・ベンチャーで20年近くやってるエンジニアです。 新規事業立ち上げを主軸に開発に携わっています。

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
3
Help us understand the problem. What is going on with this article?