LoginSignup
1
1

More than 5 years have passed since last update.

riakでpre-commit、post-commitする

Last updated at Posted at 2013-02-20

分散キーバリューストアriakは、データを保存する直前および直後に特定の処理を行うことができます。

  • pre-commitはデータを保存する直前に実行されます。JavascriptとErlangが使用できます。
  • post-commitはデータを保存した直後に非同期で実行されます。Erlangのみ使用できます。

pre-commit

pre-commit関数の用意

Javascriptも使えますが、ここではErlangで記述します。
ソースはriak/docsに書いてあったまんま。
やってることは、受け取ったJSONデータがパースできるかのチェック。単にパースして問題なければ、引数で渡されたオブジェクトをそのまま返す。エラーがでたら{fail, Reason}をかえしています。
{fail, Reason}を返した場合、そのオブジェクトの保存はされず、クライアントにエラーを返すようです。

こんな感じでモジュールを用意。

(以下は問題アリ。後述)

validate_json.erl
-module(validate_json).
-export([validate/1]).

validate(Obj) ->
  try
    mochijson2:decode(riak_object:get_value(Obj)),
    Obj
  catch
    throw:invalid_utf8 ->
      {fail, "Invalid JSON: Illegal UTF-8 character"};
    error:Error ->
      {fail, 
         "Invalid JSON:" ++ 
          binary_to_list(list_to_binary(io_lib:format("~p", [Error])))}
  end.

最初は上のコードでいいかと思ってました。
ですが、pre-commitはデータを削除する際にも実行されます。なのでこれだと新規保存、更新時はいいとしても、削除ができません。削除しようとすると必ずfailとなります。

で、削除を考慮したコードは以下。

validate_json.erl
-module(validate_json).
-export([validate/1]).

validate(Obj) ->
  case is_delete_request(Obj) of
    true  -> Obj;
    false -> validate(post_or_put, Obj)
  end.

validate(post_or_put, Obj) ->
  try
    mochijson2:decode(riak_object:get_value(Obj)),
    Obj
  catch
    throw:invalid_utf8 ->
      {fail, "Invalid JSON: Illegal UTF-8 character"};
    error:Error ->
      {fail,
         "Invalid JSON:" ++
          binary_to_list(list_to_binary(io_lib:format("~p", [Error])))}
  end.

is_delete_request(Obj) ->
  Metadata = riak_object:get_metadata(Obj),
  case dict:find(<<"X-Riak-Deleted">>, Metadata) of
    error   -> false;
    {ok, _} -> true
  end.

削除の場合は"X-Riak-Deleted"というメタデータが付加されてるので、これがメタデータの中に存在していたらチェックしません。存在していない場合は新規作成または更新ということになるので、正しいJSONかどうかチェックします。

コンパイル

$ erlc validate_json.erl

エラーが出たら修正。でなければOK。validate_json.beamというファイルがコンパイルの成果物です。

設定とbeamファイルの保存

このvalidate_json.beamを適当なディレクトリに保存します。既に/etc/riak/app.configで

/etc/riak/app.config
{riak_kv, [
  %% ...
  {add_paths, ["/tmp/beams/"]},
  %% ...

のような設定がされている場合は、そのディレクトリ下に、そうでなければ"/tmp/beams"などの下に保存しておいて、上記設定をapp.configに追加します。

$ mkdir /tmp/beams
$ cp validate_json.beam /tmp/beams/
$ sudo vi /etc/riak/app.config #必要なら設定を追加

riakを再起動します。DebianやUbuntuでdpkgから入れた場合は

$ sudo /etc/init.d/riak stop
$ sudo /etc/init.d/riak start

bucketへの設定

bucketのプロパティにprecommit関数を追加します。

$ curl -XPUT -H "Content-Type: application/json" \
     https://127.0.0.1:8098/buckets/messages/props \
     -d '{"props":{"precommit":[{"mod": "validate_json", "fun": "validate"}]}}' \
     --insecure

これ以降は、bucket:messages にデータが保存される直前に必ず上記のprecommitで指定した関数が実行されます。

pre-commitのテスト

ためしにデータを保存してみます。
(ローカルのhttpsで動作していると仮定)

普通に保存してみる

curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d '{"count":  1'} --insecure

問題なく保存されました。

不正なJSONを保存してみる

$ curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d 'this is not json' --insecure

Invalid JSON:{case_clause,<<"this is not json">>}

エラーメッセージが表示されました。
現在の値を取得してみます。

$ curl https://127.0.0.1:8098/buckets/messages/keys/spam3 --insecure

{"count":  1}

最初の値が表示されています。エラーが表示された2回目の保存は実行されなかったようです。

post-commit

次はpost-commit

post-commit関数の用意

post-commitはErlangのみ使用可能です。ソースを用意します。

post_sum.erl
-module(post_sum).
-export([sum/1]).

sum(Obj) ->
  case is_delete_request(Obj) of
     true  -> ignore;
     false -> sum(post_or_put, Obj)
  end.

sum(post_or_put, Obj) ->
  {struct,Prop} = mochijson2:decode(riak_object:get_value(Obj)),
  {<<"count">>, Count} = lists:keyfind(<<"count">>, 1, Prop),

  {ok, C} = riak:local_client(),
  Result = C:get(<<"message_sum">>, <<"sample">>, 1),

  case Result of
    {ok, O1} ->
      {struct,SumProp} = mochijson2:decode(riak_object:get_value(O1)),
      {<<"count">>, SumCount} = lists:keyfind(<<"count">>, 1, SumProp),
      NewObj = create_obj({struct, [{<<"count">>, SumCount + Count}]}),
      O2 = riak_object:update_value(O1, NewObj),
      C:put(O2);
    _ ->
      NewObj = create_obj({struct, [{<<"count">>, Count}]}),
      O2 = riak_object:new(<<"message_sum">>, <<"sample">>, NewObj),
      C:put(O2, 1)
  end.

create_obj(Struct) ->
  NewObjIO = mochijson2:encode(Struct),
  iolist_to_binary(NewObjIO).

is_delete_request(Obj) ->
  Metadata = riak_object:get_metadata(Obj),
  case dict:find(<<"X-Riak-Deleted">>, Metadata) of
    error   -> false;
    {ok, _} -> true
  end.

やってることは、

  • 受け取ったデータが {count: 数値} のようなJSONデータと仮定して、一旦オブジェクトにパース。
  • 既に bucket:message_sum, key:sample に保存されている同様の形式のJSONデータを取ってきてパースした上で、今回の値を加算、加算結果を再び bucket:message_sum, key:sample に保存します。

動作を確認したいだけなので、異常値への対処等はしてません。
あと、これだと同じキーが保存されてもどんどん加算されていくので、保存されている値の合計値では無いですし、削除されたときに消すのもしてません笑
とりあえずサンプルってことで。

コンパイルと保存

コンパイルして、先ほどの"/tmp/beams"に保存します。

$ erlc post_sum.erl
$ cp post_sum.beam /tmp/beams/

riakを再起動します。DebianやUbuntuでdpkgから入れた場合は

$ sudo /etc/init.d/riak stop
$ sudo /etc/init.d/riak start

bucketへの設定

bucketのプロパティにpostcommit関数を追加します。

$ curl -XPUT -H "Content-Type: application/json" \
     https://127.0.0.1:8098/buckets/messages/props \
     -d '{"props":{"postcommit":[{"mod": "post_sum", "fun": "sum"}]}}' \
     --insecur

これ以降は、bucket:messages にデータが保存される直前に必ず上記のpostcommitで指定した関数が実行されます。

post-commit のテスト

データを保存してみます。

$ curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d '{"count":  1'} --insecure

bucket:message_sum, key:sample の値を確認してみます。

$ curl https://127.0.0.1:8098/buckets/message_sum/keys/sample --insecure

{"count":1}

保存されています。
さらにデータを保存してみます。

$ curl -X PUT https://127.0.0.1:8098/buckets/messages/keys/spam3 -H "content-type: application/json" -d '{"count":  100'} --insecure

結果を確認

curl https://192.168.5.220:8098/buckets/message_sum/keys/sample --insecure
{"count":101}

素晴らしい。

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1