Pythonの競技プログラミング用ライブラリの整備を行いました。
ライブラリ魔改造の過程で高速化とコード長短縮法のノウハウが溜まったので、備忘録として残しておきます。
なにかのお役にたてば幸いです。
更新履歴
2024/10/19 最小費用流の実装例のBellman-Ford法を修正しました。
参考文献
特に参考にさせていただきました。
ありがとうございました。
Library Checker
ACL-for-python
プログラミングコンテストチャレンジブック [第2版] 問題解決のアルゴリズム活用力とコーディングテクニックを鍛える
基本テクニック
非負整数のビットシフト積み込み
複数の 非負整数 をタプルで格納する場合、代わりに$64$bit整数に収まるように積み込めば、多くの場合で高速化ができます。
積み込んだ値の復元にはビットシフトとビットマスクを用います。
ビットマスクの値を埋め込む際はhex( 2 ** x - 1 )
で16進数変換すれば簡潔です。
#積み込みたい2 ^ 31未満の非負整数2値を指定
Ai, Bi = 12345, 6789
#ビットシフトで31bit刻みに積み込み
X = Ai << 31 | Bi
#値の復元: ビットマスクを取る
mask = 2 ** 31 - 1
Aj, Bj = X >> 31, X & mask
assert Ai == Aj and Bi == Bj
#ビットマスクの埋め込みは、かわりに16進数表記を使うと簡潔
print( hex( (1 << 31) - 1 ) ) #'0x7fffffff'
Ak, Bk = X >> 31, X & 0x7fffffff
assert Ai == Ak and Bi == Bk
特にheapq
ではタプル比較より整数比較のほうが高速なので、整数積み込みが活きます。
より上位桁にある値を優先して比較します。比較順に注意して積み込みましょう。
import heapq
Q = [(5, 3), (10, 2), (20, 4)]
heapq.heapify(Q)
Ai, Bi = heapq.heappop(Q)
print(Ai, Bi) #5, 3
Q = [ ( 5 << 31 | 3 ), ( 10 << 31 | 2 ), ( 20 << 31 | 4 ) ]
heapq.heapify(Q)
X = heapq.heappop(Q)
Ai, Bi = X >> 31, X & ( 2 ** 31 - 1 )
print(Ai, Bi) #5, 3
メモリ消費量は$3$割程度減ります。実行時間の減少幅はまちまちです。
セイウチ演算子
:=
でカッコの中でも変数代入が行えます。
if
やwhile
でも代入ができるので、コードゴルフだけでなくランダムアクセス回数の削減にも一役買います。
今後のライブラリで多用します。
#累積和
A = [3, 1, 4, 1, 5]
C = [Ci := 0] + [Ci := Ci + Ai for Ai in A]
print(C) #[0, 3, 4, 8, 9, 14]
print(Ci) #14: 内包表記で用いた Ai と異なり、スコープ外に残る
#ランダムアクセス回数削減
i = 0
while i != ( Ai := A[i] ): #A[i]へのランダムアクセスを1回に抑える
print(i, Ai) #(0, 3) → (3, 1) → break
i = Ai
スタック高速化
たとえばDFS非再帰化ではスタックを用い、再帰情報をスタックにappend
ないしpop
することで定数倍改善をしています。
ここでスタックの最大長が予測できる場合、事前に配列を確保し、配列内参照で処理することで高速化できます。
使い終わった配列は他の用途にリサイクルしてあげると経済的です。
stack = [-1] * 10 #スタックは最大でも長さ10だと予測し、配列確保
stack[ d := 0 ] = 0 #d := スタックの末尾の要素
while d >= 0:
now = stack[d]
stack[ d := d + 1 ] = 2 #stack.append(2)
stack[ d := d + 1 ] = 9 #stack.append(9)
#while stack: X = stack.pop()
while d >= 0:
X = stack[d]
d -= 1
配列長に注意すればBFSもできます。
queue = [-1] * 10 #スタック高速化で使った配列と同様の大きさ
queue[0] = 0
Rt = 0 #キューのうち、使っている末尾の番号
for Lt, now in enumerate( queue ):
#if now < 5: queue.append(now + 1)
if now < 5:
queue[ Rt := Rt + 1 ] = now + 1
if Lt == Rt:
break
隣接リストの一次元化
頂点$0$から$N - 1$の$N$頂点$M$辺の有向グラフを与える。
$i$番目の辺は$U_i$→$V_i$である。
各頂点について、隣接リストを出力せよ。辺の列挙順は問わない。入力例:
$N, M = 4, 5$
$(U_i, V_i) = [(0, 2), (1, 3), (1, 2), (2, 3), (3, 0)]$
出力例:
$[[2], [3, 2], [3], [0]]$
二次元リストを使うのが一般的な方法です。
N, M = 4, 5
edges = [(0, 2), (1, 3), (1, 2), (2, 3), (3, 0)]
G = [[] for _ in range(N)]
for Ui, Vi in edges:
G[Ui].append(Vi)
print(G) #[[2], [3, 2], [3], [0]]
しかしこの方法ではappend
が必要になるので、append
の速度面でもメモリ面でも不経済です。
また、特に何度もランダムアクセスを行うとnxt = G[now][cur]
のように[now]
と[cur]
の$2$回のランダムアクセスが必要となり、積み重なると不利になります。
一次元化の手法はいろいろありますが、ここではfunctional graphによる手法を説明します。
- 頂点に対応する$N$個のノードと、辺に対応する$M$個のノード、合計$N + M$個のノードを用意します。
はじめ、すべてのノードは自分自身のノードに有向辺が向いています。つまり、自己ループです。 - 頂点
now
に、この頂点を起始とする辺i
を追加する
G[now].append(i)
に対応します。
頂点now
のノードと辺i
のノードのfunctional graphの移動先を交換します。 - 頂点
now
を起始とするすべての辺を列挙する
for nxt in G[now]:
に対応します。
頂点now
のノードからfunctional graph上で移動し、頂点now
のノードに戻ってくるまでに到達した辺番号を順に出力します。
新しく追加した辺番号から順に出力できます。
以上は追加空間$N + M$で実装できます。
なお、実装上は超頂点$-1$を終端とする、擬似的なfunctional graphを作る方が簡単だと思います。
変更点を 太字 で記します。
- 頂点に対応する$N$個のノードと、辺に対応する$M$個のノード、合計$N + M$個のノードを用意します。
はじめ、 頂点のノードは超頂点$-1$へ、 辺は自分自身のノードへ、それぞれ有向辺が向いています。 -
G[now].append(i)
頂点now
のノードと辺i
のノードのfunctional graphの移動先を交換します。 -
for nxt in G[now]:
頂点now
のノードからfunctional graph上を移動し、 次の移動先が超頂点$-1$となるまでに到達した辺番号を順に出力 します。
後者の$-1$を用いた実装例を示します。
N, M = 4, 5
edges = [(0, 2), (1, 3), (1, 2), (2, 3), (3, 0)]
#辺番号iはnow → nxtに辺を伸ばすものとする
#H[i]: 辺番号iの有向辺の到達先nxt
#F[i]: following : 辺番号iの有向辺のひとつ前の辺番号(なければ-1)
# 二次元リストでは G[now] = [・・・, F[i], i, ・・・] と昇順に並んでいる
#B[now]: base : 頂点nowが最後に採用した辺番号(なければ-1)
H = []
F = []
B = [-1] * N
#G[now].append(i)
for i, (now, nxt) in enumerate( edges ):
#H[i], F[i]を初期化。その後、B[now]とF[i]をswap
H.append(nxt)
F.append(i)
B[now], F[i] = F[i], B[now]
#隣接リストを復元する
new_G = [[] for _ in range(N)]
for now in range(N):
#for i in G[now]: にあたる。( nxt := H[i] )を隣接リストに追加する
i = B[now]
while i != -1:
new_G[now].append( nxt := H[i] )
i = F[i]
print(new_G) #[[2], [2, 3], [3], [0]]
その他のテクニックは、姉妹記事をご覧ください。
以降はライブラリ魔改造編となります。
二項係数
これは遅い実装です。ライブラリ化にはおすすめしません。
$1$から$n$までの$n$個の自然数から、$r$個を選ぶ場合の数 ${}_n C_k$を 素数$P$で 割った値を何度も計算したい状況を考えます。
階乗$x!$ mod $P$とその逆元$\frac{1}{x!}$ mod $P$の前計算がいりますが、ここにセイウチ演算子を用いた累積和の手法を使うとコード長を短縮できます。
なお便利のため、 $0! = 1$と定義します。
P = 998244353 #素数
N = 100 #nCr mod Pで計算したい最大のn
#fact[x]: x! mod P
#finv[x]: 1 / x! mod P と定義する場合
fact = [ v := 1 ] + [ v := v * i % P for i in range(1, N + 1) ]
finv = [ v := pow(v, -1, P) ] + [ v := v * i % P for i in range(N, 0, -1) ]
finv.reverse()
#finv[- x]: 1 / x! mod P と定義する場合: reverse()を省略できる
fact = [ v := 1 ] + [ v := v * i % P for i in range(1, N + 1) ]
finv = [1] + [ v := pow(v, -1, P) ] + [ v := v * i % P for i in range(N, 1, -1) ]
二項係数 実装例
遅さの原因はリストの連結にありました。
[v := 1] + [v := v * i % P for ~]
の+
で損しているので、使わないようにしましょう。
以下の実装例では早い版を載せておきます。(終)
実装例(早い版): LC Binomial Coefficient(Prime mod) 771ms, 230Mb
実装例(遅い版): LC Binomial Coefficient(Prime mod) 896ms, 340Mb
#nCr mod P
class nCr_mod_P:
def __init__(self, maxN: int, P: int):
self.P = P
self._fact = [1] * (maxN + 1)
self._finv = [1] * (maxN + 1)
v = 1
for i in range( 2, maxN + 1 ):
self._fact[i] = v = v * i % P
self._finv[-1] = v = pow(v, -1, P)
for i in range( maxN, 1, -1 ):
self._finv[i - 1] = v = v * i % P
def fact(self, v: int): #v! mod P ただし 0! = 1とする
return self._fact[v]
def finv(self, v: int): #1 / v! mod P
return self._finv[v]
def nCr(self, n: int, r: int): #nCrを計算
assert 0 <= n and 0 <= r
if n < r:
return 0
else:
return self._fact[n] * self._finv[n - r] % self.P * self._finv[r] % self.P
UnionFind
$N$頂点$0$辺の無向グラフに無向辺を追加したときの連結判定を行います。
find
で親を探し、unite
で$2$頂点を連結するのが基本機能です。
経路圧縮
並列代入で再帰処理やスタックを回避できます。
コメントの通り、 代入順に注意してください。
#頂点Viの親を探し、経路圧縮する
def find(self, Vi: int):
Pi = Vi
#1. Viの親Piを探す
while self._parent[Pi] >= 0:
Pi = self._parent[Pi]
#2. Vi → Piの経路上のすべての頂点に対し、親をPiに変更する
while self._parent[Vi] >= 0:
#parent[Vi] ← Pi と Vi ← parent[Vi] を同時に行う
#代入順に注意!左辺は parent[Vi], Vi の順にすること(逆だと壊れる)
self._parent[Vi], Vi = Pi, self._parent[Vi]
return Pi
連結成分の列挙
unite
操作の時にfunctional graphをswapすることで、連結成分が列挙できます。
追加で空間$N$が必要になるほか、列挙順は「最初は自分自身」以外きまりがないので注意してください。
実装例ではself._next[now]
がこれを担います。
計算量はクエリあたり$O(連結成分数)$です。
def __init__(self, N):
self._next = [i for i in range(N)] #同一連結成分の次の頂点番号
def unite(self, Ui, Vi):
#1. Ui, Viを親に置換しつつ、異なる連結成分かを評価
if ( Ui := self.find(Ui) ) == ( Vi := self.find(Vi) ):
return False
#2. union by size Uiがより大きいサイズの親になるようswapしてからマージ
if self._parent[Ui] > self._parent[Vi]:
Ui, Vi = Vi, Ui
self._parent[Ui] += self._parent[Vi]
self._parent[Vi] = Ui
#3. connect(要素の列挙)用の配列をswap
self._next[Ui], self._next[Vi] = self._next[Vi], self._next[Ui]
return True
#for v in self.connect(Vi): で、頂点Viの連結成分の要素を列挙する Vi以降は順不同
def connect(self, Vi):
#1. 頂点Viを出力しつつ、Piに頂点Viを代入
yield ( Pi := Vi )
#2. 頂点Viに戻ってくるまで、次の頂点に移動
while ( Pi := self._next[Pi] ) != Vi:
yield Pi
親の列挙
使う機会がほぼない機能ですが、一応追加空間$N$で達成できます。
常に親と判定する 超頂点に対応するノード$N$を含めた、$N + 1$ノードのfunctional graphを用意します。
はじめ、ノード$n$は$(n + 1)$ $mod$ $(N + 1)$に有向辺を張っています。
親の列挙クエリのたび、以下のように移動と有向辺の張り直しを行えばよいです。
超頂点に対応するノード$N$に駒を置く。
駒がノード$N$に戻ってくるまで、以下のステップを繰り返す:
- 駒の現在値をノードbackとして記録する。
- 駒が親に到達するまで、駒を有向辺に沿って $1$回以上 動かす。
- ノードbackの有向辺を、「back → 駒の現在値」に修正する。
- 現在値が超頂点なら終了。そうでなければ、頂点番号を出力し1.に戻る。
計算量はならし$O(親の個数)$と呼んでよいかと思います。
実装上は超頂点を持たないほうが丁寧です。
def __init__(self, N):
self._direct = [i + 1 for i in range(N)] #右側最寄りの親
self._Lt = 0 #暫定的な最左の親番号
#for v in self.leader: で、親(代表値)を昇順に列挙 ならしO(親の数)
@property
def leader(self):
#1. 最も番号の小さい親を探す
while self._parent[ self._Lt ] >= 0: #子なら
self._Lt = self._direct[ self._Lt ] #最寄りの親に移動
#2. self._directの情報を更新しながら、超頂点Nまで移動
back = now = self._Lt
while now != N:
#a) 次の頂点に移動する。Nに到達するか、親になったらbreak
while ( now := self._direct[now] ) != N and self._parent[now] >= 0:
pass
#b) 親back → 親nowに辺を張り直し
self._direct[back] = now
#c) backを出力してから、back ← now
yield back
back = now
連結成分の列挙と親の列挙は稀にしか使わない機能なので、ライブラリ上は整数のビットシフト積み込みでひとつの配列に圧縮したほうが良いでしょう。
UnionFind 実装例
リンクの圧縮版は、可読性を犠牲にライブラリの幅を削減したものです。(終)
提出例: ACLPC-A Disjoint Set Union
圧縮版: LC UnionFind
class UnionFind:
def __init__(self, N: int):
assert 0 <= N < 1 << 31
self._parent = [-1] * N #負なら親で、要素数 正なら子で、ひとつ親の頂点番号
self._next = [i for i in range(N)] #同一連結成分の次の頂点番号
self._direct = [i + 1 for i in range(N)] #右側で最寄りの親 親の間のみ更新する
self._Lt = 0 #暫定的な最左の親番号
#頂点Viの親を探し、経路圧縮する
def find(self, Vi: int):
Pi = Vi
#1. Viの親Piを探す
while self._parent[Pi] >= 0:
Pi = self._parent[Pi]
#2. Vi → Piの経路上のすべての頂点に対し、親をPiに変更する
while self._parent[Vi] >= 0:
#parent[Vi] ← Pi(経路圧縮) と Vi ← parent[Vi](ひとつ親に移動) を同時に行う
#代入順に注意!左辺は parent[Vi], Vi の順にすること(逆だと壊れる)
self._parent[Vi], Vi = Pi, self._parent[Vi]
return Pi
def unite(self, Ui: int, Vi: int):
#1. Ui, Viを親に置換しつつ、異なる連結成分かを評価
if ( Ui := self.find(Ui) ) == ( Vi := self.find(Vi) ):
return False
#2. union by size Uiがより大きいサイズの親になるようswapしてからマージ
if self._parent[Ui] > self._parent[Vi]:
Ui, Vi = Vi, Ui
self._parent[Ui] += self._parent[Vi]
self._parent[Vi] = Ui
#3. connect(要素の列挙)用の配列をswap
self._next[Ui], self._next[Vi] = self._next[Vi], self._next[Ui]
return True
def same(self, Ui: int, Vi: int):
return self.find(Ui) == self.find(Vi)
def size(self, Vi: int):
return - self._parent[ self.find(Vi) ]
#for v in self.connect(Vi): で、頂点Viの連結成分の要素を列挙する Vi以降は順不同
def connect(self, Vi: int):
#1. 頂点Viを出力
yield ( Pi := Vi )
#2. 頂点Viに戻ってくるまで、次の頂点に移動
while ( Pi := self._next[Pi] ) != Vi:
yield Pi
#for v in self.leader: で、親(代表値)を昇順に列挙 ならしO(親の数)
@property
def leader(self):
#1. 最も番号の小さい親を探す
while self._parent[ self._Lt ] >= 0: #子なら
self._Lt = self._direct[ self._Lt ] #当時の記録を頼りに最寄りの親に移動
#2. self._directの情報を更新しながら、超頂点Nまで移動
back = now = self._Lt
while now != N:
#a) 次の頂点に移動する。Nに到達するか、親になったらbreak
while ( now := self._direct[now] ) != N and self._parent[now] >= 0:
pass
#b) 親backから親nowの間には子しかないので、self._direct[back]を経路圧縮
self._direct[back] = now
#c) backを出力してから、back ← now
yield back
back = now
ダイクストラ法
$N$頂点$M$辺の有向グラフにおける、$S$-$T$最短路を求めます。
実装方針の比較
過去記事でも触れましたが、heapq
を使うよりセグメント木を使う方が高速な印象です。
特にheapq
はビットシフトによる整数積み込みができないと急激に遅くなる傾向があるので、ライブラリ化するならセグメント木の一考の余地があるかと思います。
速度比較 (Library Checker)
Library Checker: Shortest Pathで比較しました。
自作二分ヒープ以外は同程度でしょうか。
heapq タプルで比較(1077ms)
heapq ビットシフト(1108ms)
自作二分ヒープ(1681ms)
セグメント木 高速化案全乗せ(1134ms)
なお、PyPy3 Fastestはheapq
を使っているので使い分けだと思います。(終)
セグメント木 実装概略
最小費用流でも触れる部分なので、セグメント木を用いたダイクストラ法に触れておきます。
辺のコストは非負とします。また後述する高速化のため、 木のサイズは$2$冪を仮定します。
頂点数$N$に対し、木のサイズを$N$以上最小の$2$冪と定義します。
特に、2 ** (N - 1).bit_length()
と計算できます。
ダイクストラ用のセグメント木と、最短距離記録用の$2$個の配列を用意します。
ダイクストラ法で必要なセグメント木の機能は一点更新・全体min・最小値のノード取得です。
これは一点更新・区間最小値のセグメント木を改造すればよいです。
注意点として、 最小値ノードの取得後は、セグメント木のノードは距離infに更新してください。 これにより、未pop最小値の取得が可能となります。
ここまでを実装してみます。
class dijkstra:
def __init__(self, N: int, inf = '4 * 10 ** 18'):
self.N = N
self.inf = inf if type(inf) != str else 4 * 10 ** 18
self._size = 1 << (N - 1).bit_length()
self._dist = [self.inf] * N
self._node = [self.inf] * 2 * self._size
#内部関数 for now in dijkstra: で距離最小順に頂点を列挙
def __iter__(self):
return self
def __next__(self):
if ( now := self.get() ) == -1:
raise StopIteration
else:
return now
def chmin(self, i: int, v): #chmin( dist[i] ← v )
if self._dist[i] > v:
self._dist[i] = self._node[ x := i | self._size ] = v
while x != 1:
x >>= 1
self._node[x] = min( self._node[x << 1], self._node[x << 1 | 1] )
def get(self): #最小値を満たすノードnowを取得し、セグメント木内だけinfに更新
if ( v := self._node[ x := 1 ] ) == self.inf:
return -1
while x < self._size:
x <<= 1
if self._node[x] != v:
x |= 1
now = x ^ self._size
assert self._dist[now] == v
self._node[x] = self.inf
while x != 1:
x >>= 1
self._node[x] = min( self._node[x << 1], self._node[x << 1 | 1] )
return now
def dist(self, i: int): return self._dist[i]
高速化案はおもに$3$点です。
chmin高速化
セグメント木の性質上、一度でもchminで更新できなければ以降の更新は起きえません。
なので途中で打ち切るようにすれば、計算量が$Θ(logN)$から$O(logN)$になります。
def chmin(self, i: int, v): #chmin( dist[i] ← v )
if self._dist[i] > v:
self._dist[i] = self._node[ x := i | self._size ] = v
while x != 1:
x >>= 1
if self._node[x] > v:
self._node[x] = v
else:
break
高さの制限
ノードを左詰めに割り当てることで、セグメント木に必要な高さを$Θ(logN)$から$O(logN)$に抑えられます。
- 利点: まあまあ早くなる、パスグラフだと$O(N)$でオーダー改善
- 欠点: $Θ(logN)$ケースが作れる、追加メモリ$size + N$程度、追加ランダムアクセス$O(N+M)$回、実装が大変
def __init__(self, N: int, inf = '4 * 10 ** 18'):
#rev[i]: 写像 top, free, update: 最高ノード, 次の空き, top上昇のフラグ
self._rev = [-1] * self._size + [ i + self._size + 1 for i in range(N) ]
self._top, self._free, self._update = self._size, self._size, self._size + 1
def chmin(self, i: int, v): #chmin( dist[i] ← v )
if self._dist[i] > v:
#頂点iに対応するノードを rev[i] で調べる
if ( x := self._rev[i] ) == -1: #ノードをはじめて割り当てる場合
if self._free == self._update: #topの変更がいる場合
self._top >>= 1
self._node[ self._top ] = self._node[ self._top << 1 ]
self._update += ( LSB := self._update & - self._update )
x = self._rev[i] = self._free #空きノードを割り当て
self._free, self._rev[x] = self._rev[x], i
assert self._rev[x] == i and self._rev[i] == x
#ノードxを更新
self._dist[i] = self._node[x] = v
while x != self._top:
x >>= 1
if self._node[x] > v:
self._node[x] = v
else:
break
遅延更新
総積(全区間min)を取得する時にまとめて更新すれば、総積取得$1$回あたりの計算量を抑えられます。
- 利点: わずかに早くなる、密グラフだと$O( min(N^2,MlogN) )$でオーダー改善、実装が楽
- 欠点: 追加メモリ$size + N$程度、追加ランダムアクセス$O(MlogN)$回
def __init__(self, N):
self._lazy = [0] * ( self._size + N )
self._tail = 0
def chmin(self, i: int, v): #chmin( dist[i] ← v )
if self._dist[i] > v:
#(中略 対応するnode[x]を割り当て)
self._dist[i] = self._node[x] = v
if self._lazy[x] == 0: #更新予定がなかった場合、更新フラグを立てる
self._lazy[ self._tail ] = self._lazy[x] = self._tail = x
def get(self): #最小値を満たすノードnowを取得し、セグメント木内だけinfで更新
head = 0
while head != self._tail: #更新箇所をheadとする
self._lazy[head], head = 0, self._lazy[head] #次の更新箇所に移動しつつ初期化
v = self._node[head]
if head != self._top and v < self._node[ nxt := head >> 1 ]: #親の更新
self._node[nxt] = v
if self._lazy[nxt] == 0:
self._lazy[ self._tail ] = self._lazy[nxt] = self._tail = nxt
self._lazy[ self._tail ] = self._tail = 0
好みに合わせて選んでください。
ダイクストラ法(セグメント木) 実装例
計測上は遅延更新の高速化効果は薄かったです。
とはいえ実行速度の枷にもなっていないので、密グラフ対策として盛り込むのはありだと感じます。(終)
実装例: LC Shortest Path(3種盛り)
class dijkstra:
def __init__(self, N: int, inf = '4 * 10 ** 18'):
self.N = N
self.inf = inf if type(inf) != str else 4 * 10 ** 18
self._size = 1 << (N - 1).bit_length()
self._dist = [self.inf] * N
self._node = [self.inf] * ( self._size + N + (N & 1) )
#高さの制限: 追加
self._rev = [-1] * self._size + [ i + self._size + 1 for i in range(N) ]
self._top, self._free, self._update = self._size, self._size, self._size + 1
#遅延更新: 追加
self._lazy = [0] * ( self._size + N )
self._tail = 0
#内部関数
def __iter__(self):
return self
def __next__(self):
if ( now := self.get() ) == -1:
raise StopIteration
else:
return now
def chmin(self, i: int, v): #chmin( dist[i] ← v )
if self._dist[i] > v:
#高さの制限: 追加
if ( x := self._rev[i] ) == -1: #ノードをはじめて割り当てる場合
if self._free == self._update: #topの変更がいる場合
self._top >>= 1
self._node[ self._top ] = self._node[ self._top << 1 ]
self._update += ( LSB := self._update & - self._update )
x = self._rev[i] = self._free #空きノードを割り当て
self._free, self._rev[x] = self._rev[x], i
assert self._rev[x] == i and self._rev[i] == x
self._dist[i] = self._node[x] = v
#遅延更新: 変更
if self._lazy[x] == 0: #更新予定がなかった場合、更新フラグを立てる
self._lazy[ self._tail ] = self._lazy[x] = self._tail = x
def get(self): #最小値を満たすノードnowを取得し、セグメント木内だけinfで更新
#遅延更新: 追加
head = 0
while head != self._tail: #更新箇所をheadとする
self._lazy[head], head = 0, self._lazy[head] #次の更新箇所に移動しつつ初期化
v = self._node[head]
if head != self._top and v < self._node[ nxt := head >> 1 ]: #親の更新
self._node[nxt] = v
if self._lazy[nxt] == 0:
self._lazy[ self._tail ] = self._lazy[nxt] = self._tail = nxt
self._lazy[ self._tail ] = self._tail = 0
if ( v := self._node[ x := self._top ] ) == self.inf: #高さの制限: 変更
return -1
while x < self._size:
if self._node[x := x << 1] != v: x |= 1
#高さの制限: 変更
now = self._rev[x]
assert self._dist[now] == v
assert self._rev[now] == x and self._rev[x] == now
self._node[x] = self.inf
self._rev[now], self._rev[x] = -1, self._free
self._free = x #空きノードをリサイクルできるように登録
while x != self._top:
x >>= 1
self._node[x] = min( self._node[x << 1], self._node[x << 1 | 1] )
return now
def dist(self, i: int): return self._dist[i]
SCC
@AkariLuminous(Akari) さんのSCCの記事を参考にしています。
記事にあるTarjanのアルゴリズムで、DFSを$1$回に抑えた実装を行います。
Tarjanのアルゴリズム 自分用メモ
自分用のメモです。
乱文なので、読み飛ばしてください。
Tarjanのアルゴリズムでは、DFS到達順を示すorder[now]
と、後退辺・横断辺を使って戻れる最も浅い SCC id未確定の 頂点を示すlowlink[now]
の値を管理する。
この正当性について議論する。
簡単のため、適当に頂点R(root)を取り、RからのBFSで通過できる頂点・辺のみを抜き出した部分有向グラフを対象に考える。
RからのDFS有向木をひとつ取ると、強連結成分ごとに SCC root というDFS木中最も浅い頂点を一意に定義できる。
(略証: 同一SCCの2頂点$U_i$, $V_i$であって、最小共通祖先$P_i$は異なるSCCに属するものがあると仮定する。DFS探索順を考えると、$U_i$と$V_i$を結ぶ横断辺のどれかは木辺となるはずなので矛盾する)
このSCC rootの性質がとても偉い。
「SCC rootなら、自身(とその子孫から)BFSをしても、到達できる最も浅い頂点は自身である」
「SCC root でない なら、自身より真に浅い頂点に到達できる」と判定が可能となる。
次に、SCC rootの特定のため、order値とlowlink値を各頂点に割り振る。
このorder-lowlink値を考える。
DFS有向木で採用した辺を木辺、木辺でない辺を先行辺(子孫に向かう)・後退辺(先祖に向かう)・横断辺(先祖・子孫の関係にない2頂点を横断する)と分類する。
先行辺は木辺で考慮しているので、無視してよい。
まず横断辺を無視して(木辺と)後退辺のみのグラフで考えると、後退辺を使って到達できる最も浅い頂点の情報をDFS帰りがけに伝播すれば適切に処理できる。
次に横断辺をDFS順に考慮する。
ここで、DFS木の探索順上、 横断辺はDFS到達順が後の頂点から先の頂点にしか出ない 点に注意したい。
もしも横断辺の移動先の頂点にまだSCC idが割り当てられていない場合、後退辺・横断辺を用いることで自身との共通祖先のどれかに戻れることを意味する。
なので、少なくとも最小共通祖先まではSCC rootではないと評価できた。
元のグラフに戻ると、以上のアルゴリズムを反復することでトポロジカルソートの逆順にSCCの判定が行える。(終)
Tarjanのアルゴリズムを非再帰DFSで実装すれば、十分高速なSCCアルゴリズムが得られます。
さらに高速化したい場合は次の工夫をご検討ください。
- 隣接リストの一次元化
実装にもよりますが、二次元リストへのappend
のコストや辺番号へのランダムアクセス回数を落とせます。 - SCC idとlowlink値の配列共有
lowlink値はSCC id決定前まで必要になるので、配列を共有できます。
SCC idはビット反転して負値で管理すると便利です。
SCC 実装例
make_adjacency_list
は、縮約後の二次元リストを作る機能です。
計算量は$Θ(N+M)$で、set
を使わず実装しているのが嬉しいポイントです。
ぜひご利用ください。(終)
class SCC_Tarjan:
'''
線形時間でSCCを行います。
add_edgeで辺を追加し、self.SCC( make_adjacency_list = True )で構築してください。
make_adjacency_listは自動でDAGの隣接リストを作成する機能です。
self.n: 強連結成分の個数
self.id[now]: 頂点nowのSCC id
self.team[i]: トポロジカルソート順がiの強連結成分の頂点一覧
self.G[now]: make_adjacency_list == Trueの時のみ作成。強連結成分のDAGの隣接リスト
'''
def __init__(self, N: int, G: list = None):
#E[i]: edge : 辺iの停止(nxt)
#P[i]: post : 辺iと起始nowが同じ辺であって、iよりも小さい最大の辺番号
# 二次元配列との比較: G[now](sorted) = [・・・, P[i], i, ・・・], P[i] < i
#B[now]: base : 頂点nowに起始する辺番号のうち最大のもの 辺がない場合、-1
#id[now]: SCC中はlow[i]と配列を共有し、非負ならlowlinkとする
self.N = N
self.n = 0
self._E = []
self._P = []
self._B = [-1] * N
self.id = [0] * N
self.team = []
if G != None: #隣接リストを読み込む場合
for now in range(N):
for nxt in G[now]:
self.add_edge(now, nxt)
#SCC主要部を内部関数化しておく
def _SCC(self, make_adjacency_list = True):
#order[now]: 頂点nowのorder値
#low[now]: idと同じ配列を使用 SCC_idが決まるまではlowlink値、決定後は~ SCC_id
#stack[d]: DFS用stack 今の深さをdとして、到達順に頂点を格納する
#Q[q]: ordering stack SCC_idが未確定の頂点を到達順に格納
N, E, P, B, id = self.N, self._E, self._P, self._B, self.id
order = [-1] * N
low = self.id
stack = [0] * N
Q, q = [0] * N, -1
c = -1
if make_adjacency_list:
A = B[:] #配列Bをコピーしておく
for v in range(N):
if id[v] < 0: #頂点vにSCC idが割り振り済
continue
#DFSを行う d := stack DFSの深さ
stack[ d := 0 ] = v
while True:
if order[ now := stack[d] ] == -1: #初訪問時の処理
order[now] = low[now] = ( c := c + 1 ) #到達順を付与
Q[ q := q + 1 ] = now #SCC id未確定頂点のstackにも追加
else:
B[now] = P[ B[now] ] #帰りがけだった場合の処理
#頂点nowに起始する辺iを降順列挙
while ( i := B[now] ) != -1:
if order[ nxt := E[i] ] == -1: #未到達頂点なら移動
stack[ d := d + 1 ] = nxt
break
else: #到達済みなら、lowlink値を更新してから次の辺へ
low[now] = min( low[now], order[nxt] )
B[now] = P[i]
else: #whileを完走したなら
assert B[now] == -1
#order[now] == lowlink[now]なら、nowはSCC root
if ( cur := order[now] ) == ( Ln := low[now] ):
self.team.append([])
while True: #SCC idを割り振る
nxt = Q[q]
order[nxt] = N
id[nxt] = ~ self.n
q = q - 1
self.team[-1].append(nxt)
if nxt == now:
break
self.n += 1
#戻りがけにlowlink[back]へ波及
if now == v:
assert d == 0
break
back = stack[ d := d - 1 ]
low[back] = min( low[back], Ln )
#トポロジカルソート順になるよう最後に修正
for v in range(N):
id[v] += self.n
self.team.reverse()
if make_adjacency_list: #縮約後の隣接リストを作成(Q, stackを破壊的に再利用する)
self.G = [[] for _ in range( self.n )]
#Q[j]: 強連結成分iから強連結成分jに到達できるならTrue
for j in range( self.n ):
Q[j] = False
for i, Ti in enumerate( self.team ):
Q[i] = True #強連結成分iからiへは辺を張りたくないので、到達済扱いとする
stack[ d := 0 ] = i #辺を張った扱いの強連結成分をstackに積む
for now in Ti:
while ( i := A[now] ) != -1:
nxt = T[i]
if Q[ j := id[nxt] ] == False: #i → jに辺を張ったことがなければ
Q[j] = True
stack[ d := d + 1 ] = j
self.G[i].append(j)
A[now] = P[i] #次の辺へ移動
#次の連結成分を見る前に、Qの状態を初期化
for d in range( d + 1 ):
Q[ j := stack[d] ] = False
#辺の追加
def add_edge(self, now: int, nxt: int):
assert now in range( self.N ) and nxt in range( self.N )
self._P.append( len(self._E) )
self._E.append(nxt)
self._P[-1], self._B[now] = self._B[now], self._P[-1]
#SCCを実行 make_adjacency_list == Trueの場合、DAGの隣接リストをself.Gに登録する
def SCC(self, make_adjacency_list = True): self._SCC( make_adjacency_list )
LC最速提出との比較 自分用メモ
勝手ながら執筆当時のLibrary Checker最速提出と比較すると、筆者の実装とはDFS探索手法と辺のイテレート方法が大きく異なります。
違いを詳しく見てみます。
筆者の実装ではDFSの最大深さを$N$で抑えるかわりに、「辺番号の変更はDFSで戻ってきてから行う」というルールを課しています。
一方最速実装ではよくEuler Tourでやるように、スタックに移動先の頂点を先に積む手法を取っています。
より厳密には、行きがけ時点でスタックに未到達頂点を積みます。
ここで重要なのが 後退辺と横断辺は到達済頂点への辺なので行きがけ時点で区別できる 点です。
これによりスタックから取り出す際には木辺と先行辺を区別するだけでよく、処理が軽く収まります。
結局のところ、スタックの最大深さ$N+M$を確保すれば、戻りがけのpop
処理が$1$回で済む点が魅力といえます。
辺の列挙は前処理をして高速化しています。
SCC開始前に$Θ(N+M)$かけて辺の起始をキーに辺をソートします。
すると辺列挙にrange
関数が使えるようになるのが魅力に感じます。
勝手ながら大変勉強にさせて頂きました。ありがとうございました。
細かな違いまで見るともっとありますが、今回はここまでとします。(終)
最大流
$N$頂点$M$辺の容量つきグラフに対し、頂点$S$から頂点$T$への最大流を求めます。
実装方針としてはDinic法を高速化します。
実装・計算量評価については以下の記事を参考にしています。
Dinic法 自分用メモ
自分用のメモです。
乱文なので、読み飛ばしてください。
Dinic法では以下のdual-primal stepを最大$N - 1$回行うことで、これを達成する。
- dual step: 残余グラフでの$S$→$T$最短路を求める。
- primal step: 最短路に限界までフローを流す。その後、順辺逆辺を更新する。
今回は1.をBFS、2.をDFSで実装する。
正当性
特に、Dinic法で(最短路昇順に) 貪欲に 流したフローが最大流を満たすことを示す。
任意の$S$→$T$フローと任意の$S$-$T$カットにおいて、頂点$S$側に属する$S$側集合から$T$側集合への(流出量 - 流入量)は$S$-$T$カット以下である。
Dinic法を行い、$S$から残余辺のみで到達できる頂点集合を$S$側集合、そうでない頂点集合を$T$側集合と割り振る。
すると$S$側集合→$T$側集合の順辺は容量限界までフローが流れており、逆辺にはフローが流れていないので (流出量 - 流入量) == $S$-$T$カット となる。
これは$S$-$T$カットの不等号の等号成立状態といえるので、Dinic法で得られたフローは最大流・最小カットを同時に満たす。
計算量
dual-primal step(BFS + DFS)を$1$セット行うために必要な計算量から評価する。
dual step(BFS)は明らかに$O(N + M)$である。
primal step(DFS)を評価する。探索順ではなく流れるフローに着目して訪問回数を数えると、DFSの計算量を2つに分けて評価できる。
- 頂点と辺を高々$1$回ずつ訪れる。
- $S$から$T$に流れたフロー$1$本あたり、経路中の頂点・辺を$1$回ずつ経由する。
1.の計算量は$O(N + M)$である。 2.を評価する。
まず、$1$本のフローが通過する頂点数・辺数は$O(N)$である。
次にフローの合計本数を評価する。primal step(DFS)ではフローを限界まで流すたび、$1$本以上の辺が容量限界を迎え、グラフから削除される。従ってフローの本数は$O(M)$本で抑えられる。
以上から2.の計算量は$O(NM)$となり、$1$回のDFSあたりの計算量評価$O(N + M + NM)$が得られる。
以上からdual-primal step(BFS + DFS)は$O(N + M + NM)$すなわち$O(NM)$と評価できた。
最後にdual-primal stepが何セット行われるか評価すると、$1$セット行うたびに最短路が$1$以上長くなることから、高々$O(N)$回の反復で最大流が得られる。
以上より、Dinic法の計算量は$O(N^2M)$である。
辺容量がすべて整数の場合
特に辺容量がすべて整数のとき、グラフに$2$冪単位降順にフローを流すことで、計算量を$O(NMlogC)$とできる($C$は最大辺容量)。
計算量を解析する。$2$冪単位降順に流すと、前回の$S$-$T$カットの辺には高々$1$回しかフローを流せないことから、今回の$2$冪単位で流せる 合計の フローの本数は前回の$S$-$T$カットの辺数$O(M)$で抑えられる。
primal-dual stepは合計で$O(N)$回行われることを考慮すると、この$2$冪単位あたりの計算量は
- BFS: $O(N)$回、$1$回あたり計算量$O(N + M)$
- DFS: $O(N)$回、$1$回あたり計算量$O(N + M)$
- フローの本数: Dinic法が終了するまでに$O(M)$本、$1$本あたり計算量$O(N)$
となり、合計で$O(NM)$と評価できる。
実際には容量スケーリングでこのステップを容量降順に$O(logC)$回実施するので、最終的な計算量評価は$O(NMlogC)$となる。(終)
実装において最重要なのが隣接リストの一次元化です。
Dinic法では辺番号にアクセスする回数が非常に多いので、G[now][cur]
の形式のランダムアクセスはそれだけで不利です。
その他の高速化案は以下の通りです。
- BFSとDFSは長さ$N$の配列$1$個を使い回すと経済的です。
- primal step(DFS)は適切に実装すると、$1$回のDFSで最短路にすべてのフローを流せます。
ナイーブな「流せたらDFSやり直し」方式と比べて定数倍に優れます。
最大流 実装例
容量スケーリングや小数容量に対応するために許容誤差機能をつけましたが、この実装で適切かの確認は行っておりません。ご容赦ください。(終)
class maxflow:
'''
燃やす埋める: S → A/B → G, S → Aを切るならS → Bも切る は A → B(inf)
最小流量制限: A → B(Lt ~ Rt)
A → B(Rt - Lt), St2 → B(Lt), A → Gl2(Lt) の辺を代わりに張る。
St2 → Gl2, St2 → Gl, St → Gl2, St → Gl の順に流し、St2, Gl2の辺がcap = 0ならOK
---
G[now]: (頂点nowに起始する辺番号)の二次元配列を回避し、定数倍高速化を目指します。
順辺の個数は self.M で取得できます。
'''
def __init__(self, N: int, inf = '4 * 10 ** 18'):
#edge[i << 1], edge[i << 1 | 1]: i個目の「順辺」のnow, nxt
#cap[i]: 辺iの容量
#post[i] : 辺番号iの起始をnowとしたとき、nowの前の辺番号
# 二次元配列との対比: G[now] = [・・・, post[i], i, ・・・]
#base[now] : 頂点nowのイテレート開始辺 len(G[now]) == 0の場合、-1
self.N = N
self.M = 0
self.inf = max( 0, inf ) if type(inf) != str else 4 * 10 ** 18
self._edge = []
self._cap = []
self._post = []
self._base = [-1] * N
#最大流計算でのみ使う配列もあらかじめ宣言する
self._dist, self._flow, self._leak = [N] * N, [0] * N, [0] * N
self._arrow, self._queue_and_stack = [-1] * N, [0] * N
#最大流計算 主要部を内部関数化
def _calc(self, St, Gl, permissible_error, flow_limit):
#dist[now]: StからのBFS上の最短距離
#flow[now], leak[now]: 頂点nowに流入したフロー・流出が確定したフロー
#arrow[now] : 頂点nowがちょうど今見ている辺番号
#queue, stack: 同一配列を参照 BFSではdequeの代用 DFSでは非再帰化に用いる
N = self.N
edge, cap, post, base = self._edge, self._cap, self._post, self._base
dist = self._dist
flow = self._flow
leak = self._leak
arrow = self._arrow
queue = stack = self._queue_and_stack
flow_limit = max( 0, flow_limit ) if type(flow_limit) != str else self.inf
pe = max( 0, permissible_error ) #許容誤差
ans = 0
#Dual-Primal step
while flow_limit > ans:
#Phase 1. BFS
for now in range(N): #初期化
dist[now] = N
dist[St] = 0
queue[ Rt := 0 ] = St
for Lt, now in enumerate( queue ):
arrow[now] = base[now] #BFS出現辺のみarrow[now]を初期化すると定数倍改善
x = dist[now] + 1 #次の距離
#now → nxtの辺を列挙
i = base[now]
while i != -1:
#容量が有意に残る辺で、かつBFS未到達ならキューに入れる
if cap[i] > pe and dist[ nxt := edge[i ^ 1] ] == N:
dist[nxt] = x
queue[ Rt := Rt + 1 ] = nxt
i = post[i]
if Lt == Rt or now == Gl:
break
if dist[Gl] == N: #St → Glの辺がない場合、終了
break
#Phase 2. St ← Gl方向にDFS
flow[Gl] = flow_limit - ans #流せる総量(基本的にinf)
leak[Gl] = 0
stack[ d := 0 ] = Gl
while True:
#1. DFSがGl(DFS開始点)に戻ってきた段階で、残フローをすべて使い切れた場合
if ( now := stack[d] ) == Gl and flow[Gl] == leak[Gl]:
return ans + leak[Gl] #全部流せたと報告し、DFSを打ち切る
#2. 目的地Stに到達するか、これ以上流せない場合
if now == St or flow[now] == leak[now]:
#(St側) → now → back → (Gl側) の順に頂点backを定義する。
#流量 f := flow[now] すなわち 届いたフロー全部 を流す
#now ← back は(backから見た)順辺i。 now → backは逆辺となるのでi ^ 1
back = stack[ d := d - 1 ]
f = flow[now]
cap[ i := arrow[back] ] += f #逆辺容量を変更
cap[i ^ 1] -= f
leak[back] += f #ひとつ前の頂点の流出量を反映
continue
#3. (St側) → nxt → now → (Gl側) の順に頂点nxtを定義する。
# フローの本来の向きは探索順の逆で、 nxt → now なので注意
# なのでDFS上では残容量のある、(nowから見た)逆辺を探すことになる
x = dist[now] - 1 #次の距離
while ( i := arrow[now] ) != -1:
if dist[ nxt := edge[i ^ 1] ] == x and ( rev_cap := cap[i ^ 1] ) > pe:
#次の頂点に入るときにはじめてflow[nxt], leak[nxt]を更新
flow[nxt] = min( flow[now] - leak[now], rev_cap )
leak[nxt] = 0
stack[ d := d + 1 ] = nxt
break #ここでは辺番号を変更しない。帰りがけの処理時に変えてもらう
else:
arrow[now] = post[i]
#4. 帰りがけの処理
else:
assert arrow[now] == -1
if now == Gl: #このDFSを終える処理
assert d == 0
ans += leak[now]
break #DFS終了
#(St側) → now → back → (Gl側) の順に頂点backを定義する。
#now → back に流量leak[now] : 流れることが決まったフロー総量 を流す
#その後、この辺の削除操作を行う( A[back]を変える )
back = stack[ d := d - 1 ]
cap[ i := arrow[back] ] -= ( f := leak[now] )
cap[i ^ 1] += f
leak[back] += f
arrow[back] = post[i]
#dual-primal stepが終了したのでansを出力
return ans
#辺の追加
def add_edge(self, From: int, To: int, Cap):
assert From in range(self.N) and To in range(self.N) and 0 <= Cap
self._post.append( len(self._edge) ); self._edge.append(From); self._cap.append(Cap)
self._post.append( len(self._edge) ); self._edge.append( To ); self._cap.append( 0 )
self._base[From], self._post[-2] = self._post[-2], self._base[From]
self._base[ To ], self._post[-1] = self._post[-1], self._base[ To ]
self.M += 1
def get_edge(self, i: int): #i番目の順辺の now, nxt, cap(順辺), rev_cap(逆辺)
assert i in range( self.M )
fw, rv = i << 1, i << 1 | 1
return self._edge[fw], self._edge[rv], self._cap[fw], self._cap[rv]
#最大流を計算 許容誤差: 残余容量がper_error以下の辺は容量0とみなす
def calc(self, start: int, goal: int, permissible_error = 0, flow_limit = 'infinity'):
assert start in range( self.N ) and goal in range( self.N ) and start != goal
return self._calc(start, goal, permissible_error, flow_limit)
#辺容量がすべて整数の場合に限り、O(NMlogC)で解ける
def calc_int(self, start: int, goal: int,
max_capacity: int = 'infinity', flow_limit = 'infinity'):
assert start in range( self.N ) and goal in range( self.N ) and start != goal
m = max_capacity if type( max_capacity ) != str else round( self.inf )
f = flow_limit if type( flow_limit ) != str else round( self.inf )
#容量スケーリングの開始点を決める
m = min( m, f )
logm = max( 1, round(m) ).bit_length() - 1
ans = 0
#容量が(1 << e)以上の辺のみを使う → (1 << e) - 1より真に大きい辺のみを使う
for e in range( logm, -1, -1 ):
ans += self._calc( start, goal, ( 1 << e ) - 1, f - ans )
return ans
#St → Glに最大流を流し終えた状態を仮定する。最小カットのSt側頂点かの判定フラグを返す
def min_cut(self, St, permissible_error = 0):
assert St in range( self.N )
pe = max( 0, permissible_error ) #許容誤差
visited = [False] * self.N
visited[St] = True
queue = self._queue_and_stack
queue[ Rt := 0 ] = St
for Lt, now in enumerate( queue ):
i = self._base[now]
while i != -1:
if self._cap[i] > pe and visited[ nxt := self._edge[i ^ 1] ] == False:
visited[nxt] = True
queue[ Rt := Rt + 1 ] = nxt
i = self._post[i]
if Lt == Rt:
break
return visited
提出例: ABC285G Tatami
最小費用流
$N$頂点$M$辺の容量・費用つきグラフに対し、頂点$S$から頂点$T$への最小費用流を求めます。
なお簡単のため、 目標流量$F$は非負整数と仮定します。
最短路反復のアルゴリズムにポテンシャルを持たせ、負辺なしなら$O(FMlogN)$、負辺がある場合は$O(FMlogN + NM)$で実装します。
最小費用流 自分用メモ
自分用のメモです。
乱文なので、読み飛ばしてください。
正当性
蟻本が詳しいが、略証をメモする。
流量に対する帰納法で最小費用であることを示す。
流量$k$で最小費用流を満たすとし、このグラフに$1$の貪欲最小のフローを追加で流したとする。
この$k+1$のフローより真に小さい費用のフローが存在すると仮定すると、流量$k$の最小費用フローとのグラフ差分を取ることで$S$→$T$パスと$0$個以上の閉路に分けられる。
$S$→$T$の貪欲最小性から、この中には$1$個以上の負閉路が存在することになる。
しかし負閉路は「流量$k$の最小費用流に含まれていた正閉路」を意味し、従って費用最小の仮定と矛盾する。
以上の帰納法より、最短路反復で得られたフローはそのコストでの最小費用流である。
ポテンシャル
最短路反復のアルゴリズムは、残容量が正の辺であって合計費用最小となるパスを発見しフローを反復して流す。
この「合計費用最小のパス」を探す部分は最短経路問題となるが、負の費用(特に逆辺)が生じることから、ナイーブに実装すると$1$回あたり$O(NM)$のベルマンフォード法が必要となる。
しかし以下のようにポテンシャルを定めることで、ダイクストラ法で$1$回あたり$O(MlogN)$で計算できるようになる。($N \leq M \leq N^2$とする)
ポテンシャルを「前回の最短路反復で計算した、始点からの最短距離」と定義する。
このポテンシャルを用い、次の最短路は「各頂点の最短路のうち、ポテンシャルとの差分」をキーとしてダイクストラ法を行うことにする。
P[now]: 頂点nowのポテンシャル
T[now]: 頂点nowの今回のグラフでの最短路距離差分
と割り当てると、T[nxt] ← min( T[now] + { P[now] - P[nxt] + cost(now, nxt) } )
と立式できる。
この遷移中に負辺が出現しない(すなわち、式中{ }
内が非負である)ことを示す。
前回のグラフでの最短路の仮定から、既存の辺については{ }
内は非負である。
追加された辺は前回の最短路の逆辺のみだが、これは{ }
内がちょうど$0$になる。
従って、辺コストがすべて非負というダイクストラ法の条件を満たせるため、高速である。
ただし、最初から負費用の辺があるのにダイクストラ法を使うと$O(2^N)$となるようだ。(終)
参考: snukeさんのブログ ダイクストラ法のよくあるミスと落し方
実装ですが、辺コスト小数に対応するならダイクストラ法をセグメント木で行った方がよいと思います。
heapq
で小数まで対応するならタプル化が必要なので遅くなります。
また、最小費用流に関しては隣接リストは一次元化せず、通常通りの二次元リストで持つほうが効率的です。
最短路計算に用いるベルマンフォード法もダイクストラ法も、頂点に入ったときに全辺イテレートするので、ランダムアクセス回数による定数倍悪化がかからないからです。
最小費用流 実装例
セグメント木の高速化は遅延更新のみとしていますが、高さ制限もつけてもよいと思います。
なおフローは筆者の提出経験が少ないため、不便な実装をしているかもしれません。ご容赦ください。(終)
提出例: ACLPC-E MinCostFlow
class mincostflow:
'''
頂点数N・辺数M・予定流量をFとして、O( F(N + M)logN )で最小費用流を計算します。
負辺がある場合、初回のみBellman Ford法を用いてO(NM)で計算します。
順辺の個数は self.M で取得できます。
'''
def __init__(self, N: int, inf = '4 * 10 ** 18'):
#edge[i], edge[i ^ 1]: now, nxt
#cap[i]: 容量
#cost[i]: フロー1あたりのコスト
#G[now]: 隣接リスト 二次元リスト
#history[now]: 最短経路復元用 最短路を達成する、頂点nowに入る辺番号
#P[now]: ポテンシャル
#dist[v]: SegTreeダイクストラ用 今回の最短距離のうち、P[now]との差分
#node[v]: SegTree用ノード
#lazy[v]: SegTree遅延更新用
#prev: -2 ~ (N - 1) -2: 負辺追加フラグ -1: 初期値 0~: 最後にフローを流した始点
self.N = N
self._size = 1 << (N - 1).bit_length()
self.M = 0
self.inf = max( 0, inf ) if type(inf) != str else 4 * 10 ** 18
self._edge = []
self._cap = []
self._cost = []
self._G = [[] for _ in range(N)]
self._history = [-1] * N
self._P = [0] * N
self._dist = [self.inf] * N
self._node = [self.inf] * ( self._size + N + (N & 1) )
self._lazy = [0] * ( self._size + N )
self._prev = -1
#内部関数: 最短路計算ではP[now]・history[now]の再計算を行う
def _bellman_ford(self, St, Gl, pe):
edge, cap, cost, history = self._edge, self._cap, self._cost, self._history
G, P = self._G, self._P
#初期化
for now in range( self.N ):
P[now], history[now] = self.inf, -1
P[St] = 0
update = True
self._prev = St #prevはここで変更する
while update == True:
udpate = False
for now in range( self._N ):
if P[now] == self.inf:
continue
for i in G[now]:
if cap[i] > pe and P[ nxt := edge[i ^ 1] ] > ( d := P[now] + cost[i] ):
P[nxt], history[nxt] = d, i
update = True
def _dijkstra(self, St, Gl, pe): #O( min(N^2, MlogN) ) 木の高さはlogNで固定
assert self._prev == St
edge, cap, cost, history = self._edge, self._cap, self._cost, self._history
size, dist, node, lazy = self._size, self._dist, self._node, self._lazy
G, N, P = self._G, self.N, self._P
#初期化
for now in range(N):
dist[now], history[now] = self.inf, -1
dist[St] = 0 #最短路反復なので、dist[St] == 0は保障される
node[ x := St | size ] = node[0] = 0
while x != 0:
node[ x := x >> 1 ] = 0
tail = 0
#ダイクストラ法
while True:
#a) pop前の遅延反映 node[1] < node[0] == 0 は常にFalseになる
x = 0
while x != tail:
lazy[x], x = 0, lazy[x]
if ( d := node[x] ) < node[ parent := x >> 1 ]:
node[parent] = d
if lazy[parent] == 0:
lazy[tail] = lazy[parent] = tail = parent
lazy[tail] = tail = 0
#b) 頂点nowを取り出し、SegTreeの内部的な距離をinfに更新
if ( v := node[ x := 1 ] ) == ( w := self.inf ):
break
while x < size:
if node[ x := x << 1 ] != v:
x |= 1
now, node[x] = x ^ size, self.inf
while x != 1:
node[ x := x >> 1 ] = ( w := min( w, node[x ^ 1] ) )
#c) 頂点nowからグラフを見てchmin
#dist[nxt] = min( ( X := dist[now] + P[now] ) - P[nxt] + cost[i] )
X = dist[now] + P[now]
for i in G[now]:
if cap[i] > pe and dist[ nxt := edge[i ^ 1] ] > ( d := X - P[nxt] + cost[i] ):
dist[nxt] = node[ x := nxt | size ] = d
history[nxt] = i
if lazy[x] == 0: #遅延更新
lazy[tail] = lazy[x] = tail = x
#ポテンシャルの更新
for now in range(N):
P[now] = min( dist[now] + P[now], self.inf )
#内部関数: 最小費用流の計算
def _calc(self, St, Gl, flow, pe):
cap, cost, edge, history = self._cap, self._cost, self._edge, self._history
pe = max( 0, pe ) #許容誤差 残余容量がpe未満の辺を容量0とみなす
ans = f = 0
#はじめてフローを流す場合で、負辺追加をしていない場合を例外処理
if self._prev == -1:
self._prev = St
#最短路反復
while ( flow := flow - f ) > 0:
#1. 最短路を計算
if self._prev == St:
self._dijkstra(St, Gl, pe)
else:
self._bellman_ford(St, Gl, pe)
#2. 到達不能(これ以上フローを流せない)なら、infを返す
if history[Gl] == -1:
return self.inf
#3. フローを反映
f, now = flow, Gl
while now != St:
f = min( f, cap[ i := history[now] ] )
now = edge[i]
now = Gl
while now != St:
cap[ i := history[now] ] -= f
cap[i ^ 1] += f
ans += cost[i] * f
now = edge[i]
return ans
#辺の追加・取得
def add_edge(self, From: int, To: int, Cap, Cost):
assert From in range( self.N ) and To in range( self.N ) and 0 <= Cap
self._G[From].append( len(self._edge) ); self._edge.append(From)
self._G[ To ].append( len(self._edge) ); self._edge.append( To )
self._cap.append(Cap); self._cost.append( Cost)
self._cap.append( 0 ); self._cost.append(- Cost)
self.M += 1
#dist[nxt] = min( dist[now] + { P[now] - P[nxt] + cost(now → nxt) } )で遷移する
#{ }内は追加時点で非負か?(負辺なら-2にフラグを変更)
if self._P[From] - self._P[ To ] + Cost < 0:
self._prev = -2
def get_edge(self, i: int): #i個目の順辺の now, nxt, cost, cap(順辺), cap(逆辺)
assert i in range( self.M )
fw, rv = i << 1, i << 1 | 1
return self._edge[fw], self._edge[rv], self._cost[fw], self._cap[fw], self._cap[rv]
#最小費用流を計算 許容誤差: 残余容量がper_error以下の辺は容量0とみなす
def calc(self, start: int, goal: int, flow: int, permissible_error = 0):
assert start in range( self.N ) and goal in range( self.N ) and start != goal
return self._calc( start, goal, flow, permissible_error )
おわりに
おわりです。
バグがあれば修正します。