考え方と原理
私のロック魂をアピールさせてください!(多田李衣菜)
RedisでSETNXを使えば簡単に排他ロックが実現できる。MSETNXを使えば、複数のロックを同時に評価できる。それは良いのだが、並行実行性を考えると、なるべく競合しない方が良いよね。データ構造でも、競合しにくいように考えていると思うので、制御の方でも競合しにくい作りにしよう。
その代表が、Reader-Writer-Lock。読み出しはいくらでも競合したまえ。書き込みは別だ。というやつだね。賢いロックの賢いついでにreaderlock獲得中のコンテクストが、writerlockに昇格できるってのいいかも、と思ったけど、それ、ちょっと考えたら、同じことを考えてる奴が複数いたら、デッドロックになるじゃんね? 書き込みをしたい人が、書き込みをするための情報を得るために読み出す場合、読み出しの段階からwriterlock獲得しとかなきゃいけないね。注意注意。一応、writerlockに昇格できるreaderlockをまた別のprewriter-readlockとかにして、prewriter-readlock同士は排他するようにすればそういうの出来るけどさ、どうだろね。ややこしすぎるよね。これは最終的に実装するかどうか微妙。リードライトロック以外の何者かになるしね。
と、書いたけど、prewriter-readlock採用。微妙だけど、WriteLockはコストが高いから、その前の読み出しの期間はやっぱりロック状態にしておきたくない。そうすると、下記のようなパターンになる。こうすると、読み込みのみのコンテクストは限界まで阻害されにくくなるので、writerlockに予約機能を持たせて「writerlockにも必ず出番が回ってくる」ことを保証する仕様とのバランスが良くなる。つまり、readerにとっては、writerがのべつまくなし連続して到達しても、prewrite-readlockの間にクリティカルセクションに進入してしまえば良いということになる。ね、うまくできてるよね。いわば「三相Reader-Writer-Lock」ということになると思うのだが、これ、うまいやり方だから先行研究があると思ったら発見できなかった。こんなプリミティブなアイディア、先行研究がないわけないので、誰か教えてえろいひと。
書き込みのパターン
prewrite-readlock
読み込み(書き込みのための情報を得る)
writerlock
書き込み
writerunlock
prewrite-readunlock
さて、まずは仮想コードだ
とりあえず、仮想コード的に考え方だけ書いておく。 時間が空いたらJavascriptで書き直す。 時間は大して空いてはいないものの、春アニメが一段落した隙間で何とか書いてみた。色んな実装の方法が有りうるが、下記に示したように、writerlockが予約できるような形にしたほうがいい。というのは、readerlockの波状攻撃が止まず、いつまでもwriterlockが獲得できない、という事件が起きうるからだ。(それでも、C++でマルチスレッドだったら、まず短時間のspinlockを試してみてから離散的なポーリングをするようにしておけば、かいくぐれたりするのだが、Node.js想定なので)
実装は単純明快で、基本的にINCR族のコマンドとGETだけで実現できる。INCR族がアトミックだというのは素晴らしいね。ただ、実装が単純な代わりに、readerlockの同時獲得数に制限があるという、いつものゲーム脳だが、その辺は勘弁してくれ。何となれば、閾値を百万とかにしてくれればいい。
reader
LOCK
INCR lock して返値が99999以上なら、DECR lock して、(心持ち長めに)待ってリトライ
リトライ時は、まずGETしてみて返値が99999未満なら、INCRからやり直す。99999以上ならさらに待つ
UNLOCK
DECR lock
prewrite-read
LOCK
INCRBY lock,50000して、返値が99999以上なら、DECRBY lock,50000して、待ってリトライ
リトライ時は、まずGETしてみて返値が49999未満なら、INCRからやり直す。49999以上ならさらに待つ
UNLOCK
DECRBY lock,50000
writer
LOCK
INCRBY lock,100000して、返値が
【再試行(writerlock予約済 or 獲得済)】
150000+自己prewrite以上なら、DECRBY lock,100000して、待ってリトライ
【ロック予約】
100001+自己prewrite以上なら、小リトライ
小リトライは、そのまま GET lock を(しつこめに)ポーリングし、
(このポーリングは非同期関数3段重ねとかで良いと思うよ)
返値が 100000+自己prewriteになったら通過
【獲得】
100000+自己prewriteなら、通過
UNLOCK
DECRBY key,100000
ロッキング動作のまとめ
上記のようにすれば、概ね使い物になるロック機構ができあがる。
- readerlockは、writerlockが獲得もしくは予約されていなければ、複数(49999件まで)同時に獲得できる
- prewrite-readlockは、writerlockが獲得もしくは予約されておらず、他のprewrite-readlockが獲得されていなければ、獲得できる
- writerlockは、writerlockが獲得もしくは予約されておらず、他のprewrite-readlockが獲得されていなければ、
- readerlockが獲得されていれば、writerlockを予約して、readerlockが外れるまで待つ
- readerlockが獲得されていなければ、writerlockを獲得する
という動作になる。つまり、readerlockは、writerlockさえ無ければ同時にたくさん獲得できるが、prewrite-readlockとwriterlockは同時に合わせて1つまでしか獲得できない。
とても混んでいる時のためのヒント
賢明なる読者の皆さんはお気づきだろうが、writerlockが本当は獲得できたはずなのに、readerlockがちょこちょこちょっかいを出してくるせいで小リトライからなかなか抜けられない、という問題は起きうる。もしそうなったら、readerlockリトライ中の皆さんがリトライ開始時にINCRし、リトライを終えたらDECRするキーを別に用意して、それが閾値を超えたら初回ロック時からまずINCRではなく、取りあえずGETで様子を見てからにする、という解決策があり得る。
基本的に、readerが溜まっても、writerlockが外れれば一気に捌けるので、とことんwriter優遇でよいのだ。まあ、良く実験することが大事なんだけどね。
上記の三相reader-writer-lockでこの問題には片がついた、はずだよね?
で、例の木構造との兼ね合いなのだが
そもそもRedisでロック機構を作ろうと思った理由は、例の「Redisでツリー構造を扱うなら「入れ子区間モデル」でOK」のためなので、実数区間に対して選択的にロックをかけることができるようにしたいわけだ。これは、応用範囲の広いものでは無いけれど、これもまたかなりプリミティブなアイディアであるし、意味のあることだろう。
rwRangeLock
rangelockのためのメタロックを三相reader-writerlockで行い、それをWRITER-META-{LOCK,UNLOCK}, PRW-META-{LOCK,UNLOCK}, READ-META-{LOCK,UNLOCK}と表す。RangeLockそのものは、ZSETで管理し、PWR-ZSET, WRITER-ZSET, READER-ZSETの3つに対して、name:left, name:rightを登録する。
rangewriterlock
PWR-META-LOCK
ロックしたい区間が、PWR-ZSET, WRITER-ZSETと重複しないことを確認
OK
WRITER-META-LOCK
WRITER-ZSETにロックしたい区間を追記
WRITER-META-UNLOCK
PWR-META-UNLOCK
READER-ZSETをポーリングし、ロックしたい区間と重複が無くなれば、ロック完了
NG
PWR-META-UNLOCK
冒頭からリトライ
rangewriterunlock
WRITER-META-LOCK
WRITER-ZSETからロックしていた区間を削除
WRITER-META-UNLOCK
rangepwrlock
PWR-META-LOCK
ロックしたい区間が、PWR-ZSET, WRITER-ZSETと重複しないことを確認
OK
WRITER-META-LOCK
PWR-ZSETにロックしたい区間を追記
WRITER-META-UNLOCK
PWR-META-UNLOCK
ロック完了
NG
PWR-META-UNLOCK
冒頭からリトライ
rangepwrunlock
WRITER-META-LOCK
PWR-ZSETからロックしていた区間を削除
WRITER-META-UNLOCK
rangereaderlock
READER-META-LOCK
WRITER-ZSETとロックしたい区間が重複しないことを確認
OK
READER-ZSETにロックしたい区間を追記
READER-META-UNLOCK
ロック完了
NG
READER-META-UNLOCK
冒頭からリトライ
rangereaderunlock
READER-ZSETからロックしていた区間を削除
こんな感じ。このままでは、変な使い方をすると嵌まる、という意味ではもう少しコードセキュリティを考えた実装にしなきゃいかん気もするけど、ライブラリ内使用ならこんな感じではなかろうか。
さらに機能拡張の展望
なお、色々試してみて思ったのは、ロックを外さずに範囲を縮小する機能が欲しい、ということ。拡大だと、下手をするとデッドロックの原因になるが、縮小なら問題無い。外して狭い範囲で再びロック、ということをすると割り込まれるおそれがあるし、かといって注視する範囲が狭まったのに広いロックのままでは粒度が大きくなる。有効かどうかはコンフリクトの頻度次第ではあるんだけどね。
あとは、せっかくコールバック大好きNode.jsでやっているので、ロックを要求して駄目だったときの予備の仕事をコールバックで渡せると良いかな、と思った。ロックが取得できたらロックオブジェクトの中にしまっておいて、Unlock後に呼び出してやる感じ。予備の仕事内でロックを使うとやばいことが起きるから、コードセキュリティ的にどうなの、というのはあるけどね。デバッグ用にもなんか便利そう。
後者は「いずれやるかも」程度の話だと思ってちょんまげ。というわけで、いよいよ実コードに入ろう。サンプルコードは、全部繋げると、使えるコードになる。なお、GitHubにも上げたので、そっちもご覧あれ。使ってみようと思う方は npm install redis-readwritelock でどぞ。
では、実装行ってみよう!
ずいぶん予定よりも遅くなってしまった。面目ない。
まずは初期化まで
ロック名の記録をしているのが実は大事で、アプリケーションを中断したとき、ロックが残っていると、redisに永続化されてしまう。クラスタの場合の起動時には、initが全員完了するまで待ち合わせする必要があるとか、クラスタに追加する場合にはinitが実行されちゃダメとか、色々この辺は使い方に応じて変えなきゃならん。このコードでは次回実行時に前回のゴミを掃除するようになっているけど、上記の諸々の難点を考えると、結構悩ましい。結局、シグナルハンドラ内でgraceful-shutdownフラグみたいなのを立てて、接続している皆様が全員後始末するのを待って終了するのがベストだと思うよん。途中参加でinitをサボらなきゃいかんというのは、すぐできることだから後で対応するけどね(とゆーか、引数にフラグ付けて利用者任せにするつもり)
競合時に「ランダムな時間待つ」というのは、気が進まないがしょうがない。いわずもがな、ライブロック防止策だ。しかも、こいつときたら、ポーリングの度にRedisを叩くので間隔は開けたい。が、開けすぎるとパフォーマンスが酷くなる。なので、一定間隔まで開いたら振り出しに戻る方式にした。Nodeっぽい書き方をするなら、EventEmitterで待ち組を揺さぶり起こすのが定石だが、タイミング問題が怖いし(競合検出~イベントハンドラ登録の隙間でコンテクストを一度手放すので、その隙間に、競合解消&イベント着火が行われるリスクがあるってことね)どうせ他サーバからもRedis叩いたりすると、結局ポーリングも混用せざるを得ないことを考えて、そこは手を抜いた。テストコードみたいな苛烈な競合は、実用上起こらんだろうしね。
cs = require('control-structures')
redis = require("redis")
rcli = redis.createClient()
lockconfig = {}
locks = {}
rwlocks = {}
rangelocks = {}
uselocks = ['redislock:keys', '']
#
# 使用したロック名の記録
#
regist_uselocks = (lockname)->
uselocks[1] = lockname
rcli.SADD uselocks,->return
#
# ランダムな時間待つ(ロック競合解決のため)
#
randomwait = (waittimeobj, next)->
if waittimeobj.time >= lockconfig.waitmax
waittimeobj.time = lockconfig.waitmin
else
waittimeobj.time *= Math.random()+Math.random()+Math.random()
setTimeout next, Math.floor(waittimeobj.time)
#
# デバッグ用ログ出力
#
locklog = (lockobj, message)->
if lockconfig.log
logstr = ''
if /^wait/.test(message) and not lockconfig.logwait
return
if /^shrink/.test(message) and not lockconfig.logshrink
return
switch lockobj.type
when 'simple'
if lockconfig.logsimple
logstr = lockobj.obj+'['+lockobj.type+']'
else return
when 'multi'
if lockconfig.logsimple
logstr = lockobj.obj+'['+lockobj.type+']'
else return
when 'reader'
if lockconfig.logrw
logstr = lockobj.obj+'['+lockobj.type+']'
else return
when 'pwr'
if lockconfig.logrw
logstr = lockobj.obj+'['+lockobj.type+']'
else return
when 'writer'
if lockconfig.logrw
logstr = lockobj.obj+'['+lockobj.type+']'
else return
when 'rangereader'
if lockconfig.logrange
logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
else return
when 'rangepwr'
if lockconfig.logrange
logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
else return
when 'rangewriter'
if lockconfig.logrange
logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
else return
when 'rangewriter-upgrade'
if lockconfig.logrange
logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
else return
logstr += ' '+message
console.log logstr
#
# RangeLockのユニーク名のリセット
# (RangeLockにはユニーク名が必要だが、延々と足し続けるといつかバグる。
# 手遅れになる前に、良い感じのところで巻き戻す)
#
resetrangelockname = (lockobj, next)->
if lockobj.name > 99999999
rcli.ZADD [lockobj.live, lockobj.name, lockobj.name], (err,reply)->
rcli.ZRANGE [lockobj.live, 0, 0, 'WITHSCORES'], (err, replies)->
if replies[1] > 99999999
rcli.DECRBY [objectname+':uniquename', 99999999], (err,reply)->
next()
else
next()
###
初期化:前回起動時に使ったロックを全部削除する
###
module.exports.init = init = (config, next)->
lockconfig = config
lockconfig.waitmax = lockconfig.waitmax ? 200
lockconfig.waitmin = lockconfig.waitmin ? 4
lockconfig['log'] = true
lockconfig['logwait'] = false
lockconfig['logshrink'] = false
lockconfig['logsimple'] = false
lockconfig['logrw'] = false
lockconfig['logrange'] = false
if lockconfig.log
console.log 'init'
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
rcli.SMEMBERS ['redislock:keys'],(err,replies)->
replies.push 'redislock:keys'
rcli.DEL replies,(err,reply)->
next()
アンロック
アンロックは、ほとんどどのタイプでも同じ。注意すべきは、使い方のほうで、普通のReadWriteLockで、PreWrite-ReadLockをアップグレードしたWriterLockをアンロックする際は、PreWrite-ReadLockで使ったlockobjを使い回すけど、RangeReadWriteLockの時は、WriterLockで返却されるlockobjは別物なので、アンロックの際に使い分けないといけない点かな。
その辺の使い方はサンプルを見てね。
###
アンロックは共通
###
#
# ロックを外す
#
module.exports.unlock = unlock = (lockobj, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
if lockobj?.obj? and lockobj.type?
switch lockobj.type
when 'simple'
rcli.DEL lockobj.obj, (err, reply) ->
locklog lockobj, 'unlocked'
next()
when 'multi'
if lockobj.obj.length > 0
rcli.DEL lockobj.obj, (err, reply) ->
locklog lockobj, 'unlocked'
next()
next()
when 'reader'
rcli.DECR [lockobj.obj], (err, reply)->
locklog lockobj, 'unlocked'
next()
when 'pwr'
rcli.DECRBY [lockobj.obj, 50000], (err, reply)->
locklog lockobj, 'unlocked'
next()
when 'writer'
rcli.DECRBY [lockobj.obj, 100000], (err, reply)->
locklog lockobj, 'unlocked'
if lockobj.score == 150000
delete lockobj.score
lockobj.type = 'pwr'
next()
when 'rangereader'
locklog lockobj, 'unlocked'
args = [lockobj.live, lockobj.name]
rcli.ZREM args, (err, reply)->
args[0] = lockobj.robj
args[1] = lockobj.name+':left'
rcli.ZREM args, (err,reply)->
args[1] = lockobj.name+':right'
rcli.ZREM args, (err,reply)->
#時間が経ったときはユニーク名のリナンバー
resetrangelockname lockobj, next
when 'rangepwr'
locklog lockobj, 'unlocked'
writerlock lockobj.obj, (metalockobj)->
args = [lockobj.live, lockobj.name]
rcli.ZREM args, (err, reply)->
args[0] = lockobj.pwrobj
args[1] = lockobj.name+':left'
rcli.ZREM args, (err,reply)->
args[1] = lockobj.name+':right'
rcli.ZREM args, (err,reply)->
unlock metalockobj,->
#時間が経ったときはユニーク名のリナンバー
resetrangelockname lockobj, next
when 'rangewriter'
locklog lockobj, 'unlocked'
writerlock lockobj.obj, (metalockobj)->
args = [lockobj.live, lockobj.name]
rcli.ZREM args, (err, reply)->
args[0] = lockobj.wobj
args[1] = lockobj.name+':left'
rcli.ZREM args, (err,reply)->
args[1] = lockobj.name+':right'
rcli.ZREM args, (err,reply)->
unlock metalockobj,->
#時間が経ったときはユニーク名のリナンバー
resetrangelockname lockobj, next
when 'rangewriter-upgrade'
locklog lockobj, 'unlocked'
writerlock lockobj.obj, (metalockobj)->
args = [lockobj.wobj, lockobj.name+':left']
rcli.ZREM args, (err,reply)->
args[1] = lockobj.name+':right'
rcli.ZREM args, (err,reply)->
unlock metalockobj,->
#時間が経ったときはユニーク名のリナンバー
resetrangelockname lockobj, next
else
console.log 'unknown lockobj.type is found! ['+lockobj.type+']'
process.exit 1
else
console.log 'lockobj is empty!'
process.exit 1
で、シンプルな排他
これは、別になんのことはない、シンプルな排他。ひとまとまりの処理は、複数のロックを並べて使うとデッドロックを起こしかねないので、multilockを使うべきだろう。
###
単一ロック
###
#
# ロックする
#
module.exports.simplelock = simplelock = (objectname, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
waittimeobj = {time: lockconfig.waitmin}
lockobj = {type: 'simple', obj: objectname+':lock'}
cs._while []
,(_break,_next) ->
if not (objectname in locks)
locks[objectname] = true
regist_uselocks objectname+':lock'
rcli.SETNX [objectname+':lock', 'ok'], (err, reply) ->
if reply == 1
_break()
else
locklog lockobj, 'waiting'
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
###
複数ロック
###
#
# ロックする
#
module.exports.multilock = multilock = (objectarray, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
waittimeobj = {time: lockconfig.waitmin}
lockobj = {}
lockobj['type'] = 'multi'
lockobj['obj'] = []
lockargs = []
for obj in objectarray
lockobj.obj.push obj+':lock'
lockargs.push obj+':lock'
lockargs.push 'ok'
if not (obj in locks)
locks[obj] = true
regist_uselocks obj+':lock'
cs._while []
,(_break,_next) ->
rcli.MSETNX lockargs, (err, reply) ->
if reply == 1
_break()
else
locklog lockobj, 'waiting'
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
ReadWriteLock
三相ReadWriteLockのアルゴリズムは既に示したとおり。実装が下記のようになる。なるべくならせっかく三相にしているので、writerlockをかける前に、必ずpwrlockをかけるようにしてくれると、全体としてのパフォーマンスが良くなる。やはり、いきなりwriterlockだと、readerlockには分が悪いので、だいぶ後回しにされるのだ。
###
Readerロック
###
module.exports.readerlock = readerlock = (objectname, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
waittimeobj = {time: lockconfig.waitmin}
lockobj = {}
lockobj['type'] = 'reader'
lockobj['obj'] = objectname+':rwlock'
if not (objectname in rwlocks)
rwlocks[objectname] = true
regist_uselocks objectname+':rwlock'
args = [lockobj.obj]
rcli.INCR args, (err, reply)->
if reply >= 99999
rcli.DECR args, (err,reply)->
cs._while []
,(_break,_next) ->
rcli.GET args, (err,reply)->
if reply < 99999
rcli.INCR args, (err, reply)->
if reply < 99999
_break()
else
rcli.DECR args, (err,reply)->
locklog lockobj, 'waiting1'
randomwait waittimeobj, _next
else
locklog lockobj, 'waiting2'
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
else
locklog lockobj, 'locked'
next lockobj
###
PreWrite-Readロック
###
module.exports.pwrlock = pwrlock = (objectname, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
lockobj = {}
lockobj['type'] = 'pwr'
lockobj['obj'] = objectname+':rwlock'
if not (objectname in rwlocks)
rwlocks[objectname] = true
regist_uselocks objectname+':rwlock'
waittimeobj = {time:lockconfig.waitmin}
args = [lockobj.obj, 50000]
rcli.INCRBY args, (err, reply)->
if reply >= 99999
rcli.DECRBY args, (err,reply)->
cs._while []
,(_break,_next) ->
rcli.GET [lockobj.obj], (err,reply)->
if reply < 49999
rcli.INCRBY args, (err, reply)->
if reply < 99999
_break()
else
rcli.DECRBY args, (err,reply)->
locklog lockobj, 'waiting1'
randomwait waittimeobj, _next
else
locklog lockobj, 'waiting2'
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
else
locklog lockobj, 'locked'
next lockobj
###
Writerロック
###
module.exports.writerlock = writerlock = (objectname_or_lockobj, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
basescore = 0
if objectname_or_lockobj?
if objectname_or_lockobj.type? and objectname_or_lockobj.type == 'pwr'
lockobj = objectname_or_lockobj
lockobj.type = 'writer'
lockobj['score'] = 150000
basescore = 50000
else
lockobj = {}
lockobj['type'] = 'writer'
lockobj['obj'] = objectname_or_lockobj+':rwlock'
lockobj['score'] = 100000
if not (objectname_or_lockobj in rwlocks)
rwlocks[objectname_or_lockobj] = true
regist_uselocks objectname_or_lockobj+':rwlock'
else
console.log 'objectname_or_lockobj is null.'
process.exit 1
waittimeobj = {time:lockconfig.waitmin}
replyvalue = 0
getargs = [lockobj.obj]
args = [lockobj.obj, 100000]
rcli.INCRBY args, (err, reply)->
replyvalue = reply
cs._ []
,(localnext)->
if replyvalue >= 150000+basescore
#writerlocked or pwrlocked
rcli.DECRBY args, (err,reply)->
cs._while []
,(_break,_next)->
rcli.INCRBY args, (err, reply)->
replyvalue = reply
if replyvalue >= 150000+basescore
rcli.DECRBY args, (err,reply)->
locklog lockobj, 'waiting1'
randomwait waittimeobj, _next
else
_break()
,->
localnext()
else
localnext()
,->
waittimeobj.time = lockconfig.waitmin
if replyvalue >= 100001+basescore
#readerlocked
cs._while []
,(_break,_next) ->
rcli.GET getargs, (err,reply)->
if reply <= 100000+basescore
_break()
else
locklog lockobj, 'waiting2'
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
else if replyvalue == 100000+basescore
#OK
locklog lockobj, 'locked'
next lockobj
else
#arienai
console.log 'unknown error (reply = '+reply
process.exit 1
区間ReadWriteLock
さて、最終目的地である、区間ReadWriteLockだ。実数区間ごとに、readerlockもpwrlockもwriterlockも掛け放題だ。これができるので、入れ子区間モデルで木構造を管理すると美味しいぜ!ということになるのだ。ちなみに、pwrlockにwriterlockを重ねるとき、後者が前者の部分集合であれば良い。だから、例えば木のつなぎ替えをするときには、srcとdstを両方含む範囲にpwrlockをかけて、それぞれその中の必要な範囲にwriterlockをかけては外し、かけては外し、とやるのが良い。
そのへんは、いずれ、入れ子区間モデルのコードを完成させるときに実装する
isconflictrangelock = (targetobj, rangemin, rangemax, threshold, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
rcli.ZCOUNT [targetobj, rangemin, rangemax], (err,reply)->
if reply <= threshold*2
rcli.ZRANGEBYSCORE [targetobj, '-inf', rangemin], (err,replies)->
if replies.length == 0
next false
else
tablemem = 0
for reply in replies
if /\:left$/.test(reply)
tablemem += 1
else if /\:right$/.test(reply)
tablemem -= 1
else
console.log reply+' is an illegal range mark.'
process.exit 1
next (tablemem > threshold)
else
next true
###
RangeReaderロック
###
module.exports.rangereaderlock = rangereaderlock = (objectname, rangemin, rangemax, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
waittimeobj = {time: lockconfig.waitmin}
lockobj = {}
lockobj['type'] = 'rangereader'
lockobj['name'] = ''
lockobj['obj'] = objectname+':meta'
lockobj['min'] = rangemin
lockobj['max'] = rangemax
lockobj['robj'] = objectname+':readerlockzset'
lockobj['pwrobj'] = objectname+':pwrlockzset'
lockobj['wobj'] = objectname+':writerlockzset'
lockobj['live'] = objectname+':alivelockzset'
if not (objectname in rangelocks)
rangelocks[objectname] = true
regist_uselocks objectname+':readerlockzset'
regist_uselocks objectname+':pwrlockzset'
regist_uselocks objectname+':writerlockzset'
regist_uselocks objectname+':alivelockzset'
regist_uselocks objectname+':uniquename'
cs._ []
,(localnext)->
rcli.INCR [objectname+':uniquename'], (err,reply)->
lockobj.name = reply
localnext()
,->
cs._while []
,(_break,_next)->
readerlock lockobj.obj,(metalockobj)->
isconflictrangelock lockobj.wobj, rangemin, rangemax, 0, (isconflict)->
if not isconflict
args = [lockobj.robj, rangemin, lockobj.name+':left']
rcli.ZADD args, (err,reply)->
args[1] = rangemax
args[2] = lockobj.name+':right'
rcli.ZADD args, (err,reply)->
unlock metalockobj, ->
_break()
else
unlock metalockobj, ->
locklog lockobj, 'waiting'
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
###
RangeReaderロックの対象を縮小する
###
module.exports.rangereaderlock_shrink = rangereaderlock_shrink = (lockobj, rangemin, rangemax, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
if lockobj.type == 'rangereader' and lockobj.min <= rangemin and rangemin <= rangemax and rangemax <= lockobj.max
args = [lockobj.robj, rangemin, lockobj.name+':left']
rcli.ZADD args, (err,reply)->
args[1] = rangemax
args[2] = lockobj.name+':right'
rcli.ZADD args, (err,reply)->
lockobj.min = rangemin
lockobj.max = rangemax
locklog lockobj, 'shrinked'
next lockobj
else
console.log 'err'
process.exit 1
###
RangePWRロック
###
module.exports.rangepwrlock = rangepwrlock = (objectname, rangemin, rangemax, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
waittimeobj = {time: lockconfig.waitmin}
lockobj = {}
lockobj['type'] = 'rangepwr'
lockobj['name'] = ''
lockobj['obj'] = objectname+':meta'
lockobj['min'] = rangemin
lockobj['max'] = rangemax
lockobj['robj'] = objectname+':readerlockzset'
lockobj['pwrobj'] = objectname+':pwrlockzset'
lockobj['wobj'] = objectname+':writerlockzset'
lockobj['live'] = objectname+':alivelockzset'
if not (objectname in rangelocks)
rangelocks[objectname] = true
regist_uselocks objectname+':readerlockzset'
regist_uselocks objectname+':pwrlockzset'
regist_uselocks objectname+':writerlockzset'
regist_uselocks objectname+':alivelockzset'
regist_uselocks objectname+':uniquename'
cs._ []
,(localnext)->
rcli.INCR [objectname+':uniquename'], (err,reply)->
lockobj.name = reply
localnext()
,->
cs._while []
,(_break,_next)->
pwrlock lockobj.obj,(metalockobj)->
isconflictrangelock lockobj.pwrobj, lockobj.min, lockobj.max, 0, (isconflict)->
if not isconflict
isconflictrangelock lockobj.wobj, lockobj.min, lockobj.max, 0, (isconflict)->
if not isconflict
writerlock metalockobj,(metalockobj)->
args = [lockobj.pwrobj, lockobj.min, lockobj.name+':left']
rcli.ZADD args, (err,reply)->
args[1] = lockobj.max
args[2] = lockobj.name+':right'
rcli.ZADD args, (err,reply)->
unlock metalockobj, ->
unlock metalockobj, ->
_break()
else
unlock metalockobj, ->
locklog lockobj, 'waiting'
randomwait waittimeobj, _next
else
unlock metalockobj, ->
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
###
RangePWRロックの対象を縮小する
###
module.exports.rangepwrlock_shrink = rangepwrlock_shrink = (lockobj, rangemin, rangemax, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
if lockobj.type == 'rangepwr' and lockobj.min <= rangemin and rangemin <= rangemax and rangemax <= lockobj.max
args = [lockobj.pwrobj, rangemin, lockobj.name+':left']
rcli.ZADD args, (err,reply)->
args[1] = rangemax
args[2] = lockobj.name+':right'
rcli.ZADD args, (err,reply)->
lockobj.min = rangemin
lockobj.max = rangemax
locklog lockobj, 'shrinked'
next lockobj
else
console.log 'err'
process.exit 1
###
RangeWriterロック
###
module.exports.rangewriterlock = rangewriterlock = (objectname_or_lockobj, rangemin, rangemax, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
waittimeobj = {time: lockconfig.waitmin}
lockobj = {}
basename = ''
if not objectname_or_lockobj?
console.log 'error'
process.exit 1
else
cs._ []
,(localnext1)->
if objectname_or_lockobj.type? and objectname_or_lockobj.type == 'rangepwr'
#rangepwrlockの内部にrangewritelockを作る場合
lockobj['obj'] = objectname_or_lockobj.obj
basename = objectname_or_lockobj.obj.replace(/\:meta$/, '')
lockobj['type'] = 'rangewriter-upgrade'
pwrmin = objectname_or_lockobj.min
pwrmax = objectname_or_lockobj.max
if pwrmin <= rangemin and rangemin <= rangemax and rangemax <= pwrmax
lockobj['min'] = rangemin
lockobj['max'] = rangemax
localnext1 basename
else
console.log 'error'
process.exit 1
else
#いきなりrangewritelockを作る場合
lockobj['obj'] = objectname_or_lockobj+':meta'
basename = objectname_or_lockobj
lockobj['type'] = 'rangewriter'
lockobj['min'] = rangemin
lockobj['max'] = rangemax
localnext1 basename
,(localnext2, basename)->
lockobj['robj'] = basename+':readerlockzset'
lockobj['pwrobj'] = basename+':pwrlockzset'
lockobj['wobj'] = basename+':writerlockzset'
lockobj['live'] = basename+':alivelockzset'
if not (basename in rangelocks)
rangelocks[basename] = true
regist_uselocks basename+':readerlockzset'
regist_uselocks basename+':pwrlockzset'
regist_uselocks basename+':writerlockzset'
regist_uselocks basename+':alivelockzset'
regist_uselocks basename+':uniquename'
rcli.INCR [basename+':uniquename'], (err,reply)->
lockobj.name = reply
if objectname_or_lockobj.type? and objectname_or_lockobj.type == 'rangepwr'
#rangepwrlockの内部にrangewritelockを作る場合、
#rangepwrlockやrangewriterlockとの競合は既に排除されているので、
#ロックの数を揃えるためのpwrlockをかけるだけで先へ進む
pwrlock lockobj.obj,(metalockobj)->
localnext2 metalockobj
else
#rangepwrlockやrangewriterlockとの競合をテストして、競合しなくなるまでリトライ
cs._while []
,(_break,_next)->
pwrlock lockobj.obj,(metalockobj)->
#writerともpwrとも衝突チェックして、外れるまでゼロから繰り返し
isconflictrangelock lockobj.pwrobj, lockobj.min, lockobj.max, 0, (isconflict)->
if not isconflict
isconflictrangelock lockobj.wobj, lockobj.min, lockobj.max, 0, (isconflict)->
if not isconflict
_break metalockobj
else
unlock metalockobj, ->
locklog lockobj, 'waiting1'
randomwait waittimeobj, _next
else
unlock metalockobj, ->
locklog lockobj, 'waiting2'
randomwait waittimeobj, _next
,(metalockobj)->
localnext2 metalockobj
,(_dummynext, metalockobj)->
#writerともpwrとも衝突していないので、readerを無視してロックをかけてしまう
writerlock metalockobj,(metalockobj)->
args = [lockobj.wobj, lockobj.min, lockobj.name+':left']
rcli.ZADD args, (err,reply)->
args[1] = lockobj.max
args[2] = lockobj.name+':right'
rcli.ZADD args, (err,reply)->
unlock metalockobj, ->
unlock metalockobj, ->
#重複するrangereadlockが外れるのを待つ
waittimeobj.time = lockconfig.waitmin
cs._while []
,(_break,_next)->
isconflictrangelock lockobj.robj, lockobj.min, lockobj.max, 0, (isconflict)->
if not isconflict
_break()
else
randomwait waittimeobj, _next
,->
locklog lockobj, 'locked'
next lockobj
###
RangeWriterロックの対象を縮小する
###
module.exports.rangewriterlock_shrink = rangewriterlock_shrink = (lockobj, rangemin, rangemax, next)->
if not next?
if lockconfig.log then console.log 'no next'
next = -> return
if lockobj.type == 'rangewriter' and lockobj.min <= rangemin and rangemin <= rangemax and rangemax <= lockobj.max
args = [lockobj.wobj, rangemin, lockobj.name+':left']
rcli.ZADD args, (err,reply)->
args[1] = rangemax
args[2] = lockobj.name+':right'
rcli.ZADD args, (err,reply)->
lockobj.min = rangemin
lockobj.max = rangemax
locklog lockobj, 'shrinked'
next lockobj
else
console.log 'err'
process.exit 1
テストコード
では、テストコードだ。initの第1引数に、テスト毎にちょうど良い粒度のログが設定できるようになっている。ただし、あんまりログをたくさん吐くと、ログのせいでタイミングが変わって、異なる実行順序になったりするのでご注意。
redislock = require('../lib/redis-readwritelock')
###
---------------------------------------------
test
---------------------------------------------
###
randsleep = (next)->
setTimeout next, Math.floor(Math.random()*1000)
testA = (num)->
randsleep ->
redislock.simplelock 'test', (lockobj)->
randsleep ->
console.log 'testA'+num
redislock.unlock lockobj, ->
return
testB = (num)->
randsleep ->
redislock.simplelock 'test2', (lockobj)->
randsleep ->
console.log 'testB'+num
redislock.unlock lockobj, ->
return
testC = (num)->
randsleep ->
redislock.simplelock 'test3', (lockobj)->
randsleep ->
console.log 'testC'+num
redislock.unlock lockobj, ->
return
testD = (num)->
randsleep ->
redislock.multilock ['test','test2','test3'], (lockobj)->
randsleep ->
console.log 'testD'+num
redislock.unlock lockobj, ->
return
testrwA = (num)->
randsleep ->
redislock.readerlock 'test', (lockobj)->
randsleep ->
console.log 'testA'+num
redislock.unlock lockobj, ->
return
testrwB = (num)->
randsleep ->
redislock.readerlock 'test', (lockobj)->
randsleep ->
console.log 'testB'+num
redislock.unlock lockobj, ->
randsleep ->
randsleep ->
redislock.readerlock 'test', (lockobj)->
randsleep ->
console.log 'testBB'+num
redislock.unlock lockobj, ->
randsleep ->
randsleep ->
redislock.readerlock 'test', (lockobj)->
randsleep ->
console.log 'testBBB'+num
redislock.unlock lockobj, ->
randsleep ->
randsleep ->
redislock.readerlock 'test', (lockobj)->
randsleep ->
console.log 'testBBBB'+num
redislock.unlock lockobj, ->
return
testrwC = (num)->
randsleep ->
redislock.pwrlock 'test', (lockobj)->
randsleep ->
redislock.writerlock lockobj, (lockobj)->
console.log 'testC'+num+':'+lockobj.score
redislock.unlock lockobj, ->
randsleep ->
redislock.unlock lockobj, ->
return
testrwD = (num)->
randsleep ->
redislock.writerlock 'test', (lockobj)->
console.log 'testD'+num
redislock.unlock lockobj, ->
return
testrangeA = (num)->
randsleep ->
redislock.rangereaderlock 'test', 0, 2, (lockobj)->
randsleep ->
console.log 'testA'+num
redislock.unlock lockobj, ->
return
testrangeB = (num)->
randsleep ->
redislock.rangereaderlock 'test', 0, 1, (lockobj)->
randsleep ->
console.log 'testB'+num
redislock.unlock lockobj, ->
randsleep ->
randsleep ->
redislock.rangereaderlock 'test', 1, 2, (lockobj)->
randsleep ->
console.log 'testBB'+num
redislock.unlock lockobj, ->
randsleep ->
randsleep ->
redislock.rangereaderlock 'test', 2, 3, (lockobj)->
randsleep ->
console.log 'testBBB'+num
redislock.unlock lockobj, ->
randsleep ->
randsleep ->
redislock.rangereaderlock 'test', 1, 2, (lockobj)->
randsleep ->
console.log 'testBBBB'+num
redislock.unlock lockobj, ->
return
testrangeC = (num)->
randsleep ->
redislock.rangepwrlock 'test', 0.5, 2.0, (lockobj)->
randsleep ->
redislock.rangewriterlock lockobj, 1.2, 1.8, (wlockobj)->
console.log 'testC'+num
redislock.unlock wlockobj, ->
randsleep ->
redislock.unlock lockobj, ->
return
testrangeD = (num)->
randsleep ->
redislock.rangewriterlock 'test', 2.2, 2.3, (lockobj)->
console.log 'testD'+num
redislock.unlock lockobj, ->
return
redislock.init {
logwait: no
logshrink: no
logsimple: yes
logrw: yes
logrange: yes
},->
switch 1
when 1
for i in [0..5]
testA i
testB i
testC i
testD i
when 2
for i in [0..5]
testrwA i
testrwB i
testrwC i
testrwD i
when 3
for i in [0..5]
testrangeA i
testrangeB i
testrangeC i
testrangeD i