LoginSignup
13
15

More than 5 years have passed since last update.

Redis Sorted Set を用いた, 同点を加味したランキング処理を Python で実装する

Last updated at Posted at 2014-03-21

発端

Redis Sorted Set を用いたリアルタイムランキングというネタは, 既に手垢が付きまくっているが, さっと検索した感じ, Python での具体的な実装例が見当たらなかったため, 普段, 社内で利用しているライブラリを簡易化して紹介する.

Source Code

ランキング処理

ranking.py
from datetime import timedelta
from .rankinglist import RankingList


_default_expire = int(timedelta(weeks=2).total_seconds())


class Ranking(object):
    def __init__(self, client, key, expire=_default_expire):
        """
        クラスの初期化を行う.

        :param object client: Redis のクライアント.
        :param string key: ランキングの識別子.
        :param int expire: ランキングの有効期限. 省略すると二週間.
        """
        self._r = client
        self._key = key
        self._expire = expire

    def push(self, unique_id, value):
        """
        ランキングの更新を行う.

        :param string unique_id: ランキングする ID.
        :param string value: ランキングのソース値(ポイント等)
        """
        self._r.zadd(self._key, long(value), unique_id)
        self.touch()

    def get_rank(self, unique_id):
        """
        順位の取得を行う.

        :param string unique_id: ランキングする ID.
        :return: 順位
        """
        value = self._r.zscore(self._key, unique_id)
        if value is None:
            return None
        return self._r.zcount(self._key, '({}'.format(int(value)), 'inf') + 1

    def get_range(self, start, end):
        """
        ランキングの範囲取得を行う.

        :param int start: 添字の開始位置.
        :param int end: 添字の終了位置. start=0 and end=0 で, 先頭の一件を取得.
        :return: ['push() で指定した unique_id', ...]
        """
        result = self._r.zrevrange(self._key, start, end)
        self.touch()
        return result

    def get_count(self):
        """
        件数の取得を行う.

        :return: 件数
        """
        return self._r.zcard(self._key)

    def touch(self):
        """
        ランキングの有効期限を延長する.
        push() と get_rank() でも実行される.
        """
        self._r.expire(self._key, self._expire)

    def clean(self):
        """
        ランキングを削除する.
        """
        self._r.delete(self._key)

    def gen_list(self, wrapper=None):
        """
        ランキングリストを取得する.

        :param function wrapper: 要素を包む関数
        :return: RankingList オブジェクト

        RankingList は, ある程度 List として振る舞う.
        Django の Paginator に渡す際などに使用する.
        """
        return RankingList(self, wrapper)

ちょっと判り難いのが, get_rank() と gen_list().
gen_rank() は, 同点を加味したランキングを得るため, 現在のスコアより上位の人数を数えている.
gen_list() が返す RankingList オブジェクトは後述する.

ランキングをリストとして扱う

rankinglist.py
class RankingList(object):
    def __init__(self, rank, wrapper=None):
        self._rank = rank
        self._wrapper = wrapper

    def __getitem__(self, k):
        if isinstance(k, slice):
            start = k.start if k.start else 0
            end = k.stop - 1 if k.stop else self.__len__() - 1
            step = k.step

            unique_ids = self._rank.get_range(start, end)
            if step:
                unique_ids = unique_ids[::step]

            return [self._wrap(unique_id) for unique_id in unique_ids]
        else:
            if self.__len__() <= k:
                raise IndexError('list index out of range')
            unique_ids = self._rank.get_range(k, k)
            return self._wrap(unique_ids[0])

    def _wrap(self, unique_id):
        return self._wrapper(unique_id) if self._wrapper else unique_id

    def __len__(self):
        return self._rank.get_count()

Python List 風に振る舞う RankingList に Ranking を委譲して使用する.
また, Wapper を渡す事で, Redis から取り出した unique_id を元に Object を生成して返す.

使い方

>>> from redis import Redis
>>> from ranking import Ranking
>>>
>>> ranking = Ranking(Redis(), 'event1')
>>>
>>> ranking.push('p1', 200)
>>> ranking.push('p2', 100)
>>> ranking.push('p3', 300)
>>> ranking.push('p1', 1000)
>>> ranking.push('p4', 1000)
>>>
>>> l1 = ranking.gen_list() # ['p4', 'p1', 'p3', 'p2']
>>> l1[2:] # ['p3', 'p2']
>>>
>>> import Player # e.g. Django Model
>>> def wrapper(id):
        return Player.objects.get(pk=id)
>>> l2 = ranking.gen_list(wrapper) # [Player('p4'), Player('p1'), Player('p3'), Player('p2')]
>>> l2[2:] # [Player('p3'), Player('p2')]
>>>
>>> [ranking.get_rank(player_id) for player_id in l1] # [1, 1, 2, 3]
13
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
15