LoginSignup
0
2

More than 5 years have passed since last update.

Aerospike(4) UDFって便利

Last updated at Posted at 2016-11-27

UDFでもっと効率厨に

Aerospikeは定義済み関数を作れます。
ストアドプロシージャ的なものですかね。

関数はLua言語でスクリプトを作成し、Aerospikeサーバに登録して使います。

UDFは用途によって大きく二種類に分かれます

Record UDF

keyに対するレコードに定義した処理を行う。
他のレコードや、他Setに対しての操作や参照を行うことはできない。

例:aerospike 公式のサンプルコード

  • UDFのコード (lua スクリプト)
stats.lua
-- 最初の引数はrecにする決まり 以降の引数はいくつでも定義できる
function update_stats( rec, newValue )
  local meth = "update_stats()";
  local rc = 0;

  if( not aerospike:exists( rec ) ) then
    -- 1レコード目の処理
    -- レコードを新規作成し、各Binに初期値を入れる
    rc = aerospike:create( rec );
    if( rc == nil or rec == nil ) then
      error("ERROR creating record");
    end
    rec["MaxValue"]   = newValue;
    rec["MinValue"]   = newValue;
    rec["Sum"]        = newValue;
    rec["AveValue"]   = newValue;
    rec["ValueCount"] = 1;
  else
    -- 既にレコードが存在する場合
    -- 各集計値を計算し Binに設定
    local sum = 0;
    local count = 1;
    if( rec["MaxValue"] == nil or newValue > rec["MaxValue"] ) then
      rec["MaxValue"] = newValue; -- write the value if new or changed
    end
    if( rec["MinValue"] == nil or newValue < rec["MinValue"] ) then
      rec["MinValue"] = newValue; -- write the value if new or changed
    end
    if( rec["ValueCount"] == nil ) then
      count = 1;                  -- this is the first one
    else
      count = rec["ValueCount"];
      count = count + 1;          -- increment the value
    end
    if( rec["Sum"] == nil or type(rec["Sum"]) ~= "number" ) then
      sum = newValue;
    else
      sum = rec["Sum"];
      sum = sum + newValue;       -- Notice that we do NOT do this: rec["sum"] = rec["sum" + 1;
    end
    rec["ValueCount"] = count;
    rec["Sum"] = sum;
    rec["AveValue"] = sum / count;
  end

  -- レコードの更新処理
  rc = aerospike:update( rec );
  if( rc ~= nil and rc ~= 0 ) then
    warn("[ERROR]<%s> record update: rc(%s)", meth,tostring(rc));
    error("ERROR updating the record");
  end
  rc = 0; -- safety, in case rc == nil.  But, no error here.
  -- client側に戻り値を返す
  return rc;
end -- update_stats()
  • UDFの登録 いくつか方法がある。 サーバ上にluaスクリプトを配置し、AQLかascliからファイル指定で登録

aqlの例

# UDF登録
$ register module 'udf/stats.lua'
OK, 1 module added.

# UDF確認(うろ覚え)
$ show modules
+---------------------------+-------+------------------------+
| module                    | type  | hash                   |
+---------------------------+-------+------------------------+
| "stats.lua"               | "lua" | "033671e05067888fce09" |
+---------------------------+-------+------------------------+

# UDF実行
$ execute list_example.push(399) on namespace.set where PK = 'PK001'

Stream UDF

ストリーム集約の用途に使用する。
ここで言うストリームとは、Aerospikeサーバ内に存在する複数行レコードリストを指します。
Aerospikeのデータ問い合わせ結果がクライアント側に全て揃うのを待つこと無く、サーバ側でフィルタや集約を並行で行えます。

セカンダリインデックスを使用した抽出クエリ結果の変換や集約の用途が考えられます。
制約として、Setやrecordに対し、createやupdateを行うことはできません。

レコードUDFと違い、こっちは使ったこと無いので使用感はちょっとわからないけど便利そうな気はします。実際のアプリケーションで使うかは別として。

公式のサンプルコード

function avg_age(stream)

  local function female(rec)
    return rec.gender == "F"
  end

  local function name_age(rec)
    return map{ name=rec.name, age=rec.age }
  end

  local function eldest(p1, p2)
    if p1.age > p2.age then
      return p1
    else
      return p2
    end
  end

  return stream : filter(female) : map(name_age) : reduce(eldest)
end

実行

$ AGGREGATE profile_aggregator.avg_age() ON users.profiles WHERE age BETWEEN 20 and 29

UDFまとめ

RecordUDFを使ってデータの登録と中間集計を同時に行うことができそう
StreamUDFを使って大量なクエリ結果からのデータ分析を高速に行うことができそう

以前はなんとなくアプリケーション側から見て無邪気に使えるKVSとして使っていたけど、運用まで考えると

おわり

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2