UDFでもっと効率厨に
Aerospikeは定義済み関数を作れます。
ストアドプロシージャ的なものですかね。
関数はLua言語でスクリプトを作成し、Aerospikeサーバに登録して使います。
UDFは用途によって大きく二種類に分かれます
Record UDF
keyに対するレコードに定義した処理を行う。
他のレコードや、他Setに対しての操作や参照を行うことはできない。
例:aerospike 公式のサンプルコード
- UDFのコード (lua スクリプト)
stats.lua
-- 最初の引数はrecにする決まり 以降の引数はいくつでも定義できる
function update_stats( rec, newValue )
local meth = "update_stats()";
local rc = 0;
if( not aerospike:exists( rec ) ) then
-- 1レコード目の処理
-- レコードを新規作成し、各Binに初期値を入れる
rc = aerospike:create( rec );
if( rc == nil or rec == nil ) then
error("ERROR creating record");
end
rec["MaxValue"] = newValue;
rec["MinValue"] = newValue;
rec["Sum"] = newValue;
rec["AveValue"] = newValue;
rec["ValueCount"] = 1;
else
-- 既にレコードが存在する場合
-- 各集計値を計算し Binに設定
local sum = 0;
local count = 1;
if( rec["MaxValue"] == nil or newValue > rec["MaxValue"] ) then
rec["MaxValue"] = newValue; -- write the value if new or changed
end
if( rec["MinValue"] == nil or newValue < rec["MinValue"] ) then
rec["MinValue"] = newValue; -- write the value if new or changed
end
if( rec["ValueCount"] == nil ) then
count = 1; -- this is the first one
else
count = rec["ValueCount"];
count = count + 1; -- increment the value
end
if( rec["Sum"] == nil or type(rec["Sum"]) ~= "number" ) then
sum = newValue;
else
sum = rec["Sum"];
sum = sum + newValue; -- Notice that we do NOT do this: rec["sum"] = rec["sum" + 1;
end
rec["ValueCount"] = count;
rec["Sum"] = sum;
rec["AveValue"] = sum / count;
end
-- レコードの更新処理
rc = aerospike:update( rec );
if( rc ~= nil and rc ~= 0 ) then
warn("[ERROR]<%s> record update: rc(%s)", meth,tostring(rc));
error("ERROR updating the record");
end
rc = 0; -- safety, in case rc == nil. But, no error here.
-- client側に戻り値を返す
return rc;
end -- update_stats()
- UDFの登録 いくつか方法がある。 サーバ上にluaスクリプトを配置し、AQLかascliからファイル指定で登録
aqlの例
# UDF登録
$ register module 'udf/stats.lua'
OK, 1 module added.
# UDF確認(うろ覚え)
$ show modules
+---------------------------+-------+------------------------+
| module | type | hash |
+---------------------------+-------+------------------------+
| "stats.lua" | "lua" | "033671e05067888fce09" |
+---------------------------+-------+------------------------+
# UDF実行
$ execute list_example.push(399) on namespace.set where PK = 'PK001'
Stream UDF
ストリーム集約の用途に使用する。
ここで言うストリームとは、Aerospikeサーバ内に存在する複数行レコードリストを指します。
Aerospikeのデータ問い合わせ結果がクライアント側に全て揃うのを待つこと無く、サーバ側でフィルタや集約を並行で行えます。
セカンダリインデックスを使用した抽出クエリ結果の変換や集約の用途が考えられます。
制約として、Setやrecordに対し、createやupdateを行うことはできません。
レコードUDFと違い、こっちは使ったこと無いので使用感はちょっとわからないけど便利そうな気はします。実際のアプリケーションで使うかは別として。
公式のサンプルコード
function avg_age(stream)
local function female(rec)
return rec.gender == "F"
end
local function name_age(rec)
return map{ name=rec.name, age=rec.age }
end
local function eldest(p1, p2)
if p1.age > p2.age then
return p1
else
return p2
end
end
return stream : filter(female) : map(name_age) : reduce(eldest)
end
実行
$ AGGREGATE profile_aggregator.avg_age() ON users.profiles WHERE age BETWEEN 20 and 29
UDFまとめ
RecordUDFを使ってデータの登録と中間集計を同時に行うことができそう
StreamUDFを使って大量なクエリ結果からのデータ分析を高速に行うことができそう
以前はなんとなくアプリケーション側から見て無邪気に使えるKVSとして使っていたけど、運用まで考えると
おわり