はじめましての人ははじめまして。alumiと申します。
まず本編の前に先駆者であるごりちゃんさんへ感謝とリスペクトを。
Pythonの標準ライブラリのdequeは、内部実装の都合で任意要素へindexを指定して参照するランダムアクセスに$O(N)$かかります。
C++のdequeは$O(1)$でランダムアクセス可能なため、これを前提とした問題が出されるとPython勢は慌てふためくことになります。
しかし偉大な先駆者であるごりちゃんさんがリングバッファで実装したdequeを公開してくださっているので、Python勢がこのケースで慌てることはなくなりました。良かった。
この記事ではごりちゃんさんの実装よりもappend, pop操作が若干早い実装ができたので、実装とその説明をしたいと思います1。
実装
class TwoStackDeque:
__slots__ = ("front", "back")
def __init__(self, iterable=None) -> None:
init_arr = list(iterable) if iterable else []
mid = len(init_arr) >> 1
self.front = init_arr[:mid][::-1]
self.back = init_arr[mid:]
def __len__(self) -> int:
return len(self.front) + len(self.back)
def __str__(self) -> str:
return f"{self.__class__.__name__}({list(self)})"
__repr__ = __str__
def __iter__(self):
yield from reversed(self.front)
yield from self.back
def __reversed__(self):
yield from reversed(self.back)
yield from self.front
def __contains__(self, item) -> bool:
return item in self.front or item in self.back
def _normalize_index(self, i: int) -> int:
length = len(self)
if i < -length or length <= i:
raise IndexError("deque index out of range")
return i if i >= 0 else i + length
def __getitem__(self, i: int):
i = self._normalize_index(i)
if i < len(self.front):
return self.front[~i]
else:
return self.back[i - len(self.front)]
def __setitem__(self, i: int, item) -> None:
i = self._normalize_index(i)
if i < len(self.front):
self.front[~i] = item
else:
self.back[i - len(self.front)] = item
def __delitem__(self, i: int) -> None:
i = self._normalize_index(i)
if i < len(self.front):
del self.front[~i]
else:
del self.back[i - len(self.front)]
def _balance(self) -> None:
source, target = (
(self.front, self.back) if not self.back else (self.back, self.front)
)
mid = len(source) >> 1
target.extend(source[: mid + 1][::-1])
del source[: mid + 1]
def append(self, item) -> None:
self.back.append(item)
def appendleft(self, item) -> None:
self.front.append(item)
def extend(self, iterable) -> None:
self.back.extend(iterable)
def extendleft(self, iterable) -> None:
self.front.extend(iterable)
def pop(self):
if not self:
raise IndexError("pop from empty deque")
if not self.back:
self._balance()
return self.back.pop()
def popleft(self):
if not self:
raise IndexError("popleft from empty deque")
if not self.front:
self._balance()
return self.front.pop()
def rotate(self, n: int = 1) -> None:
if not self:
return
length = len(self)
n %= length
if n == 0:
return
if n <= length // 2:
for _ in range(n):
self.appendleft(self.pop())
else:
for _ in range(length - n):
self.append(self.popleft())
def count(self, item) -> int:
return sum(1 for x in self if x == item)
def clear(self) -> None:
self.front.clear()
self.back.clear()
def remove(self, item) -> None:
for i, x in enumerate(self):
if x == item:
del self[i]
return
raise ValueError(f"{item} is not in deque")
def insert(self, i: int, item) -> None:
i = i if i >= 0 else i + len(self)
if i <= len(self.front):
if i == 0:
self.front.append(item)
else:
self.front.insert(~i + 1, item)
else:
self.back.insert(i - len(self.front), item)
def index(self, item, start: int = 0, stop=None) -> int:
if stop is None:
stop = len(self)
start = start if start >= 0 else start + len(self)
stop = stop if stop >= 0 else stop + len(self)
for i in range(max(start, 0), min(stop, len(self))):
if self[i] == item:
return i
raise ValueError(f"{item} is not in deque")
def deepcopy(self) -> "TwoStackDeque":
new_ins = TwoStackDeque()
new_ins.front = self.front[:]
new_ins.back = self.back[:]
return new_ins
2つのStackを使っているので、説明上「2スタックdeque」と呼びます。
やっていることは非常に単純で、
- appendleft, popleftを受け付ける可変長配列
front
- append, popを受け付ける可変長配列
back
を持って、appendleft
・popleft
をfront
へのappend
・pop
で代替し、ランダムアクセス時はindexの大きさによってfront
, back
のどっちを参照するかを判断しています。
またpopleft
時にfront
が空、もしくはpop
時にback
が空のときは、_balance
を走らせてもう一方の配列から半分要素をもらいます2。front
側は要素が逆向きに入っているので、振り分け時に反転しています(下図参照)。要素が空の方をtarget
、要素を与える方をsource
とすることで、どちらの場合でも同じように振り分けができます。
一応insert
、count
なども実装してありますが$O(N)$なので注意です(count
はdeque内の要素とその要素数を辞書で持って、append
時に+1、pop
時に-1すればメモリとトレードオフで$O(1)$実装できます。使い道なさそうだからやらんけど)。
速度テスト
標準ライブラリのdeque・今回の2スタックdeque・ごりちゃんさんdeque(コードではRingBufferDeque
)のappend
, appendleft
, pop
, popleft
, random accessの速度比較を行います。
テストコード
class TwoStackDeque:
__slots__ = ("front", "back")
def __init__(self, iterable=None) -> None:
init_arr = list(iterable) if iterable else []
mid = len(init_arr) >> 1
self.front = init_arr[:mid][::-1]
self.back = init_arr[mid:]
def _balance(self) -> None:
source, target = (
(self.front, self.back) if not self.back else (self.back, self.front)
)
mid = len(source) >> 1
target.extend(source[: mid + 1][::-1])
del source[: mid + 1]
def append(self, item) -> None:
self.back.append(item)
def appendleft(self, item) -> None:
self.front.append(item)
def pop(self):
if not self:
raise IndexError("pop from empty deque")
if not self.back:
self._balance()
return self.back.pop()
def popleft(self):
if not self:
raise IndexError("popleft from empty deque")
if not self.front:
self._balance()
return self.front.pop()
def __getitem__(self, i: int):
l = len(self)
if i < -l or l <= i:
raise IndexError("deque index out of range")
i = i if i >= 0 else i + l
if i < len(self.front):
return self.front[~i]
else:
return self.back[i - len(self.front)]
def __len__(self) -> int:
return len(self.front) + len(self.back)
# ごりちゃんさんのDeque
class RingBufferDeque:
def __init__(self, src_arr=[], max_size=300000):
self.N = max(max_size, len(src_arr)) + 1
self.buf = list(src_arr) + [None] * (self.N - len(src_arr))
self.head = 0
self.tail = len(src_arr)
def __index(self, i):
l = len(self)
if not -l <= i < l:
raise IndexError("index out of range: " + str(i))
if i < 0:
i += l
return (self.head + i) % self.N
def __extend(self):
ex = self.N - 1
self.buf[self.tail + 1 : self.tail + 1] = [None] * ex
self.N = len(self.buf)
if self.head > 0:
self.head += ex
def is_full(self):
return len(self) >= self.N - 1
def is_empty(self):
return len(self) == 0
def append(self, x):
if self.is_full():
self.__extend()
self.buf[self.tail] = x
self.tail += 1
self.tail %= self.N
def appendleft(self, x):
if self.is_full():
self.__extend()
self.buf[(self.head - 1) % self.N] = x
self.head -= 1
self.head %= self.N
def pop(self):
if self.is_empty():
raise IndexError("pop() when buffer is empty")
ret = self.buf[(self.tail - 1) % self.N]
self.tail -= 1
self.tail %= self.N
return ret
def popleft(self):
if self.is_empty():
raise IndexError("popleft() when buffer is empty")
ret = self.buf[self.head]
self.head += 1
self.head %= self.N
return ret
def __len__(self):
return (self.tail - self.head) % self.N
def __getitem__(self, key):
return self.buf[self.__index(key)]
def __setitem__(self, key, value):
self.buf[self.__index(key)] = value
def __str__(self):
return "Deque({0})".format(str(list(self)))
import time
from collections import deque
import random
import matplotlib.pyplot as plt
import numpy as np
import statistics
# 特定操作の時間計測
def test_operation(deque_obj, operation, n):
start_time = time.perf_counter()
for _ in range(n):
if operation == "append":
deque_obj.append(random.randint(-100, 100))
elif operation == "appendleft":
deque_obj.appendleft(random.randint(-100, 100))
elif operation == "pop":
if len(deque_obj) > 0:
deque_obj.pop()
elif operation == "popleft":
if len(deque_obj) > 0:
deque_obj.popleft()
elif operation == "random_access":
if len(deque_obj) > 0:
index = random.randrange(len(deque_obj))
_ = deque_obj[index]
return time.perf_counter() - start_time
def run_tests(deque_class, size, operations, repetitions, num_trials=5):
results = {}
for op in operations:
op_times = []
for _ in range(num_trials):
deque_obj = deque_class(list(range(size)))
op_times.append(test_operation(deque_obj, op, repetitions))
results[op] = {
"mean": statistics.mean(op_times),
"stdev": statistics.stdev(op_times) if len(op_times) > 1 else 0,
}
return results
# テスト結果の描画関数
def plot_results(sizes, std_results, rb_results, ts_results, operations):
fig, axs = plt.subplots(
len(operations), 1, figsize=(10, 4 * len(operations)), sharex=True
)
fig.suptitle("Deque Performance Comparison", y=1.02)
x = np.arange(len(sizes))
width = 0.25
for i, op in enumerate(operations):
std_times = [std_results[size][op]["mean"] for size in sizes]
rb_times = [rb_results[size][op]["mean"] for size in sizes]
ts_times = [ts_results[size][op]["mean"] for size in sizes]
std_err = [std_results[size][op]["stdev"] for size in sizes]
rb_err = [rb_results[size][op]["stdev"] for size in sizes]
ts_err = [ts_results[size][op]["stdev"] for size in sizes]
axs[i].bar(
x - width,
std_times,
width,
yerr=std_err,
label="Std Deque",
color="blue",
capsize=5,
)
axs[i].bar(
x,
rb_times,
width,
yerr=rb_err,
label="Ring buffer Deque",
color="green",
capsize=5,
)
axs[i].bar(
x + width,
ts_times,
width,
yerr=ts_err,
label="Two Stack Deque",
color="red",
capsize=5,
)
axs[i].set_ylabel("Time (s)")
axs[i].set_title(f"{op} Operation")
axs[i].set_xticks(x)
axs[i].set_xticklabels(sizes)
axs[i].legend(loc="upper left")
# random_accessの場合のみ対数スケールを使用
if op == "random_access":
axs[i].set_yscale("log")
else:
axs[i].set_yscale("linear")
plt.xlabel("Number of Elements")
plt.tight_layout()
plt.subplots_adjust(top=0.95)
plt.show()
# テスト実行
sizes = [1000, 10000, 100000, 1000000]
operations = ["append", "appendleft", "pop", "popleft", "random_access"]
repetitions = 10000
num_trials = 100 # 試行回数
std_results = {}
rb_results = {}
ts_results = {}
for size in sizes:
print(f"Testing with {size} elements:")
std_results[size] = run_tests(deque, size, operations, repetitions, num_trials)
rb_results[size] = run_tests(
RingBufferDeque, size, operations, repetitions, num_trials
)
ts_results[size] = run_tests(
TwoStackDeque, size, operations, repetitions, num_trials
)
# print("Standard deque results:", std_results[size])
# print("Ring Buffer deque results:", rb_results[size])
# print("Two Stack deque results:", ts_results[size])
# print()
# 結果を描画
plot_results(sizes, std_results, rb_results, ts_results, operations)
テスト結果
append
, appendleft
, pop
, popleft
においては標準ライブラリdequeが圧勝ですね。ランダムアクセスを犠牲にしてdequeインターフェースで要求される基本操作だけに特化したような感じなんでしょうか(参考:python - Why is deque implemented as a linked list instead of a circular array? - Stack Overflow)。
今回の2スタックdequeはごりちゃんさんdequeに比べてappend
, appendleft
, pop
, popleft
で多少早くなっています。ただ pop
, popleft
は_balance
(ごりちゃんさんdequeなら__extend
)が生じやすい条件で結構変わってくると思います。
ランダムアクセスについては2スタックdequeとごりちゃんさんdequeとの差はほぼなさそうですね。まあindex計算と配列参照という同じことをしているのでそれはそう。
余談
今回の実装はSWAG(Sliding Window Aggregation)の実装を見直しているときに思いついたものです(後でちゃんと調べたら車輪の再発明だったけど)。見比べていただければわかりますがSWAGの実装からfoldに必要な構成を取り除いただけです。tolist
やreverse
なども直感的に実装できるし汎用性も高そうで、なによりわかりやすくてソラでかけそう(個人の感想)と思ったのでちゃんと実装してみました。
ミスやバグなどあったら教えてくださると嬉しいです❗一応次の問題は通せてます。
今回は要素数が偏ることのデメリットよりも実装の簡潔さを優先したので、片方が空になったときだけ_balance
を呼んでいますが、片方が所定の要素数(例えばもう一方の1/3以下など)になったら_balance
の処理を行うとより要素数の平衡を保てメモリ効率がよくなると思います(毎回_balance
呼ぶので速度は悪化しますが)。
こちらが把握できていないpros・consやメソッドの追加、他実装の提案などもお待ちしています。
皆さんの競プロライフの一助となれば幸いです。読んでくださりありがとうございました(いいね押してくれるとより嬉しいです🙏)
参考
- Pythonで各要素にO(1)でランダムアクセスできるdeque(両端キュー)を書いてみた - ごりちゃんがゆく
- Python3のdequeの実装を調べる: #時間計算量 - Qiita
- python - Why is deque implemented as a linked list instead of a circular array? - Stack Overflow
-
実用上はごりちゃんさんの実装で困ることはないので、そんな実装もあるんだな~という気持ちで読んでください。類似の実装に関する実装や計算量解析についてはOpen Data Structuresなどの資料をご参考ください ↩
-
このような操作を行う先行例として、永続データ構造 - Wikipedia内に「銀行家のキュー(banker's queue)」が記載されています。しかしこれが単なるqueueの実装なのか、永続queueの実装の呼び名なのかは不明瞭です(「銀行家」は償却分析のbanker法から採っている?:9.3.3. Bankers and Physicists · Functional Programming in OCaml)。 ↩