LoginSignup
12
11

More than 5 years have passed since last update.

Redisで超簡単にReader-Writer-Lockを実装する方法

Last updated at Posted at 2015-06-24

考え方と原理

私のロック魂をアピールさせてください!(多田李衣菜)

RedisでSETNXを使えば簡単に排他ロックが実現できる。MSETNXを使えば、複数のロックを同時に評価できる。それは良いのだが、並行実行性を考えると、なるべく競合しない方が良いよね。データ構造でも、競合しにくいように考えていると思うので、制御の方でも競合しにくい作りにしよう。

その代表が、Reader-Writer-Lock。読み出しはいくらでも競合したまえ。書き込みは別だ。というやつだね。賢いロックの賢いついでにreaderlock獲得中のコンテクストが、writerlockに昇格できるってのいいかも、と思ったけど、それ、ちょっと考えたら、同じことを考えてる奴が複数いたら、デッドロックになるじゃんね? 書き込みをしたい人が、書き込みをするための情報を得るために読み出す場合、読み出しの段階からwriterlock獲得しとかなきゃいけないね。注意注意。一応、writerlockに昇格できるreaderlockをまた別のprewriter-readlockとかにして、prewriter-readlock同士は排他するようにすればそういうの出来るけどさ、どうだろね。ややこしすぎるよね。これは最終的に実装するかどうか微妙。リードライトロック以外の何者かになるしね。

と、書いたけど、prewriter-readlock採用。微妙だけど、WriteLockはコストが高いから、その前の読み出しの期間はやっぱりロック状態にしておきたくない。そうすると、下記のようなパターンになる。こうすると、読み込みのみのコンテクストは限界まで阻害されにくくなるので、writerlockに予約機能を持たせて「writerlockにも必ず出番が回ってくる」ことを保証する仕様とのバランスが良くなる。つまり、readerにとっては、writerがのべつまくなし連続して到達しても、prewrite-readlockの間にクリティカルセクションに進入してしまえば良いということになる。ね、うまくできてるよね。いわば「三相Reader-Writer-Lock」ということになると思うのだが、これ、うまいやり方だから先行研究があると思ったら発見できなかった。こんなプリミティブなアイディア、先行研究がないわけないので、誰か教えてえろいひと。

書き込みのパターン

prewrite-readlock
  読み込み(書き込みのための情報を得る)
  writerlock
    書き込み
  writerunlock
prewrite-readunlock

さて、まずは仮想コードだ

とりあえず、仮想コード的に考え方だけ書いておく。 時間が空いたらJavascriptで書き直す。 時間は大して空いてはいないものの、春アニメが一段落した隙間で何とか書いてみた。色んな実装の方法が有りうるが、下記に示したように、writerlockが予約できるような形にしたほうがいい。というのは、readerlockの波状攻撃が止まず、いつまでもwriterlockが獲得できない、という事件が起きうるからだ。(それでも、C++でマルチスレッドだったら、まず短時間のspinlockを試してみてから離散的なポーリングをするようにしておけば、かいくぐれたりするのだが、Node.js想定なので)

実装は単純明快で、基本的にINCR族のコマンドとGETだけで実現できる。INCR族がアトミックだというのは素晴らしいね。ただ、実装が単純な代わりに、readerlockの同時獲得数に制限があるという、いつものゲーム脳だが、その辺は勘弁してくれ。何となれば、閾値を百万とかにしてくれればいい。

reader
  LOCK
    INCR lock して返値が99999以上なら、DECR lock して、(心持ち長めに)待ってリトライ
      リトライ時は、まずGETしてみて返値が99999未満なら、INCRからやり直す。99999以上ならさらに待つ
  UNLOCK
    DECR lock

prewrite-read
  LOCK
    INCRBY lock,50000して、返値が99999以上なら、DECRBY lock,50000して、待ってリトライ
   リトライ時は、まずGETしてみて返値が49999未満なら、INCRからやり直す。49999以上ならさらに待つ
  UNLOCK
    DECRBY lock,50000

writer
  LOCK
    INCRBY lock,100000して、返値が
      【再試行(writerlock予約済 or 獲得済)】
      150000+自己prewrite以上なら、DECRBY lock,100000して、待ってリトライ
      【ロック予約】
      100001+自己prewrite以上なら、小リトライ
        小リトライは、そのまま GET lock を(しつこめに)ポーリングし、
        (このポーリングは非同期関数3段重ねとかで良いと思うよ)
        返値が 100000+自己prewriteになったら通過
      【獲得】
      100000+自己prewriteなら、通過
  UNLOCK
    DECRBY key,100000

ロッキング動作のまとめ

上記のようにすれば、概ね使い物になるロック機構ができあがる。

  • readerlockは、writerlockが獲得もしくは予約されていなければ、複数(49999件まで)同時に獲得できる
  • prewrite-readlockは、writerlockが獲得もしくは予約されておらず、他のprewrite-readlockが獲得されていなければ、獲得できる
  • writerlockは、writerlockが獲得もしくは予約されておらず、他のprewrite-readlockが獲得されていなければ、
    • readerlockが獲得されていれば、writerlockを予約して、readerlockが外れるまで待つ
    • readerlockが獲得されていなければ、writerlockを獲得する

という動作になる。つまり、readerlockは、writerlockさえ無ければ同時にたくさん獲得できるが、prewrite-readlockとwriterlockは同時に合わせて1つまでしか獲得できない。

とても混んでいる時のためのヒント

賢明なる読者の皆さんはお気づきだろうが、writerlockが本当は獲得できたはずなのに、readerlockがちょこちょこちょっかいを出してくるせいで小リトライからなかなか抜けられない、という問題は起きうる。もしそうなったら、readerlockリトライ中の皆さんがリトライ開始時にINCRし、リトライを終えたらDECRするキーを別に用意して、それが閾値を超えたら初回ロック時からまずINCRではなく、取りあえずGETで様子を見てからにする、という解決策があり得る。

基本的に、readerが溜まっても、writerlockが外れれば一気に捌けるので、とことんwriter優遇でよいのだ。まあ、良く実験することが大事なんだけどね。

上記の三相reader-writer-lockでこの問題には片がついた、はずだよね?

で、例の木構造との兼ね合いなのだが

そもそもRedisでロック機構を作ろうと思った理由は、例の「Redisでツリー構造を扱うなら「入れ子区間モデル」でOK」のためなので、実数区間に対して選択的にロックをかけることができるようにしたいわけだ。これは、応用範囲の広いものでは無いけれど、これもまたかなりプリミティブなアイディアであるし、意味のあることだろう。

rwRangeLock
rangelockのためのメタロックを三相reader-writerlockで行い、それをWRITER-META-{LOCK,UNLOCK}, PRW-META-{LOCK,UNLOCK}, READ-META-{LOCK,UNLOCK}と表す。RangeLockそのものは、ZSETで管理し、PWR-ZSET, WRITER-ZSET, READER-ZSETの3つに対して、name:left, name:rightを登録する。

rangewriterlock
  PWR-META-LOCK
    ロックしたい区間が、PWR-ZSET, WRITER-ZSETと重複しないことを確認
    OK
      WRITER-META-LOCK
        WRITER-ZSETにロックしたい区間を追記
      WRITER-META-UNLOCK
      PWR-META-UNLOCK
      READER-ZSETをポーリングし、ロックしたい区間と重複が無くなれば、ロック完了
    NG
      PWR-META-UNLOCK
      冒頭からリトライ

rangewriterunlock
  WRITER-META-LOCK
    WRITER-ZSETからロックしていた区間を削除
  WRITER-META-UNLOCK

rangepwrlock
  PWR-META-LOCK
    ロックしたい区間が、PWR-ZSET, WRITER-ZSETと重複しないことを確認
    OK
      WRITER-META-LOCK
        PWR-ZSETにロックしたい区間を追記
      WRITER-META-UNLOCK
      PWR-META-UNLOCK
      ロック完了
    NG
      PWR-META-UNLOCK
      冒頭からリトライ

rangepwrunlock
  WRITER-META-LOCK
    PWR-ZSETからロックしていた区間を削除
  WRITER-META-UNLOCK

rangereaderlock
  READER-META-LOCK
    WRITER-ZSETとロックしたい区間が重複しないことを確認
    OK
      READER-ZSETにロックしたい区間を追記
      READER-META-UNLOCK
      ロック完了
    NG
      READER-META-UNLOCK
      冒頭からリトライ

rangereaderunlock
  READER-ZSETからロックしていた区間を削除

こんな感じ。このままでは、変な使い方をすると嵌まる、という意味ではもう少しコードセキュリティを考えた実装にしなきゃいかん気もするけど、ライブラリ内使用ならこんな感じではなかろうか。

さらに機能拡張の展望

なお、色々試してみて思ったのは、ロックを外さずに範囲を縮小する機能が欲しい、ということ。拡大だと、下手をするとデッドロックの原因になるが、縮小なら問題無い。外して狭い範囲で再びロック、ということをすると割り込まれるおそれがあるし、かといって注視する範囲が狭まったのに広いロックのままでは粒度が大きくなる。有効かどうかはコンフリクトの頻度次第ではあるんだけどね。

あとは、せっかくコールバック大好きNode.jsでやっているので、ロックを要求して駄目だったときの予備の仕事をコールバックで渡せると良いかな、と思った。ロックが取得できたらロックオブジェクトの中にしまっておいて、Unlock後に呼び出してやる感じ。予備の仕事内でロックを使うとやばいことが起きるから、コードセキュリティ的にどうなの、というのはあるけどね。デバッグ用にもなんか便利そう。

後者は「いずれやるかも」程度の話だと思ってちょんまげ。というわけで、いよいよ実コードに入ろう。サンプルコードは、全部繋げると、使えるコードになる。なお、GitHubにも上げたので、そっちもご覧あれ。使ってみようと思う方は npm install redis-readwritelock でどぞ。

では、実装行ってみよう!

ずいぶん予定よりも遅くなってしまった。面目ない。

まずは初期化まで

ロック名の記録をしているのが実は大事で、アプリケーションを中断したとき、ロックが残っていると、redisに永続化されてしまう。クラスタの場合の起動時には、initが全員完了するまで待ち合わせする必要があるとか、クラスタに追加する場合にはinitが実行されちゃダメとか、色々この辺は使い方に応じて変えなきゃならん。このコードでは次回実行時に前回のゴミを掃除するようになっているけど、上記の諸々の難点を考えると、結構悩ましい。結局、シグナルハンドラ内でgraceful-shutdownフラグみたいなのを立てて、接続している皆様が全員後始末するのを待って終了するのがベストだと思うよん。途中参加でinitをサボらなきゃいかんというのは、すぐできることだから後で対応するけどね(とゆーか、引数にフラグ付けて利用者任せにするつもり)

競合時に「ランダムな時間待つ」というのは、気が進まないがしょうがない。いわずもがな、ライブロック防止策だ。しかも、こいつときたら、ポーリングの度にRedisを叩くので間隔は開けたい。が、開けすぎるとパフォーマンスが酷くなる。なので、一定間隔まで開いたら振り出しに戻る方式にした。Nodeっぽい書き方をするなら、EventEmitterで待ち組を揺さぶり起こすのが定石だが、タイミング問題が怖いし(競合検出~イベントハンドラ登録の隙間でコンテクストを一度手放すので、その隙間に、競合解消&イベント着火が行われるリスクがあるってことね)どうせ他サーバからもRedis叩いたりすると、結局ポーリングも混用せざるを得ないことを考えて、そこは手を抜いた。テストコードみたいな苛烈な競合は、実用上起こらんだろうしね。

cs    = require('control-structures')
redis = require("redis")
rcli  = redis.createClient()

lockconfig = {}
locks = {}
rwlocks = {}
rangelocks = {}
uselocks = ['redislock:keys', '']
# 
# 使用したロック名の記録
# 
regist_uselocks = (lockname)->
  uselocks[1] = lockname
  rcli.SADD uselocks,->return

# 
# ランダムな時間待つ(ロック競合解決のため)
# 
randomwait = (waittimeobj, next)->
  if waittimeobj.time >= lockconfig.waitmax
    waittimeobj.time = lockconfig.waitmin
  else
    waittimeobj.time *= Math.random()+Math.random()+Math.random()
  setTimeout next, Math.floor(waittimeobj.time)

# 
# デバッグ用ログ出力
# 
locklog = (lockobj, message)->
  if lockconfig.log
    logstr = ''
    if /^wait/.test(message) and not lockconfig.logwait
      return
    if /^shrink/.test(message) and not lockconfig.logshrink
      return
    switch lockobj.type
      when 'simple'
        if lockconfig.logsimple
          logstr = lockobj.obj+'['+lockobj.type+']'
        else return
      when 'multi'
        if lockconfig.logsimple
          logstr = lockobj.obj+'['+lockobj.type+']'
        else return
      when 'reader'
        if lockconfig.logrw
          logstr = lockobj.obj+'['+lockobj.type+']'
        else return
      when 'pwr'
        if lockconfig.logrw
          logstr = lockobj.obj+'['+lockobj.type+']'
        else return
      when 'writer'
        if lockconfig.logrw
          logstr = lockobj.obj+'['+lockobj.type+']'
        else return
      when 'rangereader'
        if lockconfig.logrange
          logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
        else return
      when 'rangepwr'
        if lockconfig.logrange
          logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
        else return
      when 'rangewriter'
        if lockconfig.logrange
          logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
        else return
      when 'rangewriter-upgrade'
        if lockconfig.logrange
          logstr = lockobj.name+'('+lockobj.min+','+lockobj.max+')['+lockobj.type+']'
        else return

    logstr += ' '+message
    console.log logstr

# 
# RangeLockのユニーク名のリセット
# (RangeLockにはユニーク名が必要だが、延々と足し続けるといつかバグる。
# 手遅れになる前に、良い感じのところで巻き戻す)
# 
resetrangelockname = (lockobj, next)->
  if lockobj.name > 99999999
    rcli.ZADD [lockobj.live, lockobj.name, lockobj.name], (err,reply)->
      rcli.ZRANGE [lockobj.live, 0, 0, 'WITHSCORES'], (err, replies)->
        if replies[1] > 99999999
          rcli.DECRBY [objectname+':uniquename', 99999999], (err,reply)->
            next()
  else
    next()


###
   初期化:前回起動時に使ったロックを全部削除する
###
module.exports.init = init = (config, next)->
  lockconfig = config
  lockconfig.waitmax = lockconfig.waitmax ? 200
  lockconfig.waitmin = lockconfig.waitmin ? 4
  lockconfig['log']       = true
  lockconfig['logwait']   = false
  lockconfig['logshrink'] = false
  lockconfig['logsimple'] = false
  lockconfig['logrw']     = false
  lockconfig['logrange']  = false

  if lockconfig.log
    console.log 'init'

  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return

  rcli.SMEMBERS ['redislock:keys'],(err,replies)->
    replies.push 'redislock:keys'
    rcli.DEL replies,(err,reply)->
      next()

アンロック

アンロックは、ほとんどどのタイプでも同じ。注意すべきは、使い方のほうで、普通のReadWriteLockで、PreWrite-ReadLockをアップグレードしたWriterLockをアンロックする際は、PreWrite-ReadLockで使ったlockobjを使い回すけど、RangeReadWriteLockの時は、WriterLockで返却されるlockobjは別物なので、アンロックの際に使い分けないといけない点かな。

その辺の使い方はサンプルを見てね。

###
   アンロックは共通
###
# 
# ロックを外す
# 
module.exports.unlock = unlock = (lockobj, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  if lockobj?.obj? and lockobj.type?
    switch lockobj.type
      when 'simple'
        rcli.DEL lockobj.obj, (err, reply) ->
          locklog lockobj, 'unlocked'
          next()
      when 'multi'
        if lockobj.obj.length > 0
          rcli.DEL lockobj.obj, (err, reply) ->
            locklog lockobj, 'unlocked'
            next()
        next()
      when 'reader'
        rcli.DECR [lockobj.obj], (err, reply)->
          locklog lockobj, 'unlocked'
          next()
      when 'pwr'
        rcli.DECRBY [lockobj.obj, 50000], (err, reply)->
          locklog lockobj, 'unlocked'
          next()
      when 'writer'
        rcli.DECRBY [lockobj.obj, 100000], (err, reply)->
          locklog lockobj, 'unlocked'
          if lockobj.score == 150000
            delete lockobj.score
            lockobj.type = 'pwr'
          next()
      when 'rangereader'
        locklog lockobj, 'unlocked'
        args = [lockobj.live, lockobj.name]
        rcli.ZREM args, (err, reply)->
          args[0] = lockobj.robj
          args[1] = lockobj.name+':left'
          rcli.ZREM args, (err,reply)->
            args[1] = lockobj.name+':right'
            rcli.ZREM args, (err,reply)->
              #時間が経ったときはユニーク名のリナンバー
              resetrangelockname lockobj, next
      when 'rangepwr'
        locklog lockobj, 'unlocked'
        writerlock lockobj.obj, (metalockobj)->
          args = [lockobj.live, lockobj.name]
          rcli.ZREM args, (err, reply)->
            args[0] = lockobj.pwrobj
            args[1] = lockobj.name+':left'
            rcli.ZREM args, (err,reply)->
              args[1] = lockobj.name+':right'
              rcli.ZREM args, (err,reply)->
                unlock metalockobj,->
                  #時間が経ったときはユニーク名のリナンバー
                  resetrangelockname lockobj, next
      when 'rangewriter'
        locklog lockobj, 'unlocked'
        writerlock lockobj.obj, (metalockobj)->
          args = [lockobj.live, lockobj.name]
          rcli.ZREM args, (err, reply)->
            args[0] = lockobj.wobj
            args[1] = lockobj.name+':left'
            rcli.ZREM args, (err,reply)->
              args[1] = lockobj.name+':right'
              rcli.ZREM args, (err,reply)->
                unlock metalockobj,->
                  #時間が経ったときはユニーク名のリナンバー
                  resetrangelockname lockobj, next
      when 'rangewriter-upgrade'
        locklog lockobj, 'unlocked'
        writerlock lockobj.obj, (metalockobj)->
          args = [lockobj.wobj, lockobj.name+':left']
          rcli.ZREM args, (err,reply)->
            args[1] = lockobj.name+':right'
            rcli.ZREM args, (err,reply)->
              unlock metalockobj,->
                #時間が経ったときはユニーク名のリナンバー
                resetrangelockname lockobj, next
      else
        console.log 'unknown lockobj.type is found! ['+lockobj.type+']'
        process.exit 1
  else
    console.log 'lockobj is empty!'
    process.exit 1

で、シンプルな排他

これは、別になんのことはない、シンプルな排他。ひとまとまりの処理は、複数のロックを並べて使うとデッドロックを起こしかねないので、multilockを使うべきだろう。

###
   単一ロック
###
# 
# ロックする
# 
module.exports.simplelock = simplelock = (objectname, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  waittimeobj = {time: lockconfig.waitmin}
  lockobj = {type: 'simple', obj: objectname+':lock'}
  cs._while []
  ,(_break,_next) ->
    if not (objectname in locks)
      locks[objectname] = true
      regist_uselocks objectname+':lock'
    rcli.SETNX [objectname+':lock', 'ok'], (err, reply) ->
      if reply == 1
        _break()
      else
        locklog lockobj, 'waiting'
        randomwait waittimeobj, _next
  ,->
    locklog lockobj, 'locked'
    next lockobj




###
   複数ロック
###
# 
# ロックする
# 
module.exports.multilock = multilock = (objectarray, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  waittimeobj = {time: lockconfig.waitmin}
  lockobj = {}
  lockobj['type'] = 'multi'
  lockobj['obj'] = []
  lockargs = []
  for obj in objectarray
    lockobj.obj.push obj+':lock'
    lockargs.push obj+':lock'
    lockargs.push 'ok'
    if not (obj in locks)
      locks[obj] = true
      regist_uselocks obj+':lock'
  cs._while []
  ,(_break,_next) ->
    rcli.MSETNX lockargs, (err, reply) ->
      if reply == 1
        _break()
      else
        locklog lockobj, 'waiting'
        randomwait waittimeobj, _next
  ,->
    locklog lockobj, 'locked'
    next lockobj

ReadWriteLock

三相ReadWriteLockのアルゴリズムは既に示したとおり。実装が下記のようになる。なるべくならせっかく三相にしているので、writerlockをかける前に、必ずpwrlockをかけるようにしてくれると、全体としてのパフォーマンスが良くなる。やはり、いきなりwriterlockだと、readerlockには分が悪いので、だいぶ後回しにされるのだ。

###
   Readerロック
###
module.exports.readerlock = readerlock = (objectname, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  waittimeobj = {time: lockconfig.waitmin}
  lockobj = {}
  lockobj['type'] = 'reader'
  lockobj['obj'] = objectname+':rwlock'
  if not (objectname in rwlocks)
    rwlocks[objectname] = true
    regist_uselocks objectname+':rwlock'
  args = [lockobj.obj]
  rcli.INCR args, (err, reply)->
    if reply >= 99999
      rcli.DECR args, (err,reply)->
        cs._while []
        ,(_break,_next) ->
          rcli.GET args, (err,reply)->
            if reply < 99999
              rcli.INCR args, (err, reply)->
                if reply < 99999
                  _break()
                else
                  rcli.DECR args, (err,reply)->
                    locklog lockobj, 'waiting1'
                    randomwait waittimeobj, _next
            else
              locklog lockobj, 'waiting2'
              randomwait waittimeobj, _next
        ,->
          locklog lockobj, 'locked'
          next lockobj
    else
      locklog lockobj, 'locked'
      next lockobj



###
   PreWrite-Readロック
###
module.exports.pwrlock = pwrlock = (objectname, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  lockobj = {}
  lockobj['type'] = 'pwr'
  lockobj['obj'] = objectname+':rwlock'
  if not (objectname in rwlocks)
    rwlocks[objectname] = true
    regist_uselocks objectname+':rwlock'
  waittimeobj = {time:lockconfig.waitmin}
  args = [lockobj.obj, 50000]
  rcli.INCRBY args, (err, reply)->
    if reply >= 99999
      rcli.DECRBY args, (err,reply)->
        cs._while []
        ,(_break,_next) ->
          rcli.GET [lockobj.obj], (err,reply)->
            if reply < 49999
              rcli.INCRBY args, (err, reply)->
                if reply < 99999
                  _break()
                else
                  rcli.DECRBY args, (err,reply)->
                    locklog lockobj, 'waiting1'
                    randomwait waittimeobj, _next
            else
              locklog lockobj, 'waiting2'
              randomwait waittimeobj, _next
        ,->
          locklog lockobj, 'locked'
          next lockobj
    else
      locklog lockobj, 'locked'
      next lockobj



###
   Writerロック
###
module.exports.writerlock = writerlock = (objectname_or_lockobj, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  basescore = 0
  if objectname_or_lockobj?
    if objectname_or_lockobj.type? and objectname_or_lockobj.type == 'pwr'
      lockobj = objectname_or_lockobj
      lockobj.type = 'writer'
      lockobj['score'] = 150000
      basescore = 50000
    else
      lockobj = {}
      lockobj['type'] = 'writer'
      lockobj['obj'] = objectname_or_lockobj+':rwlock'
      lockobj['score'] = 100000
      if not (objectname_or_lockobj in rwlocks)
        rwlocks[objectname_or_lockobj] = true
        regist_uselocks objectname_or_lockobj+':rwlock'
  else
    console.log 'objectname_or_lockobj is null.'
    process.exit 1

  waittimeobj = {time:lockconfig.waitmin}
  replyvalue = 0
  getargs = [lockobj.obj]
  args = [lockobj.obj, 100000]
  rcli.INCRBY args, (err, reply)->
    replyvalue = reply
    cs._ []
    ,(localnext)->
      if replyvalue >= 150000+basescore
        #writerlocked or pwrlocked
        rcli.DECRBY args, (err,reply)->
          cs._while []
          ,(_break,_next)->
            rcli.INCRBY args, (err, reply)->
              replyvalue = reply
              if replyvalue >= 150000+basescore
                rcli.DECRBY args, (err,reply)->
                  locklog lockobj, 'waiting1'
                  randomwait waittimeobj, _next
              else
                _break()
          ,->
            localnext()
      else
        localnext()
    ,->
      waittimeobj.time = lockconfig.waitmin
      if replyvalue >= 100001+basescore
        #readerlocked
        cs._while []
        ,(_break,_next) ->
          rcli.GET getargs, (err,reply)->
            if reply <= 100000+basescore
              _break()
            else
              locklog lockobj, 'waiting2'
              randomwait waittimeobj, _next
        ,->
          locklog lockobj, 'locked'
          next lockobj
      else if replyvalue == 100000+basescore
        #OK
        locklog lockobj, 'locked'
        next lockobj
      else
        #arienai
        console.log 'unknown error (reply = '+reply
        process.exit 1

区間ReadWriteLock

さて、最終目的地である、区間ReadWriteLockだ。実数区間ごとに、readerlockもpwrlockもwriterlockも掛け放題だ。これができるので、入れ子区間モデルで木構造を管理すると美味しいぜ!ということになるのだ。ちなみに、pwrlockにwriterlockを重ねるとき、後者が前者の部分集合であれば良い。だから、例えば木のつなぎ替えをするときには、srcとdstを両方含む範囲にpwrlockをかけて、それぞれその中の必要な範囲にwriterlockをかけては外し、かけては外し、とやるのが良い。

そのへんは、いずれ、入れ子区間モデルのコードを完成させるときに実装する

isconflictrangelock = (targetobj, rangemin, rangemax, threshold, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  rcli.ZCOUNT [targetobj, rangemin, rangemax], (err,reply)->
    if reply <= threshold*2
      rcli.ZRANGEBYSCORE [targetobj, '-inf', rangemin], (err,replies)->
        if replies.length == 0
          next false
        else
          tablemem = 0
          for reply in replies
            if /\:left$/.test(reply)
              tablemem += 1
            else if /\:right$/.test(reply)
              tablemem -= 1
            else
              console.log reply+' is an illegal range mark.'
              process.exit 1
          next (tablemem > threshold)
    else
      next true


###
   RangeReaderロック
###
module.exports.rangereaderlock = rangereaderlock = (objectname, rangemin, rangemax, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  waittimeobj = {time: lockconfig.waitmin}
  lockobj = {}
  lockobj['type'] = 'rangereader'
  lockobj['name'] = ''
  lockobj['obj'] = objectname+':meta'
  lockobj['min'] = rangemin
  lockobj['max'] = rangemax
  lockobj['robj'] = objectname+':readerlockzset'
  lockobj['pwrobj'] = objectname+':pwrlockzset'
  lockobj['wobj'] = objectname+':writerlockzset'
  lockobj['live'] = objectname+':alivelockzset'
  if not (objectname in rangelocks)
    rangelocks[objectname] = true
    regist_uselocks objectname+':readerlockzset'
    regist_uselocks objectname+':pwrlockzset'
    regist_uselocks objectname+':writerlockzset'
    regist_uselocks objectname+':alivelockzset'
    regist_uselocks objectname+':uniquename'

  cs._ []
  ,(localnext)->
    rcli.INCR [objectname+':uniquename'], (err,reply)->
      lockobj.name = reply
      localnext()
  ,->
    cs._while []
    ,(_break,_next)->
      readerlock lockobj.obj,(metalockobj)->
        isconflictrangelock lockobj.wobj, rangemin, rangemax, 0, (isconflict)->
          if not isconflict
            args = [lockobj.robj, rangemin, lockobj.name+':left']
            rcli.ZADD args, (err,reply)->
              args[1] = rangemax
              args[2] = lockobj.name+':right'
              rcli.ZADD args, (err,reply)->
                unlock metalockobj, ->
                  _break()
          else
            unlock metalockobj, ->
              locklog lockobj, 'waiting'
              randomwait waittimeobj, _next
    ,->
      locklog lockobj, 'locked'
      next lockobj


###
   RangeReaderロックの対象を縮小する
###
module.exports.rangereaderlock_shrink = rangereaderlock_shrink = (lockobj, rangemin, rangemax, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  if lockobj.type == 'rangereader' and lockobj.min <= rangemin and rangemin <= rangemax and rangemax <= lockobj.max
    args = [lockobj.robj, rangemin, lockobj.name+':left']
    rcli.ZADD args, (err,reply)->
      args[1] = rangemax
      args[2] = lockobj.name+':right'
      rcli.ZADD args, (err,reply)->
        lockobj.min = rangemin
        lockobj.max = rangemax
        locklog lockobj, 'shrinked'
        next lockobj
  else
    console.log 'err'
    process.exit 1



###
   RangePWRロック
###
module.exports.rangepwrlock = rangepwrlock = (objectname, rangemin, rangemax, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  waittimeobj = {time: lockconfig.waitmin}
  lockobj = {}
  lockobj['type'] = 'rangepwr'
  lockobj['name'] = ''
  lockobj['obj'] = objectname+':meta'
  lockobj['min'] = rangemin
  lockobj['max'] = rangemax
  lockobj['robj'] = objectname+':readerlockzset'
  lockobj['pwrobj'] = objectname+':pwrlockzset'
  lockobj['wobj'] = objectname+':writerlockzset'
  lockobj['live'] = objectname+':alivelockzset'
  if not (objectname in rangelocks)
    rangelocks[objectname] = true
    regist_uselocks objectname+':readerlockzset'
    regist_uselocks objectname+':pwrlockzset'
    regist_uselocks objectname+':writerlockzset'
    regist_uselocks objectname+':alivelockzset'
    regist_uselocks objectname+':uniquename'

  cs._ []
  ,(localnext)->
    rcli.INCR [objectname+':uniquename'], (err,reply)->
      lockobj.name = reply
      localnext()
  ,->
    cs._while []
    ,(_break,_next)->
      pwrlock lockobj.obj,(metalockobj)->
        isconflictrangelock lockobj.pwrobj, lockobj.min, lockobj.max, 0, (isconflict)->
          if not isconflict
            isconflictrangelock lockobj.wobj, lockobj.min, lockobj.max, 0, (isconflict)->
              if not isconflict
                writerlock metalockobj,(metalockobj)->
                  args = [lockobj.pwrobj, lockobj.min, lockobj.name+':left']
                  rcli.ZADD args, (err,reply)->
                    args[1] = lockobj.max
                    args[2] = lockobj.name+':right'
                    rcli.ZADD args, (err,reply)->
                      unlock metalockobj, ->
                        unlock metalockobj, ->
                          _break()
              else
                unlock metalockobj, ->
                  locklog lockobj, 'waiting'
                  randomwait waittimeobj, _next
          else
            unlock metalockobj, ->
              randomwait waittimeobj, _next
    ,->
      locklog lockobj, 'locked'
      next lockobj



###
   RangePWRロックの対象を縮小する
###
module.exports.rangepwrlock_shrink = rangepwrlock_shrink = (lockobj, rangemin, rangemax, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  if lockobj.type == 'rangepwr' and lockobj.min <= rangemin and rangemin <= rangemax and rangemax <= lockobj.max
    args = [lockobj.pwrobj, rangemin, lockobj.name+':left']
    rcli.ZADD args, (err,reply)->
      args[1] = rangemax
      args[2] = lockobj.name+':right'
      rcli.ZADD args, (err,reply)->
        lockobj.min = rangemin
        lockobj.max = rangemax
        locklog lockobj, 'shrinked'
        next lockobj
  else
    console.log 'err'
    process.exit 1



###
   RangeWriterロック
###
module.exports.rangewriterlock = rangewriterlock = (objectname_or_lockobj, rangemin, rangemax, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  waittimeobj = {time: lockconfig.waitmin}
  lockobj = {}
  basename = ''
  if not objectname_or_lockobj?
    console.log 'error'
    process.exit 1
  else
    cs._ []
    ,(localnext1)->
      if objectname_or_lockobj.type? and objectname_or_lockobj.type == 'rangepwr'
        #rangepwrlockの内部にrangewritelockを作る場合
        lockobj['obj'] = objectname_or_lockobj.obj
        basename = objectname_or_lockobj.obj.replace(/\:meta$/, '')
        lockobj['type'] = 'rangewriter-upgrade'
        pwrmin = objectname_or_lockobj.min
        pwrmax = objectname_or_lockobj.max
        if pwrmin <= rangemin and rangemin <= rangemax and rangemax <= pwrmax
          lockobj['min'] = rangemin
          lockobj['max'] = rangemax
          localnext1 basename
        else
          console.log 'error'
          process.exit 1
      else
        #いきなりrangewritelockを作る場合
        lockobj['obj'] = objectname_or_lockobj+':meta'
        basename = objectname_or_lockobj
        lockobj['type'] = 'rangewriter'
        lockobj['min'] = rangemin
        lockobj['max'] = rangemax
        localnext1 basename
    ,(localnext2, basename)->
      lockobj['robj'] = basename+':readerlockzset'
      lockobj['pwrobj'] = basename+':pwrlockzset'
      lockobj['wobj'] = basename+':writerlockzset'
      lockobj['live'] = basename+':alivelockzset'
      if not (basename in rangelocks)
        rangelocks[basename] = true
        regist_uselocks basename+':readerlockzset'
        regist_uselocks basename+':pwrlockzset'
        regist_uselocks basename+':writerlockzset'
        regist_uselocks basename+':alivelockzset'
        regist_uselocks basename+':uniquename'
      rcli.INCR [basename+':uniquename'], (err,reply)->
        lockobj.name = reply
        if objectname_or_lockobj.type? and objectname_or_lockobj.type == 'rangepwr'
          #rangepwrlockの内部にrangewritelockを作る場合、
          #rangepwrlockやrangewriterlockとの競合は既に排除されているので、
          #ロックの数を揃えるためのpwrlockをかけるだけで先へ進む
          pwrlock lockobj.obj,(metalockobj)->
            localnext2 metalockobj
        else
          #rangepwrlockやrangewriterlockとの競合をテストして、競合しなくなるまでリトライ
          cs._while []
          ,(_break,_next)->
            pwrlock lockobj.obj,(metalockobj)->
              #writerともpwrとも衝突チェックして、外れるまでゼロから繰り返し
              isconflictrangelock lockobj.pwrobj, lockobj.min, lockobj.max, 0, (isconflict)->
                if not isconflict
                  isconflictrangelock lockobj.wobj, lockobj.min, lockobj.max, 0, (isconflict)->
                    if not isconflict
                      _break metalockobj
                    else
                      unlock metalockobj, ->
                        locklog lockobj, 'waiting1'
                        randomwait waittimeobj, _next
                else
                  unlock metalockobj, ->
                    locklog lockobj, 'waiting2'
                    randomwait waittimeobj, _next
          ,(metalockobj)->
            localnext2 metalockobj
    ,(_dummynext, metalockobj)->
      #writerともpwrとも衝突していないので、readerを無視してロックをかけてしまう
      writerlock metalockobj,(metalockobj)->
        args = [lockobj.wobj, lockobj.min, lockobj.name+':left']
        rcli.ZADD args, (err,reply)->
          args[1] = lockobj.max
          args[2] = lockobj.name+':right'
          rcli.ZADD args, (err,reply)->
            unlock metalockobj, ->
              unlock metalockobj, ->
                #重複するrangereadlockが外れるのを待つ
                waittimeobj.time = lockconfig.waitmin
                cs._while []
                ,(_break,_next)->
                  isconflictrangelock lockobj.robj, lockobj.min, lockobj.max, 0, (isconflict)->
                    if not isconflict
                      _break()
                    else
                      randomwait waittimeobj, _next
                ,->
                  locklog lockobj, 'locked'
                  next lockobj



###
   RangeWriterロックの対象を縮小する
###
module.exports.rangewriterlock_shrink = rangewriterlock_shrink = (lockobj, rangemin, rangemax, next)->
  if not next?
    if lockconfig.log then console.log 'no next'
    next = -> return
  if lockobj.type == 'rangewriter' and lockobj.min <= rangemin and rangemin <= rangemax and rangemax <= lockobj.max
    args = [lockobj.wobj, rangemin, lockobj.name+':left']
    rcli.ZADD args, (err,reply)->
      args[1] = rangemax
      args[2] = lockobj.name+':right'
      rcli.ZADD args, (err,reply)->
        lockobj.min = rangemin
        lockobj.max = rangemax
        locklog lockobj, 'shrinked'
        next lockobj
  else
    console.log 'err'
    process.exit 1

テストコード

では、テストコードだ。initの第1引数に、テスト毎にちょうど良い粒度のログが設定できるようになっている。ただし、あんまりログをたくさん吐くと、ログのせいでタイミングが変わって、異なる実行順序になったりするのでご注意。

redislock = require('../lib/redis-readwritelock')



###
---------------------------------------------
   test
---------------------------------------------
###


randsleep = (next)->
  setTimeout next, Math.floor(Math.random()*1000)


testA = (num)->
  randsleep ->
    redislock.simplelock 'test', (lockobj)->
      randsleep ->
        console.log 'testA'+num
        redislock.unlock lockobj, ->
          return

testB = (num)->
  randsleep ->
    redislock.simplelock 'test2', (lockobj)->
      randsleep ->
        console.log 'testB'+num
        redislock.unlock lockobj, ->
          return

testC = (num)->
  randsleep ->
    redislock.simplelock 'test3', (lockobj)->
      randsleep ->
        console.log 'testC'+num
        redislock.unlock lockobj, ->
          return

testD = (num)->
  randsleep ->
    redislock.multilock ['test','test2','test3'], (lockobj)->
      randsleep ->
        console.log 'testD'+num
        redislock.unlock lockobj, ->
          return

testrwA = (num)->
  randsleep ->
    redislock.readerlock 'test', (lockobj)->
      randsleep ->
        console.log 'testA'+num
        redislock.unlock lockobj, ->
          return

testrwB = (num)->
  randsleep ->
    redislock.readerlock 'test', (lockobj)->
      randsleep ->
        console.log 'testB'+num
        redislock.unlock lockobj, ->
          randsleep ->
            randsleep ->
              redislock.readerlock 'test', (lockobj)->
                randsleep ->
                  console.log 'testBB'+num
                  redislock.unlock lockobj, ->
                    randsleep ->
                      randsleep ->
                        redislock.readerlock 'test', (lockobj)->
                          randsleep ->
                            console.log 'testBBB'+num
                            redislock.unlock lockobj, ->
                              randsleep ->
                                randsleep ->
                                  redislock.readerlock 'test', (lockobj)->
                                    randsleep ->
                                      console.log 'testBBBB'+num
                                      redislock.unlock lockobj, ->
                                        return

testrwC = (num)->
  randsleep ->
    redislock.pwrlock 'test', (lockobj)->
      randsleep ->
        redislock.writerlock lockobj, (lockobj)->
          console.log 'testC'+num+':'+lockobj.score
          redislock.unlock lockobj, ->
            randsleep ->
              redislock.unlock lockobj, ->
                return

testrwD = (num)->
  randsleep ->
    redislock.writerlock 'test', (lockobj)->
      console.log 'testD'+num
      redislock.unlock lockobj, ->
        return


testrangeA = (num)->
  randsleep ->
    redislock.rangereaderlock 'test', 0, 2, (lockobj)->
      randsleep ->
        console.log 'testA'+num
        redislock.unlock lockobj, ->
          return

testrangeB = (num)->
  randsleep ->
    redislock.rangereaderlock 'test', 0, 1, (lockobj)->
      randsleep ->
        console.log 'testB'+num
        redislock.unlock lockobj, ->
          randsleep ->
            randsleep ->
              redislock.rangereaderlock 'test', 1, 2, (lockobj)->
                randsleep ->
                  console.log 'testBB'+num
                  redislock.unlock lockobj, ->
                    randsleep ->
                      randsleep ->
                        redislock.rangereaderlock 'test', 2, 3,  (lockobj)->
                          randsleep ->
                            console.log 'testBBB'+num
                            redislock.unlock lockobj, ->
                              randsleep ->
                                randsleep ->
                                  redislock.rangereaderlock 'test', 1, 2, (lockobj)->
                                    randsleep ->
                                      console.log 'testBBBB'+num
                                      redislock.unlock lockobj, ->
                                        return

testrangeC = (num)->
  randsleep ->
    redislock.rangepwrlock 'test', 0.5, 2.0, (lockobj)->
      randsleep ->
        redislock.rangewriterlock lockobj, 1.2, 1.8, (wlockobj)->
          console.log 'testC'+num
          redislock.unlock wlockobj, ->
            randsleep ->
              redislock.unlock lockobj, ->
                return

testrangeD = (num)->
  randsleep ->
    redislock.rangewriterlock 'test', 2.2, 2.3, (lockobj)->
      console.log 'testD'+num
      redislock.unlock lockobj, ->
        return



redislock.init {
  logwait: no
  logshrink: no
  logsimple: yes
  logrw: yes
  logrange: yes
},->
  switch 1
    when 1
      for i in [0..5]
        testA i
        testB i
        testC i
        testD i
    when 2
      for i in [0..5]
        testrwA i
        testrwB i
        testrwC i
        testrwD i
    when 3
      for i in [0..5]
        testrangeA i
        testrangeB i
        testrangeC i
        testrangeD i
12
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
11