分散キーバリューストアriakは、データを保存する直前および直後に特定の処理を行うことができます。
- pre-commitはデータを保存する直前に実行されます。JavascriptとErlangが使用できます。
- post-commitはデータを保存した直後に非同期で実行されます。Erlangのみ使用できます。
pre-commit
pre-commit関数の用意
Javascriptも使えますが、ここではErlangで記述します。
ソースはriak/docsに書いてあったまんま。
やってることは、受け取ったJSONデータがパースできるかのチェック。単にパースして問題なければ、引数で渡されたオブジェクトをそのまま返す。エラーがでたら{fail, Reason}をかえしています。
{fail, Reason}を返した場合、そのオブジェクトの保存はされず、クライアントにエラーを返すようです。
こんな感じでモジュールを用意。
(以下は問題アリ。後述)
-module(validate_json).
-export([validate/1]).
validate(Obj) ->
try
mochijson2:decode(riak_object:get_value(Obj)),
Obj
catch
throw:invalid_utf8 ->
{fail, "Invalid JSON: Illegal UTF-8 character"};
error:Error ->
{fail,
"Invalid JSON:" ++
binary_to_list(list_to_binary(io_lib:format("~p", [Error])))}
end.
最初は上のコードでいいかと思ってました。
ですが、pre-commitはデータを削除する際にも実行されます。なのでこれだと新規保存、更新時はいいとしても、削除ができません。削除しようとすると必ずfailとなります。
で、削除を考慮したコードは以下。
-module(validate_json).
-export([validate/1]).
validate(Obj) ->
case is_delete_request(Obj) of
true -> Obj;
false -> validate(post_or_put, Obj)
end.
validate(post_or_put, Obj) ->
try
mochijson2:decode(riak_object:get_value(Obj)),
Obj
catch
throw:invalid_utf8 ->
{fail, "Invalid JSON: Illegal UTF-8 character"};
error:Error ->
{fail,
"Invalid JSON:" ++
binary_to_list(list_to_binary(io_lib:format("~p", [Error])))}
end.
is_delete_request(Obj) ->
Metadata = riak_object:get_metadata(Obj),
case dict:find(<<"X-Riak-Deleted">>, Metadata) of
error -> false;
{ok, _} -> true
end.
削除の場合は"X-Riak-Deleted"というメタデータが付加されてるので、これがメタデータの中に存在していたらチェックしません。存在していない場合は新規作成または更新ということになるので、正しいJSONかどうかチェックします。
コンパイル
$ erlc validate_json.erl
エラーが出たら修正。でなければOK。validate_json.beamというファイルがコンパイルの成果物です。
設定とbeamファイルの保存
このvalidate_json.beamを適当なディレクトリに保存します。既に/etc/riak/app.configで
{riak_kv, [
%% ...
{add_paths, ["/tmp/beams/"]},
%% ...
のような設定がされている場合は、そのディレクトリ下に、そうでなければ"/tmp/beams"などの下に保存しておいて、上記設定をapp.configに追加します。
$ mkdir /tmp/beams
$ cp validate_json.beam /tmp/beams/
$ sudo vi /etc/riak/app.config #必要なら設定を追加
riakを再起動します。DebianやUbuntuでdpkgから入れた場合は
$ sudo /etc/init.d/riak stop
$ sudo /etc/init.d/riak start
bucketへの設定
bucketのプロパティにprecommit関数を追加します。
$ curl -XPUT -H "Content-Type: application/json" \
https://127.0.0.1:8098/buckets/messages/props \
-d '{"props":{"precommit":[{"mod": "validate_json", "fun": "validate"}]}}' \
--insecure
これ以降は、bucket:messages
にデータが保存される直前に必ず上記のprecommitで指定した関数が実行されます。
pre-commitのテスト
ためしにデータを保存してみます。
(ローカルのhttpsで動作していると仮定)
普通に保存してみる
curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d '{"count": 1'} --insecure
問題なく保存されました。
不正なJSONを保存してみる
$ curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d 'this is not json' --insecure
Invalid JSON:{case_clause,<<"this is not json">>}
エラーメッセージが表示されました。
現在の値を取得してみます。
$ curl https://127.0.0.1:8098/buckets/messages/keys/spam3 --insecure
{"count": 1}
最初の値が表示されています。エラーが表示された2回目の保存は実行されなかったようです。
post-commit
次はpost-commit
post-commit関数の用意
post-commitはErlangのみ使用可能です。ソースを用意します。
-module(post_sum).
-export([sum/1]).
sum(Obj) ->
case is_delete_request(Obj) of
true -> ignore;
false -> sum(post_or_put, Obj)
end.
sum(post_or_put, Obj) ->
{struct,Prop} = mochijson2:decode(riak_object:get_value(Obj)),
{<<"count">>, Count} = lists:keyfind(<<"count">>, 1, Prop),
{ok, C} = riak:local_client(),
Result = C:get(<<"message_sum">>, <<"sample">>, 1),
case Result of
{ok, O1} ->
{struct,SumProp} = mochijson2:decode(riak_object:get_value(O1)),
{<<"count">>, SumCount} = lists:keyfind(<<"count">>, 1, SumProp),
NewObj = create_obj({struct, [{<<"count">>, SumCount + Count}]}),
O2 = riak_object:update_value(O1, NewObj),
C:put(O2);
_ ->
NewObj = create_obj({struct, [{<<"count">>, Count}]}),
O2 = riak_object:new(<<"message_sum">>, <<"sample">>, NewObj),
C:put(O2, 1)
end.
create_obj(Struct) ->
NewObjIO = mochijson2:encode(Struct),
iolist_to_binary(NewObjIO).
is_delete_request(Obj) ->
Metadata = riak_object:get_metadata(Obj),
case dict:find(<<"X-Riak-Deleted">>, Metadata) of
error -> false;
{ok, _} -> true
end.
やってることは、
- 受け取ったデータが
{count: 数値}
のようなJSONデータと仮定して、一旦オブジェクトにパース。 - 既に
bucket:message_sum, key:sample
に保存されている同様の形式のJSONデータを取ってきてパースした上で、今回の値を加算、加算結果を再びbucket:message_sum, key:sample
に保存します。
動作を確認したいだけなので、異常値への対処等はしてません。
あと、これだと同じキーが保存されてもどんどん加算されていくので、保存されている値の合計値では無いですし、削除されたときに消すのもしてません笑
とりあえずサンプルってことで。
コンパイルと保存
コンパイルして、先ほどの"/tmp/beams"に保存します。
$ erlc post_sum.erl
$ cp post_sum.beam /tmp/beams/
riakを再起動します。DebianやUbuntuでdpkgから入れた場合は
$ sudo /etc/init.d/riak stop
$ sudo /etc/init.d/riak start
bucketへの設定
bucketのプロパティにpostcommit関数を追加します。
$ curl -XPUT -H "Content-Type: application/json" \
https://127.0.0.1:8098/buckets/messages/props \
-d '{"props":{"postcommit":[{"mod": "post_sum", "fun": "sum"}]}}' \
--insecur
これ以降は、bucket:messages
にデータが保存される直前に必ず上記のpostcommitで指定した関数が実行されます。
post-commit のテスト
データを保存してみます。
$ curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d '{"count": 1'} --insecure
bucket:message_sum, key:sample
の値を確認してみます。
$ curl https://127.0.0.1:8098/buckets/message_sum/keys/sample --insecure
{"count":1}
保存されています。
さらにデータを保存してみます。
$ curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d '{"count": 100'} --insecure
結果を確認
curl https://192.168.5.220:8098/buckets/message_sum/keys/sample --insecure
{"count":101}
素晴らしい。