0. はじめに
2020年9月7日にAtCoder公式のアルゴリズム集 AtCoder Library (ACL)が公開されました。
私はACLに収録されているアルゴリズムのほとんどが初見だったのでいい機会だと思い、アルゴリズムの勉強からPythonでの実装までを行いました。
この記事では internal_csr をみていきます。
internal_csr は、internal と付いていることからわかる通り、ACLの内部で使われているものなので、ACLを使う上で意識することは無いと思います。ACL v1.3 の段階では mincostflow や internal_scc で使われています。
対象としている読者
- ACL内部で使われている CSR 形式について知りたい方。
- ACLのコードを見てみたけど何をしているのかわからない方。
- C++はわからないのでPythonで読み進めたい方。
参考にしたもの
疎行列に関する Wikipedia の記事です
1. CSRとは
CSR とは、疎行列で有効なデータ格納形式の一つです。
1.1. 疎行列とは
疎行列(スパース行列、sparse matrix)とは成分の多くがゼロであるような行列のことです。具体的に非ゼロ成分が何%未満などと決まっているわけでは無いと思いますが、とにかくゼロが多い行列のことを疎行列と言います。例えば、単位行列などの $N$ 次対角行列は $N$ が大きい場合には疎行列といえます。
競技プログラミングでは主にグラフの問題で疎行列を目にすることになります。
典型的なものとしては「$N$ 個の街と $M$ 本の道があり $i$ 番目の道は街 $a_i$ と街 $b_i$ を〜」みたいな問題です。
具体例として下図のような、自己ループ、多重辺を含まない重みなし有向グラフを考えます。(本記事では頂点番号や辺番号、行列の添字を 0-indexed で統一します。)
ここからはこのグラフが与えられた時にどのようにデータを保持するかをみていきます。
1.2. データを保持する方法1:そのままの行列
まずは最もシンプルな方法で、隣接行列と呼ばれる方法です。この方法では、頂点数 $V$ のグラフに対して、$V$ 次正方行列 $A$ を
A_{ij} := \left\{
\begin{aligned}
1 \;\;& (頂点 \:i\: から頂点 \:j\: への辺が存在する)\\
0 \;\;& (頂点 \:i\: から頂点 \:j\: への辺が存在しない)
\end{aligned}\right.
と定めます。今回の例では
A = \left(\begin{array}{rrrrrrr}
0 & 1 & 0 & 0 & 1 & 0 & 0\\
0 & 0 & 1 & 1 & 0 & 0 & 0\\
0 & 0 & 0 & 0 & 0 & 1 & 0\\
0 & 0 & 1 & 0 & 0 & 0 & 0\\
0 & 0 & 0 & 0 & 0 & 1 & 1\\
0 & 0 & 0 & 0 & 0 & 0 & 0\\
0 & 0 & 0 & 0 & 0 & 1 & 0\\
\end{array}\right)
となります。
この方法のメリットは頂点 $i$ から頂点 $j$ への辺が存在するかを瞬時に判定できるところです。
一方、デメリットとしてはデータを保持するために必要なメモリが大きくなることです。今回の例では頂点数が $V = 7$ だったので問題ありませんが、$V = 10^5$ のような場合には配列の確保すらできないというようなことになってしまいます。競技プログラミングではこのデメリットが問題になることが多いので、隣接行列を使う機会は少ないと思います。隣接行列が活きるのは完全グラフのような辺の数が十分多い場合になります。
1.3. データを保持する方法2:リストのリスト(LIL)
隣接行列の方法では辺が存在しない場合にもデータを保持していることが問題で、これは特に疎行列の場合に顕著になります。この問題を解決する方法の一つが隣接リストと呼ばれる方法です。この方法では長さ $V$ のリスト $A$ を
A[i] := 頂点\:i\:から直接移動可能な頂点集合のリスト
と定めます。今回の例では
\begin{aligned}
A = [\; &[1, 4],\\
&[2, 3],\\
&[5],\\
&[2],\\
&[5, 6],\\
&[],\\
&[5]\;]
\end{aligned}
となります。
この方法では辺が存在する部分のみを保持するので頂点数 $V = 10^5$、辺数 $E = 10^5$ といった問題も扱うことができます。また、DFS や BFS でグラフ上を traverse する際にも扱いやすい形式なので、競技プログラミングではこの形式で保持することが多いと思います。
しかし、言語によっては様々な長さの配列を要素とした配列を作成できないこともあるので、その場合には別の方法を考える必要があります。
1.4. データを保持する方法3:Compressed Sparse Row (CSR)
隣接リストで見えた問題を解決する方法の一つが CSR 形式です。CSR形式では $elist, start$ という2つの1次元配列を用意し、
\begin{aligned}
elist\;\;\;\; &:= 辺を始点の昇順にソートした時の終点のリスト\\
start[i] &:= \left\{
\begin{aligned}
&elistで頂点\:i\:が始点となる最初の\:index \;\; (i \ne V)\\
&E \;\;(i = V)
\end{aligned}\right.
\end{aligned}
と定めます。$elist$ は先ほどの隣接リストを1次元に平坦化したものと等しいので
\begin{aligned}
elist = [1, 4, 2, 3, 5, 2, 5, 6, 5]&\\
(始点 = [0, 0, 1, 1, 2, 3, 4, 4, 6]&)
\end{aligned}
となります。頂点 $0$ を始点とする辺は $elist$ において $0$ から始まるので $start[0] = 0$ です。また、頂点 $1$ を始点とする辺は $elist$ において $2$ から始まるので $start[1] = 2$ です。このようにしていくと
start = [0, 2, 4, 5, 6, 8, 8, 9]
となります。
この形式では頂点 $i$ から直接移動可能な頂点の集合は $elist$ の $[start[i], start[i+1])$ の範囲の値に一致します。
したがって、例えば Python では
# i: 現在いる頂点
# j: iから直接移動可能な頂点
for j in elist[start[i]:start[i+1]]:
...
のように traverse することができます。
この方法であれば、長さ $(辺数 E)$ の1次元配列と長さ $(頂点数 V + 1)$ の1次元配列の2つを用意することでデータを保持できます。
2. CSR形式の実装方法
では、具体的に与えられた入力から $elist, start$ を作成する方法をみていきます。
$start$ は $elist$ における開始位置を表すものですが、これは
start[i] = 始点の頂点番号が\:i\: 未満である辺の数
ということもできます。例えば、
- $i = 0$ のとき、始点の頂点番号が $0$ 未満である辺の数は $0$ なので $start[0] = 0$
- $i = 3$ のとき、始点の頂点番号が $3$ 未満($=0, 1, 2$)である辺の数は $5$ なので $start[3] = 5$
という具合です。
したがって、まず
start[i+1] = 頂点 \:i\: を始点とする辺の数
とし、その後累積和を取ることで $start$ を作成できます。
次に $elist$ ですが、$start$ が $elist$ における開始位置を表すものであることを思い出すと、これをポインタとして使うことが考えられます。例えば
- 始点 $0$、終点 $1$ の辺は $start[0]=0$ なので $elist[0] = 1$ とすれば良い
- 始点 $2$、終点 $5$ の辺は $start[2] = 4$ なので $elist[4] = 5$ とすれば良い
となります。上記操作の後、例えば始点 $0$、終点 $4$ の辺を格納する場合には $elist[1]$ に格納するので一般的に始点 $i$ の辺の終点を $elist$ に格納した後には $start[i]$ をインクリメントする(=ポインタを1つ進める)必要があります。ただし、$start$ を直接変更するわけにはいかないので新たに $counter$ を $start$ の複製として用意しこちらをポインタとして用いることにします。
まとめると
- $start[i+1] = $ 頂点 $i$ を始点とする辺の数とする。
- 累積和を取り $start$ を得る。
- $start$ を複製しこれを $counter$ とする。
- 各辺について
- $elist[counter[始点]]$ に終点を格納
- $counter[始点]$ をインクリメント
となります。
3. CSR形式の実装
では具体的にPythonで実装します。頂点数をn
とし、E
は E[i]=[辺iの始点, 辺iの終点]
である2次元のリストです。(始点、終点は 0-indexed)
def csr(n, E):
start = [0] * (n + 1)
elist = [0] * len(E)
# start[i+1] = 頂点 i を始点とする辺の数
for e0, e1 in E:
start[e0 + 1] += 1
#累積和
for i in range(1, n + 1):
start[i] += start[i - 1]
#挿入位置を表すポインタ
counter = start[:]
for e0, e1 in E:
elist[counter[e0]] = e1
counter[e0] += 1 # ポインタを進める
return start, elist
本記事で扱った例で使用してみると以下のようになります。
n = 7
E = [
[3, 2],
[0, 1],
[1, 2],
[1, 3],
[4, 5],
[2, 5],
[4, 6],
[6, 5],
[0, 4]
]
start, elist = csr(n, E)
print(start) # [0, 2, 4, 5, 6, 8, 8, 9]
print(elist) # [1, 4, 2, 3, 5, 2, 5, 6, 5]
4. CSRの応用
本記事では重みなし有向グラフを例として扱ってきましたが、無向グラフや重み付きグラフでも問題ないです。また、多重辺や自己ループが存在しても問題ありません。
無向グラフの場合には隣接リストでもやるように辺の入力 $a_i, b_i$ に対して $[a_i, b_i], [b_i, a_i]$ の両方を $E$ に加えれば良いです。
重み付きグラフの場合には $elist$ に格納する値を (終点, 重み) というタプルにすれば良いです。
また、より多くの情報を持ちたい場合には構造体を作成しこれを $elist$ に格納すれば良いです。
5. SciPyで疎行列を扱う
AtCoder の Python(3.8.2) 環境で使える科学計算ライブラリ SciPy は疎行列を扱うモジュール scipy.sparse を備えています。本記事の見出しで用いた LIL、 CSRといった表記はこれに準拠したものです。
scipy.sparse では他にも COO や CSC といった格納形式にも対応しており、また、これらの形式間で相互に変換することができます。
scipy.sparseを用いてcsr形式を得るには以下のようにすれば良いです。
$indptr (index ~ pointer)$ が $start$ に、$indices$ が $elist$ にそれぞれ対応します。
n = 7
E = [
[3, 2],
[0, 1],
[1, 2],
[1, 3],
[4, 5],
[2, 5],
[4, 6],
[6, 5],
[0, 4]
]
# 始点、終点、重みに分ける
shiten, shuten, value = [], [], []
for e0, e1 in E:
shiten.append(e0)
shuten.append(e1)
value.append(1)
# csr_matrixの構築
from scipy.sparse import csr_matrix
G = csr_matrix((value, (shiten, shuten)))
print(G.indptr) # [0 2 4 5 6 8 8 9]
print(G.indices) # [1 4 2 3 5 2 5 6 5]
print(G.data) # [1 1 1 1 1 1 1 1 1]
ただし、scipy.sparse.csr_matrixでは多重辺は重みの総和をとった1つの辺にまとまってしまうので注意が必要です。
また、この疎行列クラスは行列の積などの演算にも対応しており、通常の行列積では無駄に計算してしまう $0$ になる部分を飛ばして計算するので疎行列の場合にはこのクラスに変換してから計算することで非常に高速に計算できます。
import numpy as np
from scipy.sparse import csr_matrix
from timeit import timeit
randint = np.random.randint
# Aを全ての要素が0の1000x1000の行列とする
A = np.zeros((1000, 1000), np.int64)
# ランダムに選んだ1000箇所にランダムな値を入れる
R, C, V = randint(1, 1000, 1000), randint(1, 1000, 1000), randint(1, 1000, 1000)
A[R, C] = V
# 同じものをcsr_matrixクラスで作る
A_csr = csr_matrix((V, (R, C)), shape=(1000, 1000), dtype=np.int64)
# Bをランダムな値を持つ1000x1000の行列とする
B = randint(1, 1000, (1000, 1000))
# 通常の行列(numpy.ndarray)で行列積 ABを計算
print(timeit(lambda: A.dot(B), number=1)) # 2.980995751
# csr_matrixで行列積 ABを計算
print(timeit(lambda: A_csr.dot(B), number=1)) # 0.003365975000000354
6. おわりに
今回は internal_csr をみてきました。CSR形式は AtCoder で Numba や NumPy を使いたい方にとっては有用なテクニックだと思います。
説明の間違いやバグ、アドバイス等ありましたらお知らせください。