LoginSignup
9
9

More than 5 years have passed since last update.

Redisでツリー構造を扱うなら「入れ子区間モデル」でOK

Last updated at Posted at 2015-06-19

「ソート済セット型」すげえいい。

これね

「入れ子区間モデル」が超容易かつ効率的に実装できる。

んでもって、これね。リンク先に書いてない「入れ子区間モデル」の凄いところをひとつ挙げておこう。それは、ある枝の配下のノードを「乱択」するアルゴリズムが書けるのだ。

あと、これは元々RDBSで入れ子区間モデルが有効な理由でもあるのだが、操作する区間ごとにロックをかけられるのが素敵なのだ。

これだけ書いただけでも、鋭い人は判ってくれると思う。

まあ、Redisにツリー構造を入れるなら、このやり方がベストだと思うわけよ。ただね、ちょっと弄ってみた結果、子ノードが指定された時の親ノード獲得の効率が悪いところが気になる。あと、親ノードを特定するまで、まともにロックがかけられないのもイタイ。だから、隣接リストモデルを兼用して親を得る必要があるときにはそっちを参照する方が良いかもしれない。

後でちゃんと書く。

ちゃんと書く時間あるのかな。

未検証コード

あとでちゃんと検証&説明を書くけど、とりあえずざっと書き下してみた。まだ多分バグだらけ。一切未テスト。TDDなにそれおいしーのな感じ。だって、大半はケータイで書いたんだもん。TDDも何もないよね。

とりあえず、ということで、自己突っ込みをしておく。自己突っ込み内容は、いずれ治ると思ってもらって良い。

  • そもそも、何でRANKを元に走査してんのさ。基本的にSCOREで走査しないと、こうゆうがっつりな排他制御が必要になるじゃん。
  • 上記とセットの話だが、leftとrightのペアを書き込むのだけwriter-lock, その他の全てはreader-lockで良いよね。ただし、自己reader-lockは保持したままreader-lockをwriter-lockに昇格させるような構造にしないといけないのがちょいとトリッキーだけどさ。
  • あと、どうせロックかけるんなら、SETEXでExpireさせるようにしないと、ツリー走査系の関数に渡されたfuncの中で「return」をやらかすあほうが居たときに嵌まるよ。
redis = require('redis')
rcli  = redis.createClient()
cs    = require('control-structures')

rcli.once "error", (err)->
  console.log("Error " + err);



###
   utility
###
# 
# ロックする
# 複数ロックへの対応と、Expireを仕込め
# 
redislock = (objectname, next)->
  cs._while []
  ,(_break,_next) ->
    rcli.SETNX objectname+'lock', 'lock', (err, reply) ->
      if reply == 1
        _break()
      else
        setTimeout _next, 0
  ,next



# 
# ロックを外す
# 
redisunlock = (objectname, next)->
  rcli.DEL objectname+'lock', (err, reply) ->
    next()



# 
# 兄弟ノードを逆順に全て列挙する
# 
reverse_sibling_traversal = (tree, mostright, next) ->
  mostleft = mostright.replace(/right$/, 'left')
  result = []
  cs._while [mostright]
  ,(_break,_next,rightmember)->
    rcli.ZRANK tree, rightmember, (err, reply)->
      rightrank = reply
      leftrank = rightrank-1
      rcli.ZRANGE tree, leftrank, leftrank, (err, replies)->
        leftmember = replies[0]
        result.push {right: rightmember, left: leftmember}
        if leftmember == mostleft
          _break
        else
          _next leftmember.replace(/right$/, 'left')
  ,->
    next result



# 
# 兄弟ノードを全て列挙する
# 
forward_sibling_traversal = (tree, mostleft, next) ->
  mostright = mostleft.replace(/left$/, 'right')
  result = []
  cs._while [mostleft]
  ,(_break,_next,leftmember)->
    rcli.ZRANK tree, leftmember, (err, reply)->
      leftrank = reply
      rightrank = leftrank+1
      rcli.ZRANGE tree, rightrank, rightrank, (err, replies)->
        rightmember = replies[0]
        result.push {right: rightmember, left: leftmember}
        if rightmember == mostright
          _break
        else
          _next rightmember.replace(/left$/, 'right')
  ,->
    next result



# 
# 親ノードを検索する
# 
forward_search_parent = (tree, nowtarget_right, next) ->
  cs._while [nowtarget_right]
  ,(_break,_next,rightmember)->
    rcli.ZRANK tree, rightmember, (err, reply)->
      rightrank = reply
      nextrank = reply+1
      rcli.ZRANGE tree, nextrank, nextrank, (err, replies)->
        nextmember = replies[0]
        if /right$/.test nextmember
          _break nextmember.replace(/\:right$/, '')
        else
          _next nextmember.replace(/left$/, 'right')
  ,(result)->
    next result



# 
# 親ノードを指定して子供達を探す
# 
forward_search_children = (tree, parent, next) ->
  mostleft = parent+':left'
  mostright = parent+':right')
  result = []
  cs._while [mostleft]
  ,(_break,_next,leftmember)->
    rcli.ZRANK tree, leftmember, (err, reply)->
      leftrank = reply
      rightrank = leftrank+1
      rcli.ZRANGE tree, rightrank, rightrank, (err, replies)->
        rightmember = replies[0]
        if rightmember == mostright
          _break
        else
          result.push(rightmember.replace(/\:left$/, ''))
          _next rightmember.replace(/left$/, 'right')
  ,->
    next result



# 
# 指定した深さの最初のノードを探す
# 
levelscan_first = (tree, level, next)->
  rcli.ZCOUNT tree, (err,reply)->
    maxindex = reply
    scan = 0
    cnt = 0

    cs._while []
    ,(_break,_next)->
      if level*cnt >= maxindex
        _break ''
      rcli.ZRANGE tree,level*cnt,level*cnt+level-1, (err, replies)->
        for reply in replies
          if /left$/.test reply
            scan += 1
            if scan == level
              _break reply
              return
          else
            scan -= 1
        _next()
    ,(reply)->
      next reply



# 
# 既知のノードと同じ深さで、直後に格納されたノードを探す
# 
levelscan_next = (tree, nowtarget, next)->
  rcli.ZRANK tree,nowtarget, (err, reply)->
    ptr = reply+1
    rcli.ZCOUNT tree, (err,reply)->
      maxindex = reply
      scan = -1
      cnt = 0
      seq = 16

      cs._while []
      ,(_break,_next)
        if ptr+seq*cnt >= maxindex
          _break ''
        rcli.ZRANGE tree,ptr+seq*cnt,ptr+seq*cnt+seq-1, (err, replies)->
          for reply in replies
            if /left$/.test reply
              scan += 1
              if scan == 0
                _break reply
              return
          else
            scan -= 1
        _next()
    ,(reply)
      next reply



# 
# 指定したマークの間に新たなノードの両端マークを挿入する
# 
insert_single_node = (tree, node, leftscore, rightscore, next)
  newleftscore = (leftscore*2 + rightscore)/3
  newrightscore = (leftscore + rightscore*2)/3
  rcli.ZADD tree, newleftscore, node+':left', (err,reply)->
    rcli.ZADD tree, newrightscore, node+':right', (err,reply)->
      next()



# 
# 指定したマークの間に新たな複数のマークを挿入する
# 
insert_multi_nodes = (tree, arr, leftscore, rightscore, next)
  div = arr.length+1
  cnt = 1

  cs._while []
  ,(_break, _next)->
    rcli.ZADD tree, (leftscore*(div-cnt) + rightscore*cnt)/3, arr[cnt-1], (err,reply)->
      cnt += 1
      if(cnt > arr.length)
        _break()
      else
        _next()
  ,next



# 
# 指定したノードの両端マークを得る
# 
get_ranks = (tree, target, next)->
  rcli.ZRANK tree,target+':left', (err, reply)->
    rcli.ZRANK tree,target+':right', (err, reply2)->
      next reply,reply2


###
   Creater
###
# 
# ルートノードを新たに作る
# 既にルートノードがあるなら、新たに作ったルートノードの子ノードにする
# 
_tree_add_root = (tree, root, next) ->
  rcli.ZRANGE,0,0,'WITHSCORES', (err,reply)->
    if reply.length == 0
      leftscore = 1.0/(1024.0*1024.0)
    else
      leftscore = reply[1]*0.5
    rcli.ZREVRANGE,0,0,'WITHSCORES', (err,reply)->
      if reply.length == 0
        rightscore = 1024.0*1024.0
      else
        rightscore = reply[1]*2
      rcli.ZADD tree, leftscore, root+':left', (err,reply)->
        rcli.ZADD tree, rightscore, root+':right', (err,reply)->
          next()

tree_add_root = (tree, root, next) ->
  redislock tree,->
    _tree_add_root tree,root,->
      redisunlock tree,->
        next()



# 
# 親ノードを指定して、指定した新たなノードを長子ノードにする
# 
_tree_add_first_child = (tree, parent, child, next) ->
  rcli.ZSCORE tree, parent+':left', (err, reply)->
    leftscore = reply
    rcli.ZRANK tree, parent+':left', (err, reply)->
      leftrank = reply
      rightrank = leftrank+1
      rcli.ZRANGE tree, rightrank, rightrank, 'WITHSCORES', (err, replies)->
        rightscore = replies[1]
        insert_single_node tree, child, leftscore, rightscore, next

tree_add_first_child = (tree, parent, child, next) ->
  redislock tree,->
    _tree_add_first_child tree,parent,child,->
      redisunlock tree,->
        next()



# 
# 親ノードを指定して、指定した新たなノードを末弟ノードにする
# 
_tree_add_last_child = (tree, parent, child, next) ->
  rcli.ZSCORE tree, parent+':right', (err, reply)->
    rightscore = reply
    rcli.ZRANK tree, parent+':right', (err, reply)->
      rightrank = reply
      leftrank = leftrank-1
      rcli.ZRANGE tree, leftrank, leftrank, 'WITHSCORES', (err, replies)->
        leftscore = replies[1]
        insert_single_node tree, child, leftscore, rightscore, next

tree_add_last_child = (tree, parent, child, next) ->
  redislock tree,->
    _tree_add_last_child tree,parent,child,->
      redisunlock tree,->
        next()



# 
# 親ノードを指定して、指定した新たなノードをn番目の子ノードにする
# nがマイナスなら、末尾から数えて-n番目の子ノードにする
# 
_tree_add_nth_child = (tree, parent, child, num, next) ->
  ((lnext)->
    if num < 0
      num = -(num+1)
      reverse_sibling_traversal tree, parent+':right', (arr)->
        lnext(arr)
    else
      forward_sibling_traversal tree, parent+':left', (arr)->
        lnext(arr)
  )(->
    if arr.length <= num
      num = arr.length-1
    rcli.ZSCORE tree, arr[num].left, (err, reply)->
      leftscore = reply
      rcli.ZSCORE tree, arr[num].right, (err, reply)->
        rightscore = reply
        insert_single_node tree, child, leftscore, rightscore, next
  )

tree_add_nth_child = (tree, parent, child, num, next) ->
  redislock tree,->
    _tree_add_nth_child tree,parent,child,num,->
      redisunlock tree,->
        next()



###
   Deleter
###
# 
# 単一ノードを削除する
# そのノードに子ノードがあれば、そのノードの親ノードの子ノードにする
# 
_tree_remove_single_node = (tree, target, next) ->
  next()

tree_remove_single_node = (tree, target, next) ->
  redislock tree,->
    _tree_remove_single_node tree,target,->
      redisunlock tree,->
        next()



# 
# ノードを削除する
# そのノードに子ノードがあれば、一緒に削除する
# 
_tree_remove_node_subtree = (tree, target, next) ->
  next()

tree_remove_node_subtree = (tree, target, next) ->
  redislock tree,->
    _tree_remove_node_subtree tree,target,->
      redisunlock tree,->
        next()



###
   Updater
###
# 
# 単一ノードを別の親ノードのn番目の子ノードに移動させる
# nがマイナスなら、末尾から数えて-n番目の子ノードにする
# そのノードに子ノードがあれば、そのノードの親ノードの子ノードにする
# 
tree_move_single_node_to_nth_child = (tree, node, newparent, index, next)->
  next()

tree_move_single_node_to_nth_child = (tree, node, newparent, index, next)->
  redislock tree,->
    _tree_move_single_node_to_nth_child tree,node,newparent,index,->
      redisunlock tree,->
        next()



# 
# ノードを別の親ノードのn番目の子ノードに移動させる
# nがマイナスなら、末尾から数えて-n番目の子ノードにする
# そのノードに子ノードがあれば、一緒に移動する
# 
_tree_move_node_subtree_to_nth_child = (tree, node, newparent, index, next)->
  next()

tree_move_node_subtree_to_nth_child = (tree, node, newparent, index, next)->
  redislock tree,->
    _tree_refer_next_node tree,node,newparent,index,->
      redisunlock tree,->
        next()



# 
# 別キーのzsetに収納されたキーを用いて子ノードを並べ替える
# 気をつけろ!デッドロック注意報!
# 
_tree_sort_children_nodes = (tree, parent, value_zset, next) ->
  next()

tree_sort_children_nodes = (tree, parent, value_zset, next) ->
  redislock value_zset,->
    redislock tree,->
      _tree_refer_next_node tree, parent, value_zset,->
        redisunlock tree,->
          redisunlock value_zset,->
            next()



# 
# 操作を繰り返してノード間の間隔が狭まってきた時に番号を振り直す
# 
_tree_renumber = (tree, next) ->
  next()

tree_renumber = (tree, next) ->
  redislock tree,->
    _tree_renumber tree,->
      redisunlock tree,->
        next()



###
   Referer
###
# 
# ルートノードを得る
# 
_tree_get_root = (tree, next) ->
  rcli.ZRANGE,0,0,(err,replies)->
    if replies.length == 0
      next null,''
    rcli.ZREVRANGE,0,0,(err,replies)->
      if replies.length == 0
        next null,''
      next null,replies[0],replace(/\:right$/, '')

tree_get_root = (tree, next) ->
  redislock tree,->
    _tree_get_root tree,target,(err, result)->
      redisunlock tree,->
        next err, result



# 
# 深さ優先探索して、各ノードでfuncを実行する
# 
# func(err,nodename,level,_break,_next)
# 
_tree_scan_width = (tree,func,next)->
  rcli.ZCOUNT tree, (err,reply)->
    maxindex = reply
    level = 1
    seq = 16
    cnt = 0
    cs._while []
    ,(_break,_next) ->
      if seq*cnt >= maxindex
        _break
      rcli.ZRANGE tree,seq*cnt,seq*cnt+seq-1,(err,replies)->
        for reply in replies
          if /right$/.test reply
            level -= 1
            _next()
          else
            func null,reply,level,next,->
              level += 1
              _next()
    ,next

tree_scan_depth = (tree, func, next) ->
  redislock tree,->
    _tree_scan_depth tree,func,->
      redisunlock tree,next



# 
# 幅優先探索して、各ノードでfuncを実行する
# 
# func(err,nodename,level,_break,_next)
# 
_tree_scan_width = (tree,func,next)->
  level = 1
  cs._while []
  ,(_break,_next) ->
    levelscan_first tree, level, (result)->
      if result == ''
        _break()
      func null,result,level,next,->
        cs._while [result]
        ,(_break2,_next2,target) ->
          levelscan_next tree, target, (result2)->
            if result2 == ''
              _break2()
            else
              func null,result2,level,next,->
                _next2 result2
        ,->
          level += 1
          _next()
  ,next

tree_scan_width = (tree, func, next) ->
  redislock tree,->
    _tree_scan_width tree,target,->
      redisunlock tree,next


# 
# 子ノードを指定して親ノードを得る
# 
_tree_get_parent_node = (tree, child, next) ->
  forward_search_parent tree, child+':right', (result)->
    next null,result

tree_get_parent_node = (tree, child, next) ->
  redislock tree,->
    _tree_get_parent_node tree,target,(err, parent)->
      redisunlock tree,->
        next err, parent


# 
# 親ノードを指定して子ノード配列を得る
# 
_tree_get_children_nodes = (tree, parent, next) ->
  forward_search_children tree, parent, (result)->
    next null,result

tree_get_children_nodes = (tree, parent, next) ->
  redislock tree,->
    _tree_get_children_nodes tree,target,(err, arr)->
      redisunlock tree,->
        next err, arr



# 
# srcノードからdstノードまでの最短距離を得る
# 
_tree_get_shortest_path = (tree,dst,src,next)->
  #src equals dst
  if src == dst
    next null, []

  result = []
  get_ranks tree,src,(srcleft,srcright)->
    get_ranks tree,dst,(dstleft,dstright)->
      #src is in dst
      if dstleft<srcleft and srcright<dstright
        cs._while [src]
        ,(_break,_next,metasrc)->
          _tree_get_parent_node tree,metasrc,(err,parent)->
            result.push {direction: '..', node: parent}
            if parent != dst
              _next parent
            else
              _break()
        ,->
          next null, result
      else
        #dst is not in src
        cs._ []
        ,(next2)->
          if (dstleft-srcleft)*(dstright-srcright) > 0
            cs._while [src]
            ,(_break,_next,metasrc)->
              _tree_get_parent_node tree,metasrc,(err,parent)->
                result.push {direction: '..', node: parent}
                get_ranks tree,parent,(srcleft,srcright)->
                  if (dstleft-srcleft)*(dstright-srcright) > 0
                    _next parent
                  else
                    _break dst,parent
            ,(dst,parent)->
              next2 dst,parent
          else
            next2 dst,src
        ,(next3,dst,metasrc)->
          result2 = [{direction: '->', node: dst}]
          cs._while [dst]
          ,(_break,_next,metadst)->
            _tree_get_parent_node tree,metadst,(err,parent)->
              if parent != metasrc
                result2.unshift {direction: '->', node: parent}
                _next parent
              else
                _break()
          ,->
            result.concat(result2)
            next null, result

tree_get_shortest_path = (tree, dst, src, next) ->
  redislock tree,->
    _tree_get_shortest_path tree,dst,src,(err, arr)->
      redisunlock tree,->
        next err, arr
9
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
9