RethinkDB の Realtime Feed を試してみる

More than 3 years have passed since last update.

普段は MongoDB を触っているのですが,RethinkDB も良さそうだなと思ってちょこちょこと触っています.

RethinkDB の公式ガイドにある Ten-minute Guide をやってみようかと.


ひとまず Create / Update / Delete

例えば,players テーブルの変更をすべて受け取るリアルタイムフィードは以下の様になる.

require 'rethinkdb'

include RethinkDB::Shortcuts

r.connect(host: 'localhost', port: 28015).repl
# ここまで下準備

cursor = r.table("players").changes.run
cursor.each{|document| p document}

これを走らせたうえで,別コネクションから players テーブルに更新や挿入を走らせる.

# 上記と別のコネクション

irb(main):024:0> r.table('players').insert([{name: 'Takehiro Ishikawa', number: 7, team: "DB"}]).run()

すると,最初の cursor.each したシェルには,以下の様に出力される.(読みやすいように整形しています.)

{

"new_val"=>{
"id"=>"d5e78984-d91e-4220-a7e5-bced167fbdbc",
"name"=>"Takehiro Ishikawa",
"number"=>7,
"team"=>"DB"
},
"old_val"=>nil
}

更新をしてみる.

irb(main):024:0> r.table('players').filter{|p| p['name'].eq('Takehiro Ishikawa')}.update(previous_number: 52).run()

すると,new_valold_val のそれぞれが入っていることが分かる.

どのような差分が生じたのかは自分で JSON 操作 API などを使って確認する必要があるようですね.

{

"new_val"=>{
"id"=>"d5e78984-d91e-4220-a7e5-bced167fbdbc",
"name"=>"Takehiro Ishikawa",
"number"=>7,
"previous_number"=>52,
"team"=>"DB"
},
"old_val"=>{
"id"=>"d5e78984-d91e-4220-a7e5-bced167fbdbc",
"name"=>"Takehiro Ishikawa",
"number"=>7,
"team"=>"DB"
}
}

続いて,削除.

irb(main):025:0> r.table('players').filter{|p| p['name'].eq('Takehiro Ishikawa')}.delete().run()

すると,予想通り,new_val に紐づくのは nil, old_val には削除する前のドキュメント.

{

"new_val"=>nil,
"old_val"=>{
"id"=>"d5e78984-d91e-4220-a7e5-bced167fbdbc",
"name"=>"Takehiro Ishikawa",
"number"=>7,
"previous_number"=>52,
"team"=>"DB"
}
}


cursor の性質いろいろ


購読の中断

Realtime Feed を読み込んでいる irb にて Ctrl+C をタイプして,フィード購読を中断する.

このあと,もう一度 cursor に対して Feed 購読を開始しようとすると怒られる.

RethinkDB::RqlRuntimeError: Can only iterate over a cursor once.

from /home/vagrant/.rbenv/versions/2.2.3/lib/ruby/gems/2.2.0/gems/rethinkdb-2.2.0.0/lib/net.rb:389:in `each'
from (irb):367
from /home/vagrant/.rbenv/versions/2.2.3/bin/irb:11:in `<main>'


Filter する

Realtime Feed を購読する対象を filter で絞ることが出来る.

例えば,以下のカーソル宣言で,team"DB" であるドキュメントだけを抽出する.

irb(main):376:0* players = r.table('players').filter(team: "DB").changes.run

irb(main):382:0* baystars.each{|player| p player}

購読を開始したうえで,team"DB" のものも,そうでないものも insert する.

irb(main):027:0> r.table('players').insert({name: "Kajitani Takayuki", number: 3, team: "DB"}).run() # (1)

irb(main):032:0* r.table('players').insert({name: "Okajima Hideki", team: "SB", number: 37}).run() # (2)

irb(main):033:0> r.table('players').insert({name: "Kinjo Tatsuhiko", number: 1, team: "DB"}).run() # (3)

すると,Realtime Feed を購読しているシェルには以下のように表示される.

((1) のクエリを実行すると以下の出力)

{
"new_val"=>{
"id"=>"b6b2bd75-5736-4cb8-9991-9eaaa1386159",
"name"=>"Kajitani Takayuki",
"number"=>3, "team"=>"DB"
},
"old_val"=>nil
}
(ここの上下の間に (2) のクエリを実行)
((3) のクエリを実行すると以下の出力)
{
"new_val"=>{
"id"=>"9eddbb22-f8af-44a8-818e-2acf676838b7",
"name"=>"Kinjo Tatsuhiko",
"number"=>1,
"team"=>"DB"
},
"old_val"=>nil
}

(2) のクエリを実行しても,挿入の結果は Realtime Feed の結果には反映されない.

更新するとどうなるか.team"DB" だった金城を "G" に所属させる.

irb(main):043:0> r.table('players').get('9eddbb22-f8af-44a8-818e-2acf676838b7').update(team: "G").run()

購読は以下のようになる.

{

"new_val"=>nil,
"old_val"=>{
"id"=>"9eddbb22-f8af-44a8-818e-2acf676838b7",
"name"=>"Kinjo Tatsuhiko",
"number"=>1,
"team"=>"DB"}
}

むむぅ,金城が巨人に写った後のデータは含まれていない様子.

これは,フィルタの対象が player テーブルではなく,team"DB" で絞った集合に対してリアルタイムフィードを購読しているからだと考えられる.つまり,上記の結果は team"DB" な集合から上記のデータが除去されたことを表している.

もちろん,Update した後のデータのみが filter に当てはまる場合は,new_val のみに値が入っている状態となる.

new_val もしくは old_val のどちらかにおいて team"DB" である,つまりベイスターズから出て行く選手なら出て行った先の情報も,入ってくる選手なら元のデータも取得したいときは,table 全体の Realtime Feed を購読し,その結果のストリームに対してフィルタを適用すれば良い.

# 見やすいように整形しています.

irb(main):399:0> baystars =
r.table('players')
.changes
.filter{ # Realtime Feed で帰ってくるドキュメントの配列にフィルタ
|d| d['new_val']['team'].eq("DB") |
d['old_val']['team'].eq("DB")}
.run
irb(main):400:0> baystars.each{|d| p d}