まえがき
前までに説明したDBへの操作をそれぞれ実装します。
クエリ説明
まずは実装するクエリの種類を説明する。トランザクションそのものを操作する4つの操作。
- begin_tx - トランザクションを開始する。
- commit_tx - トランザクションをコミットする。ローカルデータを共有データへマージする。
- rollback_tx - トランザクションをロールバックする。ローカルデータを破棄する。
- allow_tx - 自分のトランザクションが許可されている場合はokを返す。
次にDDLとDML。説明はなし。
- create_table
- drop_table
- insert
- select
- update
- delete
使い方は前にも示したが、begin_tx → create_table → insert → select → update → commit_tx のように、トランザクションを開始し、テーブルを操作し、そしてトランザクションをコミットして反映する。
サーバ構成
実装の話へと深堀していく。まずは、簡単にサーバの構成を示しておく。サーバとはErlang/OTPで実装するgen_serverのことである。メッセージを待ち受け、受け取ったメッセージに従って処理を行うプロセスというとわかりやすい。サーバは3つ用意する。(命名が下手・・・)
- simple_db_server - 今まで実装してきたシンプルデータベース
- query_exec - クエリを受け取って実行するサーバ
- tx_mng - トランザクションを管理するサーバ
図にすると下のようになる。線でつながってるサーバ同士がメッセージをやりとりする。クライアントからのメッセージはquery_execが処理をする。query_execはクライアントから受け取ったクエリに対して処理を行う。
begin_txを行った時のサーバ間やりとりのイメージ
例えば、トランザクションを開始するbegin_txを行った時のサーバ間のメッセージやりとりを説明する。
①クライアントからquery_execへトランザクションを開始するというメッセージを送る。
②query_execからtx_mngへトランザクションを開始するというメッセージを送る。
③tx_mngはトランザクションIDを振り出し、query_execへ返却する。
④query_execからクライアントへトランザクションIDを返却する。
tx_mngで行う処理の詳細は別記事に後述する。
ここでは、各サーバはメッセージを待ち受けており、メッセージが届いたらその内容に基づいて処理を行い、その結果を返却するという動作をすることを理解してもらいたい。
DMLの実装説明
登場人物が出揃ったところで、各DMLの実装について説明する。
insert
早速insertのソースを説明する。クライアントからは「このテーブル(TableName)にこのデータ(Val)を挿入してちょうだい」というメッセージ{insert, TableName, Val}が飛んでくる想定である。
handle_call({exec_query, {insert, TableName, Val}}, _From, State)->
%% トランザクションが許可されている場合のみ次に進める
case ask_transaction(State) of
transaction_not_found ->
{reply, transaction_not_found, State};
ok ->
QueryId = generate_query_id(),
ObjectId = generate_object_id(),
case check_table_exists(TableName, Val) of
true ->
local_insert_data(State, QueryId, TableName, ObjectId, Val),
{reply, QueryId, State#state{queryId = get_query_id_list(State) ++ [QueryId]}};
_ -> {reply, table_not_found, State}
end
end;
- ask_transaction関数は、tx_mngサーバに対してトランザクションがアクティブかどうかを質問するためのチェック関数である。okが返ってきたらクエリを実行する。以降どのクエリの実装でも出てくる。内部仕様は別記事で記載する。
- QueryIdは、クエリを識別するためのIDである。トランザクションをコミットするときに、積み上がったクエリを識別するために利用する。
- ObjectIdは、シンプルデータベースの実装の時にも使ったオブジェクトのIDである。
local_insert_dataが肝である。ローカル領域にデータを挿入する。
local_insert_data(State, QueryId, TableName, Oid, Val) ->
LKvstore = get_local_kvstore(State),
LColumnIndex = get_local_column_index(State),
ColList = simple_db_server:get_column_list(TableName),
if
ColList =:= not_found -> not_found;
true ->
lists:map(fun({ColName, ColVal}) ->
ets:insert(LColumnIndex, {QueryId, ins, TableName, ColName, ColVal, Oid}) end,
lists:zip(ColList, Val)),
ets:insert(LKvstore, {QueryId, ins, TableName, Oid, Val})
end.
LKvstoreとLColumnIndexは、サーバのStateからローカル領域のets(Erlangのメモリ上ストレージ)を取得し代入します。下の部分がローカルのカラムインデックスを作成する処理である。ローカルカラムインデックスに投入するデータは{QueryId, ins, TableName, ColName, ColVal, Oid}であり、2番目のinsというタグをつけて登録するところがミソである。ローカル領域ではどういう操作をしたか記憶しておく必要があるためである。
lists:map(fun({ColName, ColVal}) ->
ets:insert(LColumnIndex, {QueryId, ins, TableName, ColName, ColVal, Oid}) end,
lists:zip(ColList, Val)),
例えば、フルーツテーブル(名前と値段の列を持つ)に[みかん、100円]という行を挿入するとする。
そのとき、ローカルカラムインデックスには下記のようなデータを登録する。
- {0001, ins, Fruit, name, みかん, A}
- {0001, ins, Fruit, price, 100円, A}
続いて、ローカルキーバリューストアの挿入である。
ets:insert(LKvstore, {QueryId, ins, TableName, Oid, Val})
先の例では下記のようなデータを登録する。
- {0001, ins, Fruit, A, [みかん, 100円]}
クライアントへの返り値はクエリIDであり、同時にサーバのStateにクエリIDを追記する。
{reply, QueryId, State#state{queryId = get_query_id_list(State) ++ [QueryId]}}
select
データの参照の部分の実装を説明する。クライアントからは「このテーブル(TableName)のこのカラム(ColName)がこの値(Val)になっているデータを取ってきてちょうだい」というメッセージ{select, TableName, ColName, Val}が飛んでくる想定である。データの取得はシンプルデータベースと同じく、カラムインデックスから抽出対象となるオブジェクトIDリストを取得してきて、それを使ってキーバリューストアからデータを取得する。
handle_call({exec_query, {select, TableName, ColName, Val}}, _From, State)->
%% トランザクションが許可されている場合のみ次に進める
case ask_transaction(State) of
transaction_not_found ->
{reply, transaction_not_found, State};
ok ->
QueryIdList = get_query_id_list(State),
%% オブジェクトIDを取得する
OidList = select_object_id_list(State, TableName, ColName, Val, QueryIdList),
%% オブジェクトIDから値を取得する
{reply, select_data(State, TableName, OidList, QueryIdList), State}
end;
QueryIdListは、このトランザクションで実行してきたクエリのIDリストである。クエリのIDリストを使ってオブジェクトIDリストを取得する処理が次である。
select_object_id_list(State, TableName, ColName, Val, QueryIdList) ->
%% 共有データを検索
ShareData = simple_db_server:select_column_index(TableName, ColName, Val),
%% QueryIdListの順にローカルデータを検索してマージする
lists:foldl(fun(QueryId, SData) -> merge_local_index(State, TableName, ColName, Val, SData, QueryId) end,
ShareData, QueryIdList).
シンプルデータベースであれば、カラムインデックスに対して、カラム名と値を使ってオブジェクトIDを検索してくれば良いが、今回は共有カラムインデックスに検索しに行って、その結果にさらにローカルカラムインデックスの検索結果を上被せする処理となる。
lists:foldl(fun(QueryId, SData) -> merge_local_index(State, TableName, ColName, Val, SData, QueryId) end,
ShareData, QueryIdList).
畳み込みを使って、クエリIDのリストを一つずつ取ってはローカルデータを検索してマージしていく。オブジェクトIDのリストが取れたら、データを取得する。こちらも共有データからデータを取得して、ローカルデータを上被せする(merge_local_data)。
select_data(State, TableName, OidList, QueryIdList) when is_list(OidList)->
lists:map(fun(Oid) -> select_data(State, TableName, Oid, QueryIdList) end, OidList);
select_data(State,TableName, Oid, QueryIdList) ->
ShareData = simple_db_server:select_kvstore(TableName, Oid),
io:format("ShareData: ~p~n", [ShareData]),
lists:foldl(fun(QueryId, SData) -> merge_local_data(State, SData, Oid, QueryId) end,
ShareData, QueryIdList).
上のselect_data関数はガード文がついており、オブジェクトIDリストがリスト形式の時に実行される。その処理の中身はリストに対するマップ処理で、下の方のselect_dataを呼び出す。こういう書き方にしておけば、オブジェクトIDがリストでない時は下の方の関数が直接呼び出されるし、リストの時は結果をリストにして返せるので便利な書き方と思って使っている。Erlang的にどうなのかはよくわからない。。
update
次回説明
delete
次回説明
あとがき
久しぶりの投稿となってしまいましたが、地道に実装してきたクエリの実装について説明しました。長くなってしまったので、update, deleteは次回の記事に書きます。