はじめに
RabbitMQでは、耐障害性のあるキューとしてquorum queue
が用意されています。既存の classic mirrored queue
の後継にあたる機能で、classic mirrored queue の方は v4.0で廃止予定です。
classic mirrored queueからquorum queueへの移行で考慮すべき差分として、優先度付きキューが使えなくなるという点があげられます。
quorum queueへの優先度付きキュー実装については要望が多く、discussionにも多くの意見が寄せられていました。
この時点では優先度付きキューの実装はしない予定1でしたが、v4.0のquorum queueエンハンスのissueを見ると実装予定機能に
- Fair share high/low priorities using https://github.com/kjnilsson/hiloq/)
という項目がありました。
本記事ではこの優先度付きキューの実装として挙げられているモジュール hiloq
を触ってみます。
本記事は2024年6月時点の情報です。今後変更されるかもしれませんのであらかじめご了承ください。
2024/10/27追記: 9月にv4.0がリリースされ、予定通り優先度付きキューが組み込まれました。
hiloq
hiloqは名前の通り「hi」と「lo」の優先度を持つキューです。Erlangで書かれています(RabbitMQもErlang製)。
プッシュ
Erlangのオブジェクトはimmutableなため、操作をする代わりに戻り値に操作後のキューを返します。
% モジュール読み込み
1> c(hiloq).
{ok,hiloq}
% キュー初期化
2> Q0 = hiloq:new(2).
{hiloq,{[],[]},{[],[]},0,2,0}
% push
3> Q1 = hiloq:in(hi, hi1, Q0).
{hiloq,{[hi1],[]},{[],[]},1,2,0}
4> Q2 = hiloq:in(hi, hi2, Q1).
{hiloq,{[hi2],[hi1]},{[],[]},2,2,0}
5> Q3 = hiloq:in(lo, lo1, Q2).
{hiloq,{[hi2],[hi1]},{[lo1],[]},3,2,0}
ポップ
ポップの戻り値は {{value, 値}, ポップ後のキュー}
の形式です。hi
のキューに値が残っている場合、lo
より優先して取り出されます。
6> {{value, V1}, Q4} = hiloq:out(Q3).
{{value,hi1},{hiloq,{[],[hi2]},{[lo1],[]},2,2,1}}
7> V1.
hi1
8> {{value, V2}, Q5} = hiloq:out(Q4).
{{value,hi2},{hiloq,{[],[]},{[lo1],[]},1,2,2}}
9> V2.
hi2
10> {{value, V3}, Q6} = hiloq:out(Q5).
{{value,lo1},{hiloq,{[],[]},{[],[]},0,2,0}}
11> V3.
lo1
12> {{value, V4}, Q7} = hiloq:out(Q6).
** exception error: no match of right hand side value empty
weight
基本hiを優先すればよいのですが、常にhiから取り出しているといつまでもloの値が溜まってしまう可能性があります。
hiloqでは、n回おきにloを優先することでこの問題を回避しています。
1> c(hiloq).
{ok,hiloq}
% 1回おきにloを優先する
2> Q0 = hiloq:new(1).
{hiloq,{[],[]},{[],[]},0,1,0}
% 値のプッシュ
3> Q1 = hiloq:in(hi, hi1, Q0).
{hiloq,{[hi1],[]},{[],[]},1,1,0}
4> Q2 = hiloq:in(hi, hi2, Q1).
{hiloq,{[hi2],[hi1]},{[],[]},2,1,0}
5> Q3 = hiloq:in(lo, lo1, Q2).
{hiloq,{[hi2],[hi1]},{[lo1],[]},3,1,0}
% ポップ
6> {{value, V1}, Q4} = hiloq:out(Q3).
{{value,hi1},{hiloq,{[],[hi2]},{[lo1],[]},2,1,1}}
7> V1.
hi1
% hiにまだ中身があるがいったんloを優先
8> {{value, V2}, Q5} = hiloq:out(Q4).
{{value,lo1},{hiloq,{[],[hi2]},{[],[]},1,1,0}}
9> V2.
lo1
% ふたたびhiを優先
10>
10> {{value, V3}, Q6} = hiloq:out(Q5).
{{value,hi2},{hiloq,{[],[]},{[],[]},0,1,1}}
11> V3.
hi2
RabbitMQに組み込まれる際には、この値は固定値 or 設定可能な値になりそうです。
(2024/10/27追記) weightは固定値2
9月にv4.0がリリースされ、hiloqとほぼ同じ仕組みの優先度付きキューが実装されました。
weightは固定値 2
(rabbitmqctl
等で変更もできなさそう)なので、メッセージが溜まっていたら
- hi
- hi
- lo
- hi
- hi
- lo
- ...
という順番に取り出されます。
実装
複雑な動作に見えますが、実装は100行に満たないシンプルなものでした。
値を入れる
hiloqのキューは内部に hi
lo
キューを持っており、in
では指定した方のキューに値が入ります。
-spec in(hi | lo, term(), state()) -> state().
% hiを指定した場合
in(hi, Item, #?MODULE{hi = Hi, len = Len} = State) ->
State#?MODULE{hi = queue:in(Item, Hi),
len = Len + 1};
% loを指定した場合
in(lo, Item, #?MODULE{lo = Lo, len = Len} = State) ->
State#?MODULE{lo = queue:in(Item, Lo),
len = Len + 1}.
各キューには標準ライブラリのqueueを使用しています。
上記の例でも見たようにキューのデータ構造はimmutableなので、値を挿入する代わりに「値が挿入された状態のキュー」を戻り値で返します。
値を取り出す
続いて優先度付きキューの要である out
です。こちらは in
よりも実装が複雑です。
out(#?MODULE{len = 0}) ->
empty;
out(#?MODULE{hi = Hi0, lo = Lo0,
len = Len, dequeue_counter = C,
weight = W} = State) ->
case W == C of
true ->
%% try lo before hi
case queue:out(Lo0) of
{empty, _} ->
{{value, _} = Ret, Hi} = queue:out(Hi0),
{Ret, State#?MODULE{hi = Hi,
dequeue_counter = 0,
len = Len - 1}};
{Ret, Lo} ->
{Ret, State#?MODULE{lo = Lo,
dequeue_counter = 0,
len = Len - 1}}
end;
%% ★
false ->
case queue:out(Hi0) of
{empty, _} ->
{{value, _} = Ret, Lo} = queue:out(Lo0),
{Ret, State#?MODULE{lo = Lo,
dequeue_counter = C + 1,
len = Len - 1}};
{Ret, Hi} ->
{Ret, State#?MODULE{hi = Hi,
dequeue_counter = C + 1,
len = Len - 1}}
end
end.
最初に呼び出されるのは上記 ★
の分岐です。hi
のキューに中身があれば hi
のキューから取り出し、なければ代わりに lo
のキューから取り出します。
case queue:out(Hi0) of
%% hiが空ならloから取得
{empty, _} ->
{{value, _} = Ret, Lo} = queue:out(Lo0),
{Ret, State#?MODULE{lo = Lo,
dequeue_counter = C + 1,
len = Len - 1}};
%% そうでなければhiから取得
{Ret, Hi} ->
{Ret, State#?MODULE{hi = Hi,
dequeue_counter = C + 1,
len = Len - 1}}
end
この際、キューの取り出しと同時に dequeue_counter
(= C
) を1増やします。このカウンターが溜まり weight
(= W
) と同じになったタイミングでは、逆に lo
の中身を優先して取り出します。
case queue:out(Lo0) of
{empty, _} ->
{{value, _} = Ret, Hi} = queue:out(Hi0),
{Ret, State#?MODULE{hi = Hi,
dequeue_counter = 0,
len = Len - 1}};
{Ret, Lo} ->
{Ret, State#?MODULE{lo = Lo,
dequeue_counter = 0,
len = Len - 1}}
end;
こうすることで、 weight
で指定した回数おきに lo
の中身も取り出されていたという仕組みです。
おわりに
以上、hiloqモジュールの紹介でした。RabbitMQ4.0がリリースされた際には実際に組み込まれている箇所も見てみたいと思います。
-
メンテナーの回答としては「quorum queueと優先度付きキューを同時に実現すると実装が複雑化すること」、「既存のclassic mirrored queueはデータ保護の観点で欠陥のある設計だったため、実質普通のキューで置き換えても変わらないこと(※4.0以降も普通のキューは優先度を使用可能)」からquorum queueに優先度を実装する予定は無いとしていました。 ↩