Riak2.0からData Types
という機能が追加されており、Riakに保存するデータの型を定義する事でよりアプリケーションよりの機能を提供してくれる。
公式ドキュメントはこちら
提供される型は
- counters
- sets
- maps
があります。最初に前準備方法を確認し、各型の扱い方をみてみる。
Data Typesを使う準備
Data Types
を使う為にはbucket type
を作る必要がある。
で、そのbucket type
のdatatype
というパラメーターにcounter
, map
, set
のいずれかをセットする。
以下は公式のドキュメントからまるっと取ってきたサンプル。
$ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
maps created
$ riak-admin bucket-type create sets '{"props":{"datatype":"set"}}'
sets created
$ riak-admin bucket-type create counters '{"props":{"datatype":"counter"}}'
counters created
riak-admin bucket-type
コマンドを使ってbucket typeを作り、datatypeプロパティを設定します。bucket type名は任意。ここでは公式ドキュメントに従って、それぞれの型名を複数形にしたmaps
, sets
, counters
をbucket type名にした。
$ riak-admin bucket-typs <bucket名>
で、bucket type
の型が確認できる(他にも色々表示されるけど、datatype
として表示されるのがそれ)。
続いて先ほど作ったbucket type
をアクティブにする必要がある。
$ riak-admin bucket-type activate maps
maps has been activated
同様にsetsとcountersもアクティブにする。
$ riak-admin bucket-type activate sets
sets has been activated
$ riak-admin bucket-type activate counters
counters has been activated
以上で準備完了。
クライアントの準備
公式クライアントが以下に用意されている。
今回はRubyクライアントを使う。2014年9月7日の段階ではgemからインストールしたriak-clientがうまく動かなかったのでgithubのmasterブランチのものを使う。
$ git clone https://github.com/basho/riak-ruby-client.git
$ cd riak-ruby-client
$ bundle install --path vendor/bundle
$ bundle exec irb
>
以上で検証環境ができた。
次にriakへ接続するクライアントを作る。
require "riak"
client = Riak::Client.new(nodes:[{:host => "127.0.0.1", :pb_port => 8087}])
以下、各data typeについて動作確認する。
Counters
counterは数値の増減を操作するdata type。機能自体はシンプルだが、マルチマスタのkey-value storeであるRiakでこれが実装されているのは素晴らしいと思う。
bucket = client.bucket("counters")
# カウンターオブジェクトを作る
counter = Riak::Crdt::Counter.new(bucket, 'traffic_tickets', 'counters')
# バケットに対してデフォルトのbucket typeを指定しておくと、以後は最後の引数を省略できる。
Riak::Crdt::DEFAULT_BUCKET_TYPES[:counter] = 'counters'
counter = Riak::Crdt::Counter.new(bucket, 'traffic_tickets')
カウンターを1増加させる
counter.increment
curlでhttp越しに値を取得してみる
$ curl http://localhost:8098/types/counters/buckets/counters/datatypes/traffic_tickets
{"type":"counter","value":1}
Rubyがわで一気に10ほどカウンターを増加させる
counter.increment 10
curlでhttp越しに確認
$ curl http://localhost:8097/types/counters/buckets/counters/datatypes/traffic_tickets
{"type":"counter","value":11}
カウンターの値を取得
counter.value #=> 11
ここで、もう一つ別のirbコンソールで値を取得してみる。
$ bundle exec irb
require "riak"
client = Riak::Client.new(nodes:[{:host => "127.0.0.1", :pb_port => 8087}])
bucket = client.bucket("counters")
counter = Riak::Crdt::Counter.new(bucket, 'traffic_tickets', 'counters')
counter.value #=> 11
# irb1側からインクリメント
counter.increment
再び、もとのirbコンソールで値を表示
counter.value #=> 11
値は増加していない。Riakから最新の値を取得し直して再度確認
counter.reload
counter.value #=> 12
なお、irbとirb1から同時にインクリメントしても、ちゃんと正しい値になっている。この場合は両方からインクリメントされた値の合計が元の値に足された値が最終的な値になる。
Sets
setは任意の値の集合です。
bucket = client.bucket('travel')
set = Riak::Crdt::Set.new(bucket, 'cities', 'sets')
# または、バケットに対してデフォルトのバケットタイプを指定しておくと最後の引数は省略できます
Riak::Crdt::DEFAULT_BUCKET_TYPES[:set] = 'sets'
set = Riak::Crdt::Set.new(bucket, 'cities')
空かどうか確認。当然作ったばかりなので空(=trueを返す)
set.empty? #=> true
値を追加する
set.add('Montreal')
set.add('Toronto')
setの中身を確認する。
set.members #=> #<Set: {"Montreal", "Toronto"}>
Montrealを削除
set.remove('Montreal')
setの中身を確認する。
set.members #=> #<Set: {"Toronto"}>
値が存在するか確認する
set.include? "not_exist" #=> false
set.include? "Toronto" #=> true
要素の個数を取得
set.members.length #=> 1
Maps
最後はmapです。mapはキーとバリューの組の集合です。
bucket = client.bucket('cat')
mike = Riak::Crdt::Map.new(bucket, 'mike', 'maps')
# または、バケットに対してデフォルトのバケットタイプを指定しておくと最後の引数は省略できます
Riak::Crdt::DEFAULT_BUCKET_TYPES[:map] = 'maps'
mike = Riak::Crdt::Map.new(bucket, 'mike')
mike.registers["name"] = "mike"
# 数値は文字列に変換する必要が在ります。
mike.registers["age"] = 3.to_s
flagsを使うとtrueまたはfalseを保存できます。
mike.flags["male"] = false
mapの中でcounterを使う事も出来ます。
mike.counters["nya-"].increment
値の取得
mike.registers["name"] #=> "mike"
mike.registers["age"] #=> "3"
mike.flags["male"] #=> false
mike.counters["nya-"].value #=> 1
なお、Countersのところで試した通り、各プロセスで保持しているcounterなりsetなりmapなりは、reload
メソッドを呼ぶ事でRiakから値を取得し直して最新の値になる。
ただし値の更新は、値を取得し直して別のプロセスの変更を自オブジェクトに反映せずに更新しても、ちゃんと最終的な値は各プロセスの変更を反映したものになる。これは色々と出番がありそうな機能だなー。