DEFLATE は、RFC 1951 で定義され、PNG 画像や ZIP アーカイブなどで幅広く用いられている圧縮形式である。
この形式では、符号長を圧縮器が (決められた範囲内で) 独自に設定したハフマン符号を用いることができる。
しかし、RFC 1951 には、ハフマン符号の符号長をどのようにして決めたらいいかは書かれていない。
そこで、今回は、この符号長の決め方を考えてみた。
問題設定
今回解く問題を、競技プログラミングの形式で表してみた。
問題
各アルファベットの出現数を表す数列 $(A_1, A_2, \ldots , A_N)$ が与えられる。
各アルファベットに対応する符号の長さを表す数列 $(L_1, L_2, \ldots , L_N)$ を適当に定め、
圧縮結果のビット列の長さ
$$
\sum_{i=1}^N A_i L_i
$$
が最小になるようにせよ。
ただし、定めた符号長で二分木を構築でき、かつアルファベットに対応しない符号が無いようにしなければならない。
すなわち
$$
\sum_{i=1}^N \frac{1}{2^{L_i}} = 1
$$
でなければならない。
さらに、$i = 1, \ldots , N$ について $1 \leq L_i \leq K$ でなければならない。
条件を満たす数列 $(L_1, L_2, \ldots , L_N)$ が複数ある場合は、どれを出力してもよい。
二分木が構築できない符号の長さでは、役に立たない。
アルファベットに対応しない符号がある場合、デコーダーによっては警告が発生したりエラーになることがある。
入力
入力は、以下の形式で与えられる。
N K
A_1
A_2
...
A_N
1 行目に、要素数 $N$ および符号の長さの制限 $K$ が半角空白区切りで与えられる。
2 行目から $N+1$ 行目までに、各アルファベットの出現数 $A_i$ が 1 個ずつ与えられる。
出力
条件を満たし、圧縮結果のビット列の長さが最小になる符号の長さ $L_i$ を 1 行に 1 個ずつ、$N$ 行で出力せよ。
L_1
L_2
...
L_N
制約
- 入力は全て整数
- $2 \leq N \leq 300$
- $1 \leq K \leq N$
- $N \leq 2^K$
- $1 \leq A_i \leq {10}^9$
部分点:配点の 50% 分のテストケースにおいては $K = N$
DEFLATE の動的ハフマン符号ブロックで扱うアルファベットの数は、最大 286 個である。
よって、これより少し大きく、キリがいい $N \leq 300$ という問題設定が妥当である。
制約 $N \leq 2^K$ が満たされない場合、すべての符号の長さを最大にしてもノードが足りず、ハフマン木を構築することができない。
入出力例
入力:
5 5
2
5
3
1
1
出力:
3
1
2
4
4
この例は、Wikipedia に載っている例に対応する。
既存手法 (部分点)
などから、以下のアルゴリズムで符号の長さを設定すると、圧縮結果のビット列の長さが最小になることがわかる。
- $i = 1, \ldots , N$ について、それぞれ以下のパラメータを持つ二分木のノードを作り、集合に加える
- アルファベット:$i$
- 出現数:$A_i$
- 子:左右ともなし
- 集合の要素数が 1 になるまで、以下を繰り返す
- 集合から、出現数が最小のノードと、2 番目に小さいノードを取り出す (集合から外す)
- 以下のパラメータを持つ二分木のノードを作り、集合に加える
- アルファベット:なし
- 出現数:子となる 2 個のノードの出現数の和
- 子:取り出したノードをそれぞれ左右の子とする
- 集合に残った 1 個のノードを根とし、それぞれのアルファベットが割り当てられたノードの深さを符号の長さとして出力する
# coding: utf-8
import sys
import heapq
# 入力を読み込む
N, K = [int(x) for x in sys.stdin.readline().rstrip().split(" ")]
A = [int(sys.stdin.readline().rstrip()) for _ in range(N)]
# 最初のノードを用意する
# (出現数, 順序付け用の連番, ノード) のタプルを集合に格納する
# ノードは、要素数が 1 の場合は葉を、2 の場合は中間のノードを表す
nodes = [(A[i], i, (i,)) for i in range(N)]
heapq.heapify(nodes)
next_id = N
# 集合に含まれる要素が 1 個になるまで、処理を繰り返す
while len(nodes) > 1:
# 出現数が少ない方からノード (タプル) を 2 個取り出す
node1 = heapq.heappop(nodes)
node2 = heapq.heappop(nodes)
# それらを合わせたノードを集合に格納する
heapq.heappush(nodes, (node1[0] + node2[0], next_id, (node1[2], node2[2])))
next_id += 1
# 木から符号の長さを読み取る
code_len = [-1 for _ in range(N)]
def traverse(node, depth):
if len(node) == 1:
code_len[node[0]] = depth
else:
traverse(node[0], depth + 1)
traverse(node[1], depth + 1)
traverse(nodes[0][2], 0)
# 答えを出力する
print("\n".join([str(x) for x in code_len]))
しかし、このアルゴリズムは、符号の長さの制限を考慮していない。
最悪の場合、$N-1$ 個のノードが以下のように縦に並んでしまう。
たとえば $A_i = 2^i$ とすることで、これを実現できる。
したがって、部分点の制約である $K = N$ の場合は問題ないが、一般の場合では符号の長さが制限を超えてしまい、誤答となる場合がある。
DEFLATE においては、符号の最大長は 15 である。
提案手法
DP (動的計画法) を用いて、長さの制限を含む制約を満たす符号の長さを求める。
ここで、もし仮にある $i, j$ において $A_i \geq A_j$ かつ $L_i \geq L_j$ となっている場合を考える。
このとき
$$
\begin{eqnarray}
&& (A_i L_i + A_j L_j) - (A_i L_j + A_j L_i) \\
&=& A_i (L_i - L_j) - A_j (L_i - L_j) \\
&=& (A_i - A_j) (L_i - L_j) \geq 0
\end{eqnarray}
$$
となる。
これは、$L_i$ と $L_j$ を入れ替えたほうが圧縮結果のビット列の長さを短くできる (少なくとも、入れ替えても長くならない) ことを示している。
したがって、$A_i$ を降順にソートし、これに対して $L_i$ を昇順 (単調非減少) に割り当てていく場合のみを考えればよい。
これを実現するため、以下の状態を考える。
- 降順にソートされた $A_i$ のうち現在注目している要素の位置 (初期値:1)
- 次に割り当てる符号の長さ (初期値:1)
- この符号の長さを割り当てることができる残りの個数 (初期値:2)
以下の操作により状態を更新していくことを考える。
- 現在注目している要素に、次に割り当てる符号の長さを割り当てる
- 現在注目している要素の位置を 1 増やす
- この符号の長さを割り当てることができる残りの個数を 1 減らす
- 答え (圧縮結果のビット列の長さ) の候補は、更新後の状態の答えに「次に割り当てる符号の長さ × 現在注目している要素 (出現数)」を加えたもの
- 次に割り当てる符号の長さを 1 増やす
- 次に割り当てる符号の長さを 1 増やす
- この符号の長さを割り当てることができる残りの個数を 2 倍にする
- 答え (圧縮結果のビット列の長さ) の候補は、更新後の状態の答えそのまま
「定めた符号長でハフマン木を構築でき、かつアルファベットに対応しない符号が無いようにしなければならない」という条件を満たすためには、この符号の長さを割り当てることができる残りの個数は最後の要素に符号を割り当てたときちょうど 0 にならなければならず、また最後の要素に符号を割り当てる前に 0 になってはいけない。
状態が以下の条件のいずれかを満たした場合は、有効な割り当てを行うことはできないので、答えは $\infty$ となる。
- 最後の要素の次の要素に注目し、残りの個数が 1 以上
- 最後の要素またはそれより前の要素に注目し、残りの個数が 0
- 残りの個数が「今注目している要素から最後の要素までの要素の個数」を超える
- 次に割り当てる符号の長さが制限を超える
状態が以下の条件を満たした場合は、有効な割り当てが完了しているので、答えは 0 となる。
- 最後の要素の次の要素に注目し、残りの個数が 0
これらに基づいて、まずはメモ化探索を実装する。
# coding: utf-8
import sys
# 入力を読み込む
N, K = [int(x) for x in sys.stdin.readline().rstrip().split(" ")]
A = [int(sys.stdin.readline().rstrip()) for _ in range(N)]
# 出現数ともとの添字の組に変換し、出現数の降順にソートする
freq = [(A[i], i) for i in range(N)]
freq.sort(reverse=True)
# メモ化探索を実行する
INF = max(A) * K * N + 1
memo = {}
def calc(pos, code_len, code_left):
# 割り当て完了
if (pos >= N) and code_left == 0:
return 0
# 割り当て不可能
if pos >= N or code_left == 0 or code_left > N - pos or code_len > K:
return INF
# メモ参照
key = (pos, code_len, code_left)
if key in memo:
return memo[key][0]
# 計算
# 符号を割り当てる
candidate1 = calc(pos + 1, code_len, code_left - 1) + code_len * freq[pos][0]
# 符号の長さを増やす
candidate2 = calc(pos, code_len + 1, code_left * 2)
# どっちがいいかを考える
if candidate1 <= candidate2:
ans = (candidate1, True)
else:
ans = (candidate2, False)
# メモを更新し、結果を返す
memo[key] = ans
return ans[0]
calc(0, 1, 2)
# 選択を復元する
ans = [-1 for _ in range(N)]
status = (0, 1, 2)
while status[0] < N:
if memo[status][1]:
# 符号を割り当てる
ans[freq[status[0]][1]] = status[1]
status = (status[0] + 1, status[1], status[2] - 1)
else:
# 符号の長さを増やす
status = (status[0], status[1] + 1, status[2] * 2)
# 答えを出力する
print("\n".join([str(x) for x in ans]))
次に、これを配列操作 (DP) に変換することを考える。
calc
内で calc
を呼び出している場所に注目すると、pos
または code_len
に 1 を足して呼び出している。
したがって、pos
または code_len
が自分より大きい部分が全て計算できていれば、自分の値が計算できることがわかる。
この性質を用いると、pos
の降順に計算を行い、同じ pos
の中では code_len
の降順に計算を行えばいいことがわかる。
計算を行う範囲 (答えが $\infty$ や 0 となる条件を満たす場合も含む) は、以下の通りである。
- $0 \leq \textrm{pos} \leq N$
- $1 \leq \textrm{code_len} \leq K + 1$
- $0 \leq \textrm{code_left} \leq N - \textrm{pos} + 1$
これに基づいて実装を行う。
# coding: utf-8
import sys
# 入力を読み込む
N, K = [int(x) for x in sys.stdin.readline().rstrip().split(" ")]
A = [int(sys.stdin.readline().rstrip()) for _ in range(N)]
# 出現数ともとの添字の組に変換し、出現数の降順にソートする
freq = [(A[i], i) for i in range(N)]
freq.sort(reverse=True)
# DP を実行する
INF = max(A) * K * N + 1
dp = [
[
[None for _ in range(N + 2)]
for _ in range(K + 2)]
for _ in range(N + 1)]
# 割り当て完了
for code_len in range(1, K + 2):
dp[N][code_len][0] = (0, None)
# 割り当て不可能
for code_len in range(1, K + 2):
for code_left in range(1, N + 2):
dp[N][code_len][code_left] = (INF, None)
for pos in range(0, N):
dp[pos][code_len][0] = (INF, None)
dp[pos][code_len][N - pos + 1] = (INF, None)
for pos in range(0, N):
for code_left in range(0, N + 1):
dp[pos][K + 1][code_left] = (INF, None)
# 残りの部分を計算する
for pos in range(N - 1, -1, -1):
for code_len in range(K, 0, -1):
for code_left in range(1, N - pos + 1):
candidate1 = dp[pos + 1][code_len][code_left - 1][0] + code_len * freq[pos][0]
candidate2 = dp[pos][code_len + 1][code_left * 2][0] if code_left * 2 <= N - pos else INF
if candidate1 <= candidate2:
dp[pos][code_len][code_left] = (candidate1, True)
else:
dp[pos][code_len][code_left] = (candidate2, False)
# 選択を復元する
ans = [-1 for _ in range(N)]
pos, code_len, code_left = 0, 1, 2
while pos < N:
if dp[pos][code_len][code_left][1]:
# 符号を割り当てる
ans[freq[pos][1]] = code_len
pos, code_len, code_left = pos + 1, code_len, code_left - 1
else:
# 符号の長さを増やす
pos, code_len, code_left = pos, code_len + 1, code_left * 2
# 答えを出力する
print("\n".join([str(x) for x in ans]))
これにより、符号の長さに制限がある場合でも、圧縮結果のビット列の長さが最小になる符号長を求めることができる。
時間計算量は $O(N^2 K)$ であり、$K \leq N \leq 300$ では十分高速に動作することが期待できる。
空間計算量も $O(N^2 K)$ である。
結論
DEFLATE 圧縮で用いることを想定した条件で、符号長に制限がある場合でも、DP (動的計画法) を用いて圧縮結果のビット列の長さが最小となる符号長の割り当て方を求めることができた。
とはいえ、DEFLATE 圧縮にはブロックの区切り方をどのようにして決めるかなどの未確定な点が残っており、このアルゴリズムと RFC 1951 に載っているアルゴリズムだけで圧縮ができるわけではない。
RFC 1951 には、以下の内容が書かれている。
- DEFLATE 圧縮の形式は、LZ77 が生成する圧縮形式と関係がある
- LZ77 から派生した多くの形式について特許が取られている
- 圧縮器の実装者は、特許が取られていないことが知られているここで紹介するアルゴリズムに従うべきである
今回紹介した手法は、RFC 1951 で紹介されているアルゴリズムから逸脱するものである。
(そもそも、RFC 1951 では符号長を決めるアルゴルズムは何も紹介されていない)
今回紹介した手法が特許に抵触するかの調査は行っていないため、特許に抵触しないとはいえない。