6
5

PythonでO(1)ランダムアクセス可能なdequeの新規提案

Last updated at Posted at 2024-08-24

はじめましての人ははじめまして。alumiと申します。
まず本編の前に先駆者であるごりちゃんさんへ感謝とリスペクトを。

Pythonの標準ライブラリのdequeは、内部実装の都合で任意要素へindexを指定して参照するランダムアクセスに$O(N)$かかります。
C++のdequeは$O(1)$でランダムアクセス可能なため、これを前提とした問題が出されるとPython勢は慌てふためくことになります。
しかし偉大な先駆者であるごりちゃんさんがリングバッファで実装したdequeを公開してくださっているので、Python勢がこのケースで慌てることはなくなりました。良かった。

この記事ではごりちゃんさんの実装よりもappend, pop操作が若干早い実装ができたので、実装とその説明をしたいと思います1

実装

class TwoListDeque:
    __slots__ = ("front", "back")

    def __init__(self, iterable=None) -> None:
        init_arr = list(iterable) if iterable else []
        mid = len(init_arr) >> 1
        self.front = init_arr[:mid][::-1]
        self.back = init_arr[mid:]

    def __len__(self) -> int:
        return len(self.front) + len(self.back)

    def __str__(self) -> str:
        return f"{self.__class__.__name__}({list(self)})"

    __repr__ = __str__

    def __iter__(self):
        yield from reversed(self.front)
        yield from self.back

    def __reversed__(self):
        yield from reversed(self.back)
        yield from self.front

    def __contains__(self, item) -> bool:
        return item in self.front or item in self.back

    def _normalize_index(self, i: int) -> int:
        length = len(self)
        if i < -length or length <= i:
            raise IndexError("deque index out of range")
        return i if i >= 0 else i + length

    def __getitem__(self, i: int):
        i = self._normalize_index(i)
        if i < len(self.front):
            return self.front[~i]
        else:
            return self.back[i - len(self.front)]

    def __setitem__(self, i: int, item) -> None:
        i = self._normalize_index(i)
        if i < len(self.front):
            self.front[~i] = item
        else:
            self.back[i - len(self.front)] = item

    def __delitem__(self, i: int) -> None:
        i = self._normalize_index(i)
        if i < len(self.front):
            del self.front[~i]
        else:
            del self.back[i - len(self.front)]

    def _balance(self) -> None:
        source, target = (
            (self.front, self.back) if not self.back else (self.back, self.front)
        )
        mid = len(source) >> 1
        target.extend(source[: mid + 1][::-1])
        del source[: mid + 1]

    def append(self, item) -> None:
        self.back.append(item)

    def appendleft(self, item) -> None:
        self.front.append(item)

    def extend(self, iterable) -> None:
        self.back.extend(iterable)

    def extendleft(self, iterable) -> None:
        self.front.extend(iterable)

    def pop(self):
        if not self:
            raise IndexError("pop from empty deque")
        if not self.back:
            self._balance()
        return self.back.pop()

    def popleft(self):
        if not self:
            raise IndexError("popleft from empty deque")
        if not self.front:
            self._balance()
        return self.front.pop()

    def rotate(self, n: int = 1) -> None:
        if not self:
            return
        length = len(self)
        n %= length
        if n == 0:
            return

        if n <= length // 2:
            for _ in range(n):
                self.appendleft(self.pop())
        else:
            for _ in range(length - n):
                self.append(self.popleft())

    def count(self, item) -> int:
        return sum(1 for x in self if x == item)

    def clear(self) -> None:
        self.front.clear()
        self.back.clear()

    def remove(self, item) -> None:
        for i, x in enumerate(self):
            if x == item:
                del self[i]
                return
        raise ValueError(f"{item} is not in deque")

    def insert(self, i: int, item) -> None:
        i = i if i >= 0 else i + len(self)
        if i <= len(self.front):
            if i == 0:
                self.front.append(item)
            else:
                self.front.insert(~i + 1, item)
        else:
            self.back.insert(i - len(self.front), item)

    def index(self, item, start: int = 0, stop=None) -> int:
        if stop is None:
            stop = len(self)
        start = start if start >= 0 else start + len(self)
        stop = stop if stop >= 0 else stop + len(self)
        for i in range(max(start, 0), min(stop, len(self))):
            if self[i] == item:
                return i
        raise ValueError(f"{item} is not in deque")

    def deepcopy(self) -> "TwoListDeque":
        new_ins = TwoListDeque()
        new_ins.front = self.front[:]
        new_ins.back = self.back[:]
        return new_ins

2つのlistを使っているので、説明上「2リストdeque」と呼びます。
やっていることは非常に単純で、

  • appendleft, popleftを受け付ける可変長配列front
  • append, popを受け付ける可変長配列back

を持って、appendleftpopleftfrontへのappendpopで代替し、ランダムアクセス時はindexの大きさによってfront, backのどっちを参照するかを判断しています。
またpopleft時にfrontが空、もしくはpop時にbackが空のときは、_balanceを走らせてもう一方の配列から半分要素をもらいます2front側は要素が逆向きに入っているので、振り分け時に反転しています(下図参照)。要素が空の方をtarget、要素を与える方をsourceとすることで、どちらの場合でも同じように振り分けができます。

balance01.png

balance02.png

一応insertcountなども実装してありますが$O(N)$なので注意です(countはdeque内の要素とその要素数を辞書で持って、append時に+1、pop時に-1すればメモリとトレードオフで$O(1)$実装できます。使い道なさそうだからやらんけど)。

速度テスト

標準ライブラリのdeque・今回の2リストdeque・ごりちゃんさんdeque(コードではRingBufferDeque)のappend, appendleft, pop, popleft, random accessの速度比較を行います。

テストコード
class TwoListDeque:
    __slots__ = ("front", "back")

    def __init__(self, iterable=None) -> None:
        init_arr = list(iterable) if iterable else []
        mid = len(init_arr) >> 1
        self.front = init_arr[:mid][::-1]
        self.back = init_arr[mid:]

    def _balance(self) -> None:
        source, target = (
            (self.front, self.back) if not self.back else (self.back, self.front)
        )
        mid = len(source) >> 1
        target.extend(source[: mid + 1][::-1])
        del source[: mid + 1]

    def append(self, item) -> None:
        self.back.append(item)

    def appendleft(self, item) -> None:
        self.front.append(item)

    def extend(self, iterable) -> None:
        self.back.extend(iterable)

    def extendleft(self, iterable) -> None:
        self.front.extend(iterable)

    def pop(self):
        if not self:
            raise IndexError("pop from empty deque")
        if not self.back:
            self._balance()
        return self.back.pop()

    def popleft(self):
        if not self:
            raise IndexError("popleft from empty deque")
        if not self.front:
            self._balance()
        return self.front.pop()

    def __getitem__(self, i: int):
        l = len(self)
        if i < -l or l <= i:
            raise IndexError("deque index out of range")
        i = i if i >= 0 else i + l
        if i < len(self.front):
            return self.front[~i]
        else:
            return self.back[i - len(self.front)]

    def __len__(self) -> int:
        return len(self.front) + len(self.back)


# ごりちゃんさんのDeque
class RingBufferDeque:
    def __init__(self, src_arr=[], max_size=300000):
        self.N = max(max_size, len(src_arr)) + 1
        self.buf = list(src_arr) + [None] * (self.N - len(src_arr))
        self.head = 0
        self.tail = len(src_arr)

    def __index(self, i):
        l = len(self)
        if not -l <= i < l:
            raise IndexError("index out of range: " + str(i))
        if i < 0:
            i += l
        return (self.head + i) % self.N

    def __extend(self):
        ex = self.N - 1
        self.buf[self.tail + 1 : self.tail + 1] = [None] * ex
        self.N = len(self.buf)
        if self.head > 0:
            self.head += ex

    def is_full(self):
        return len(self) >= self.N - 1

    def is_empty(self):
        return len(self) == 0

    def append(self, x):
        if self.is_full():
            self.__extend()
        self.buf[self.tail] = x
        self.tail += 1
        self.tail %= self.N

    def appendleft(self, x):
        if self.is_full():
            self.__extend()
        self.buf[(self.head - 1) % self.N] = x
        self.head -= 1
        self.head %= self.N

    def pop(self):
        if self.is_empty():
            raise IndexError("pop() when buffer is empty")
        ret = self.buf[(self.tail - 1) % self.N]
        self.tail -= 1
        self.tail %= self.N
        return ret

    def popleft(self):
        if self.is_empty():
            raise IndexError("popleft() when buffer is empty")
        ret = self.buf[self.head]
        self.head += 1
        self.head %= self.N
        return ret

    def __len__(self):
        return (self.tail - self.head) % self.N

    def __getitem__(self, key):
        return self.buf[self.__index(key)]

    def __setitem__(self, key, value):
        self.buf[self.__index(key)] = value

    def __str__(self):
        return "Deque({0})".format(str(list(self)))



import time
from collections import deque
import random
import matplotlib.pyplot as plt
import numpy as np
import statistics


# 特定操作の時間計測
def test_operation(deque_obj, operation, n):
    start_time = time.perf_counter()
    for _ in range(n):
        if operation == "append":
            deque_obj.append(random.randint(-100, 100))
        elif operation == "appendleft":
            deque_obj.appendleft(random.randint(-100, 100))
        elif operation == "pop":
            if len(deque_obj) > 0:
                deque_obj.pop()
        elif operation == "popleft":
            if len(deque_obj) > 0:
                deque_obj.popleft()
        elif operation == "random_access":
            if len(deque_obj) > 0:
                index = random.randrange(len(deque_obj))
                _ = deque_obj[index]
    return time.perf_counter() - start_time


def run_tests(deque_class, size, operations, repetitions, num_trials=5):
    results = {}
    for op in operations:
        op_times = []
        for _ in range(num_trials):
            deque_obj = deque_class(list(range(size)))
            op_times.append(test_operation(deque_obj, op, repetitions))
        results[op] = {
            "mean": statistics.mean(op_times),
            "stdev": statistics.stdev(op_times) if len(op_times) > 1 else 0,
        }
    return results


# テスト結果の描画関数
def plot_results(sizes, std_results, rb_results, tl_results, operations):
    fig, axs = plt.subplots(
        len(operations), 1, figsize=(10, 4 * len(operations)), sharex=True
    )
    fig.suptitle("Deque Performance Comparison", y=1.02)

    x = np.arange(len(sizes))
    width = 0.25

    for i, op in enumerate(operations):
        std_times = [std_results[size][op]["mean"] for size in sizes]
        rb_times = [rb_results[size][op]["mean"] for size in sizes]
        tl_times = [tl_results[size][op]["mean"] for size in sizes]

        std_err = [std_results[size][op]["stdev"] for size in sizes]
        rb_err = [rb_results[size][op]["stdev"] for size in sizes]
        tl_err = [tl_results[size][op]["stdev"] for size in sizes]

        axs[i].bar(
            x - width,
            std_times,
            width,
            yerr=std_err,
            label="Std Deque",
            color="blue",
            capsize=5,
        )
        axs[i].bar(
            x,
            rb_times,
            width,
            yerr=rb_err,
            label="Ring buffer Deque",
            color="green",
            capsize=5,
        )
        axs[i].bar(
            x + width,
            tl_times,
            width,
            yerr=tl_err,
            label="Two list Deque",
            color="red",
            capsize=5,
        )

        axs[i].set_ylabel("Time (s)")
        axs[i].set_title(f"{op} Operation")
        axs[i].set_xticks(x)
        axs[i].set_xticklabels(sizes)
        axs[i].legend(loc="upper left")

        # random_accessの場合のみ対数スケールを使用
        if op == "random_access":
            axs[i].set_yscale("log")
        else:
            axs[i].set_yscale("linear")

    plt.xlabel("Number of Elements")
    plt.tight_layout()
    plt.subplots_adjust(top=0.95)
    plt.show()


# テスト実行
sizes = [1000, 10000, 100000, 1000000]
operations = ["append", "appendleft", "pop", "popleft", "random_access"]
repetitions = 10000
num_trials = 100  # 試行回数

std_results = {}
rb_results = {}
tl_results = {}

for size in sizes:
    print(f"Testing with {size} elements:")
    std_results[size] = run_tests(deque, size, operations, repetitions, num_trials)
    rb_results[size] = run_tests(
        RingBufferDeque, size, operations, repetitions, num_trials
    )
    tl_results[size] = run_tests(
        TwoListDeque, size, operations, repetitions, num_trials
    )
    # print("Standard deque results:", std_results[size])
    # print("Ring Buffer deque results:", rb_results[size])
    # print("Two List deque results:", tl_results[size])
    # print()

# 結果を描画
plot_results(sizes, std_results, rb_results, tl_results, operations)

テスト結果

Figure_1.png

append, appendleft, pop, popleftにおいては標準ライブラリdequeが圧勝ですね。ランダムアクセスを犠牲にしてdequeインターフェースで要求される基本操作だけに特化したような感じなんでしょうか(参考:python - Why is deque implemented as a linked list instead of a circular array? - Stack Overflow)。
今回の2リストdequeはごりちゃんさんdequeに比べてappend, appendleft, pop, popleftで多少早くなっています。ただ pop, popleft_balance(ごりちゃんさんdequeなら__extend)が生じやすい条件で結構変わってくると思います。
ランダムアクセスについては2リストdequeとごりちゃんさんdequeとの差はほぼなさそうですね。まあindex計算と配列参照という同じことをしているのでそれはそう。

余談

今回の実装はSWAG(Sliding Window Aggregation)の実装を見直しているときに思いついたものです(なので普通に車輪の再発明)。見比べていただければわかりますがSWAGの実装からfoldに必要な構成を取り除いただけです。to_listreverseなども直感的に実装できるし汎用性も高そうで、なによりわかりやすくてソラでかけそう(個人の感想)と思ったのでちゃんと実装してみました。
ミスやバグなどあったら教えてくださると嬉しいです❗一応次の問題は通せてます。

今回は要素数が偏ることのデメリットよりも実装の簡潔さを優先したので、片方が空になったときだけ_balanceを呼んでいますが、片方が所定の要素数(例えばもう一方の1/3以下など)になったら_balanceの処理を行うとより要素数の平衡を保てメモリ効率がよくなると思います。

こちらが把握できていないpros・consやメソッドの追加、他実装の提案などもお待ちしています。
皆さんの競プロライフの一助となれば幸いです。読んでくださりありがとうございました(いいね押してくれるとより嬉しいです🙏)

参考

  1. 実用上はごりちゃんさんの実装で困ることはないので、そんな実装もあるんだな~という気持ちで読んでください

  2. このような操作を行う先行例として、永続データ構造 - Wikipedia内に「銀行家のキュー(banker's queue)」が記載されています。しかしこれが単なるqueueの実装なのか、永続データqueueの実装なのかは不明瞭です(多分前者。「銀行家」は償却分析のbanker法から採っていると思う:9.3.3. Bankers and Physicists · Functional Programming in OCaml)。

6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5