以下の仕様書を参照しています。
- RFC 1950 : ZLIB Compressed Data Format Specification version 3.3
- RFC 1951 : DEFLATE Compressed Data Format Specification version 1.3
- RFC 1952 : GZIP file format specification version 4.3
データ圧縮の概要
主に、二種類の圧縮用符号化が使用されています。
- LZ 符号化
- ハフマン符号化
LZ 符号化
LZ 符号化は、次のような変換イメージです。
データ:
'code length, literal/length alphabet, distance alphabet, abababababa'
LZ符号:
'code length, literal/' {6,16} ' alphabet, distance' {11,19} 'ab' {9,2}
{文字数, 前方距離} 形式は、「前方距離」分の文字を遡り、「文字数」を複写
{ 6, 16} -> 'length'
{11, 19} -> ' alphabet, '
{ 9, 2} -> 'ababababa'
DEFLATE 圧縮データ形式での LZ 符号化
DEFLATE 圧縮データ形式での LZ 符号では、文字の範囲を拡張して複写情報などを含みます。
値 | 種類 | 内容 |
---|---|---|
0 〜 255 | 文字 | 8 ビットのデータ |
256 | 制御文字 | ブロック終端(後述) |
257 〜 285 | 複写 | 複写の文字数情報も兼ねる(付加情報あり) |
286 〜 287 | 未使用 |
値は、文字や複写情報などですが、これを「 文字(長) 」と略記します。
■ 複写の場合
複写の場合は、次のようになります。
文字(長) | 文字数用付加情報 | 前方距離コード | 前方距離用付加情報 |
---|---|---|---|
257 〜 285 | 0 〜 5 ビット | 0 〜 29 | 0 〜 13 ビット |
前半 2 項目が「文字数」で、後半 2 項目が「前方距離」です。これで {文字数, 前方距離} になります。
前半:文字数の詳細
文字(長) | 付加ビット数 | 文字数 |
---|---|---|
257 | 0 | 3 |
258 | 0 | 4 |
259 | 0 | 5 |
260 | 0 | 6 |
261 | 0 | 7 |
262 | 0 | 8 |
263 | 0 | 9 |
264 | 0 | 10 |
265 | 1 | 11 〜 12 |
266 | 1 | 13 〜 14 |
267 | 1 | 15 〜 16 |
268 | 1 | 17 〜 18 |
269 | 2 | 19 〜 22 |
270 | 2 | 23 〜 26 |
271 | 2 | 27 〜 30 |
272 | 2 | 31 〜 34 |
273 | 3 | 35 〜 42 |
274 | 3 | 43 〜 50 |
275 | 3 | 51 〜 58 |
276 | 3 | 59 〜 66 |
277 | 4 | 67 〜 82 |
278 | 4 | 83 〜 98 |
279 | 4 | 99 〜 99 |
280 | 4 | 115 〜 130 |
281 | 5 | 131 〜 162 |
282 | 5 | 163 〜 194 |
283 | 5 | 195 〜 226 |
284 | 5 | 227 〜 257 |
285 | 0 | 258 |
後半:前方距離の詳細
前方距離コード | 付加ビット数 | 前方距離 |
---|---|---|
0 | 0 | 1 |
1 | 0 | 2 |
2 | 0 | 3 |
3 | 0 | 4 |
4 | 1 | 5 〜 6 |
5 | 1 | 7 〜 8 |
6 | 2 | 9 〜 12 |
7 | 2 | 13 〜 16 |
8 | 3 | 17 〜 24 |
9 | 3 | 25 〜 32 |
10 | 4 | 33 〜 48 |
11 | 4 | 49 〜 64 |
12 | 5 | 65 〜 96 |
13 | 5 | 97 〜 128 |
14 | 6 | 129 〜 192 |
15 | 6 | 193 〜 256 |
16 | 7 | 257 〜 384 |
17 | 7 | 385 〜 512 |
18 | 8 | 513 〜 768 |
19 | 8 | 769 〜 1024 |
20 | 9 | 1025 〜 1536 |
21 | 9 | 1537 〜 2048 |
22 | 10 | 2049 〜 3072 |
23 | 10 | 3073 〜 4096 |
24 | 11 | 4097 〜 6144 |
25 | 11 | 6145 〜 8192 |
26 | 12 | 8193 〜 12288 |
27 | 12 | 12289 〜 16384 |
28 | 13 | 16385 〜 24576 |
29 | 13 | 24577 〜 32768 |
30 | 0 | (未使用) |
31 | 0 | (未使用) |
■ 上記例の場合
'code length, literal/' {6,16} ' alphabet, distance' {11,19} 'ab' {9,2}
上記の複写情報 {文字数, 前方距離} は、以下になります。
複写情報 | 文字(長) | 値(ビット数) | 前方距離コード | 値(ビット数) |
---|---|---|---|---|
{ 6, 16 } | 260 | 7 | 3 (2 ビット) | |
{ 11, 19 } | 265 | 0 (1 ビット) | 8 | 2 (3 ビット) |
{ 9, 2 } | 263 | 1 |
「文字(長)」と「前方距離コード」のビット数は、ハフマン符号化されるので不定です。
LZ 符号化例の DEFLATE 形式を疑似的に表現すると、以下のようになります。
'code length, literal/', 260,7,(3), ' alphabet, distance', 265,(0),8,(2), 'ab', 263,1
# (n) は文脈からビット数が決まり、(n) 以外[文字:0〜255を含む]はハフマン符号化されます
ハフマン符号化
例として、次の文字列を下記プログラム(huffman.py)で、ハフマン符号を作ります。
'huffman compressed data'
huffman.py : ハフマン符号生成プログラム
#!/usr/bin/env python3
#
# ハフマン符号の生成
#
class Huffman:
# 生成される符号
class Code:
def __init__(self, code, length, literal, freq):
self.code = code
self.length = length
self.literal = literal
self.freq = freq
self.set_binary()
self.set_char()
def set_binary(self):
self.binary = ('{0:0%sb}' % self.length).format(self.code)
def set_char(self):
c = chr(self.literal)
if c.isprintable():
self.char = '\'%s\'' % c
else:
self.char = '%#x' % self.literal
def key_code(self): return self.code
def key_length(self): return self.length
def key_length_literal(self): return (self.length, self.literal)
def key_literal(self): return self.literal
def key_freq_literal(self): return (self.freq, self.literal)
def key_binary(self): return self.binary
# 葉(Leaf)または節(Node)となる要素
class Element:
def __init__(self, freq, literal, branch):
self.freq = freq
self.literal = literal
self.branch = branch
def key_freq_literal(self):
return (self.freq, self.literal)
# 葉 情報
class Leaf(Element):
def __init__(self, freq, literal):
super().__init__(freq, literal, None)
# 節 情報
class Node(Element):
def __init__(self, freq, branch):
super().__init__(freq, 0, branch)
# 木の生成
@staticmethod
def build_tree(data):
tree = []
# 葉の一覧を作る
for literal in range(min(data), max(data) + 1):
freq = data.count(literal)
if freq != 0:
tree.append(Huffman.Leaf(freq, literal))
# 木を作る
while len(tree) > 1:
# 頻度,文字の優先順位でソート
tree = sorted(tree, key=Huffman.Element.key_freq_literal)
# 頻度の下位2つを、1つの節にまとめる
branch = tree[:2]
tree = tree[2:]
freq = branch[0].freq + branch[1].freq
tree.append(Huffman.Node(freq, branch))
return tree[0]
# 辞書の生成
@staticmethod
def make_dictionary(top):
dictionary = []
nodes = [(top, 0, 0)]
while len(nodes) != 0:
elem, code, length = nodes.pop()
if elem.branch is None:
dictionary.append(Huffman.Code(code, length, elem.literal, elem.freq))
continue
nodes.append((elem.branch[0], code * 2 + 0, length + 1))
nodes.append((elem.branch[1], code * 2 + 1, length + 1))
return sorted(dictionary, key=Huffman.Code.key_literal)
# RFC 1951: DEFLATE 形式に変換
@staticmethod
def to_deflate(dictionary):
dictionary = sorted(dictionary, key=Huffman.Code.key_literal)
length_min = min(dictionary, key=Huffman.Code.key_length).length
length_max = max(dictionary, key=Huffman.Code.key_length).length
length_size = length_max + 1
length_count = [0] * length_size
for code in dictionary:
length_count[code.length] += 1
code = 0
length_code = [0] * length_size
for n in range(length_max+1):
code <<= 1
length_code[n] = code
code += length_count[n]
for code in dictionary:
code.code = length_code[code.length]
code.set_binary()
length_code[code.length] += 1
#
# Huffman クラスの初期化
#
def __init__(self, data, deflate=False):
self.tree = Huffman.build_tree(data)
self.dictionary = Huffman.make_dictionary(self.tree)
if deflate:
Huffman.to_deflate(self.dictionary)
# 統計情報の取得
def statistics(huff):
length = 0
freq = 0
max_code = 0
max_length = 0
max_literal = 0
max_char = 0
max_freq = 0
max_binary = 0
for code in huff.dictionary:
c = code.code
b = code.length
l = code.literal
s = len(code.char)
w = code.freq
p = len(code.binary)
max_code = max(c, max_code)
max_length = max(b, max_length)
max_literal = max(l, max_literal)
max_char = max(s, max_char)
max_freq = max(w, max_freq)
max_binary = max(p, max_binary)
length += (b * w)
freq += w
return {
'total': {
'length': length,
'freq': freq,
},
'max': {
'code': max_code,
'length': max_length,
'literal': max_literal,
'char': max_char,
'freq': max_freq,
'binary': max_binary,
},
}
#
# コマンド ラインから呼び出し
#
if __name__ == '__main__':
def main():
import sys
import codecs
import argparse
#
# コマンド ライン引数の解析
#
parser = argparse.ArgumentParser(description='データからハフマン符号を生成します')
parser.add_argument('-T', '--text', help='入力文字列 (FILE は無視される)')
parser.add_argument('-SF', '--sort-freq', action='store_true', help='頻度でソート')
parser.add_argument('-SL', '--sort-literal', action='store_true', help='文字(値)でソート')
parser.add_argument('-SB', '--sort-length', action='store_true', help='符号長でソート')
parser.add_argument('-SC', '--sort-code-be', action='store_true', help='符号(左詰)でソート [デフォルト]')
parser.add_argument('-Sc', '--sort-code-le', action='store_true', help='符号(右詰)でソート')
parser.add_argument('-p', '--padding', metavar='CHAR', help='符号に CHAR 文字を追加')
parser.add_argument('-r', '--reverse', action='store_true', help='降順でソート')
parser.add_argument('-D', '--deflate', action='store_true', help='DEFLATE 形式の辞書にする')
parser.add_argument('-DL', '--deflate-literal', action='store_true',
help='RFC 1951 literal/length 固定辞書モード (FILE は無視される)')
parser.add_argument('-DD', '--deflate-distance', action='store_true',
help='RFC 1951 の distance 固定辞書モード (FILE は無視される)')
parser.add_argument('-F', '--format', metavar='STR', help='出力形式を指定')
parser.add_argument('-N', '--no-total', action='store_true', help='合計の出力を抑制する')
parser.add_argument('FILE', nargs='?')
args = parser.parse_args()
# 出力形式:
# %B: 符号長
# %C: 符号(10進数)
# %P: 符号(左詰:2進数)
# %p: 符号(右詰:2進数)
# %L: 文字
# %D: 文字の値(10進数)
# %X: 文字の値(16進数大文字)
# %x: 文字の値(16進数小文字)
# %F: 頻度
line_format = args.format
if line_format is None:
if args.deflate_literal:
line_format = '%D=0x%X %B:%P'
elif args.deflate_distance:
line_format = '%D %B:%P'
else:
line_format = '%L (%D=0x%X)[%F] %B:%P'
#
# ソート方法の設定
#
if args.sort_freq:
sort_key = Huffman.Code.key_freq_literal
elif args.sort_literal:
sort_key = Huffman.Code.key_literal
elif args.sort_length:
sort_key = Huffman.Code.key_length_literal
elif args.sort_code_be:
sort_key = Huffman.Code.key_code
else: # if args.sort_binary: # default
sort_key = Huffman.Code.key_binary
#
# データの読み込みとハフマン符号の作成
#
flag_deflate = args.deflate
flag_total = not args.no_total
if args.deflate_literal:
# RFC 1951 literal/length 固定辞書用
flag_deflate = True
flag_total = False
data = []
for n in range(144):
data += [(n + 0)] * 2
for n in range(112):
data += [(n + 144)] * 1
for n in range(24):
data += [(n + 256)] * 4
for n in range(8):
data += [(n + 280)] * 2
elif args.deflate_distance:
# RFC 1951 distance 固定辞書用
flag_deflate = True
flag_total = False
data = [n for n in range(32)]
elif args.text:
data = codecs.encode(args.text)
else:
if args.FILE is None:
parser.print_help()
return
if args.FILE == '-':
data = codecs.encode(sys.stdin.read())
else:
data = open(args.FILE, 'rb').read()
huff = Huffman(data, flag_deflate)
#
# 結果の出力
#
dinf = huff.statistics()
clen = len('%d' % dinf['max']['code'])
dlen = len('%d' % dinf['max']['literal'])
xlen = len('%x' % dinf['max']['literal'])
slen = dinf['max']['char']
flen = len('%d' % dinf['max']['freq'])
blen = len('%d' % dinf['max']['length'])
plen = dinf['max']['binary']
fmtstr_dict = {
'C': '{0:%dd}' % clen,
'B': '{1:%dd}' % blen,
'D': '{2:%dd}' % dlen,
'X': '{2:0%dX}' % xlen,
'x': '{2:0%dx}' % xlen,
'F': '{3:%dd}' % flen,
'P': '{4:<%ds}' % plen,
'p': '{4:>%ds}' % plen,
'L': '{5:<%ds}' % slen,
}
# 出力形式の解析
fmtstr = ''
prefix = False
for c in line_format:
if not prefix:
if c != '%':
fmtstr += c
continue
prefix = True
continue
prefix = False
if c in fmtstr_dict:
fmtstr += fmtstr_dict[c]
continue
fmtstr += c
# 辞書の出力
squote = '\''
pchar = args.padding and args.padding[0] or ' '
for code in sorted(huff.dictionary, key=sort_key, reverse=args.reverse):
binary = code.binary
if args.padding:
binary += (pchar * (plen - len(binary)))
pass
print(fmtstr.format(code.code, code.length, code.literal, code.freq, binary, code.char))
# 合計の出力
if flag_total:
total = dinf['total']
freq = total['freq']
print('入力: %d バイト (%d ビット)' % (freq, freq * 8))
print('符号: %d バイト (%d ビット)' % ((total['length'] + 7) >> 3, total['length']))
main()
$ python3 huffman.py -T 'huffman compressed data' -SL -F '%F %L %P'
2 ' ' 1110
3 'a' 100
1 'c' 10100
2 'd' 1111
2 'e' 000
2 'f' 001
1 'h' 10101
2 'm' 010
1 'n' 10110
1 'o' 10111
1 'p' 11000
1 'r' 11001
2 's' 011
1 't' 11010
1 'u' 11011
入力: 23 バイト (184 ビット)
符号: 12 バイト (89 ビット)
リストの出力形式は
頻度 '文字' 符号
になっています。
■ 頻度 : 文字に対する文字列中の個数です。空白は 2 個、'a' は 3 個です。
■ 符号 : ビット列です。頻度が高いほど短くなり、低いほど長くなります。この性質がデータ圧縮の鍵です。 ハフマン木の分岐は左から右へ順に行います 。
■ 入力 : 1 文字を 1 バイトとする文字列として、23 バイトになります。
■ 符号 : 文字を符号に変換して、ビット列にすると 12 バイトにすることができます。ただし、符号化・復号で同じ辞書をプログラム側で用意する必要があります。
※ 辞書 : 文字と符号は 1 対 1 の対応になります。対応の一覧が辞書です。
符号化と復号
文字と符号が 1 対 1 なので、符号化・復号ともに置き換え処理を文字数だけループするプログラムになります。
上記で作った辞書(文字 ' acdefhmnoprst' だけで作られた文字列が対象になる)を使った、ハフマン符号化と復号のサンプル プログラムを作ってみます。
ハフマン符号化と復号
#!/usr/bin/env python3
# 辞書
dictionary = (
(' ', '1110'),
('a', '100'),
('c', '10100'),
('d', '1111'),
('e', '000'),
('f', '001'),
('h', '10101'),
('m', '010'),
('n', '10110'),
('o', '10111'),
('p', '11000'),
('r', '11001'),
('s', '011'),
('t', '11010'),
('u', '11011'),
)
# 事前準備
code_length_min = min((len(d[1]) for d in dictionary))
code_length_max = max((len(d[1]) for d in dictionary))
code_length_range = range(code_length_min, code_length_max + 1)
code_literal_dict = {code: literal for literal, code in dictionary}
literal_code_dict = {literal: code for literal, code in dictionary}
# 復号
def decode(data):
output = ''
while len(data):
for length in code_length_range:
code = data[:length]
if code in code_literal_dict:
output += code_literal_dict[code]
data = data[length:]
break
else:
raise Exception('バグってる')
return output
# 符号化
def encode(data):
return ''.join(literal_code_dict[literal] for literal in data)
# 例
data = 'huffman compressed data'
# 符号化と復号
enc_data = encode(data)
dec_data = decode(enc_data)
print('文字列: \'%s\'' % data)
print('符号化 %d ビット: %s' % (len(enc_data), enc_data))
print('復号 %d バイト: \'%s\'' % (len(dec_data), dec_data))
print('検証:', data == dec_data and '正常' or '異常')
(エンディアンの問題を避けるため、符号を文字列のまま処理しています)
文字列: 'huffman compressed data'
符号化 89 ビット: 10101110110010010101001011011101010010111010110001100100001101100011111110111110011010100
復号 23 バイト: 'huffman compressed data'
検証: 正常
符号化すると、ごちゃごちゃしたビット列の情報になり、復号すると元に戻ります。
DEFLATE 圧縮データ形式でのハフマン符号
huffman.py に '-D' オプションを付けると DEFLATE 圧縮データ形式で使用する符号を生成します。
上記と同じ例の文字列を、プログラムのオプションを増やして、ハフマン符号を作ります。
'huffman compressed data'
$ python3 huffman.py -T 'huffman compressed data' -SL -F '%F %L %B:%P' -D
2 ' ' 4:1010
3 'a' 3:000
1 'c' 5:11000
2 'd' 4:1011
2 'e' 3:001
2 'f' 3:010
1 'h' 5:11001
2 'm' 3:011
1 'n' 5:11010
1 'o' 5:11011
1 'p' 5:11100
1 'r' 5:11101
2 's' 3:100
1 't' 5:11110
1 'u' 5:11111
入力: 23 バイト (184 ビット)
符号: 12 バイト (89 ビット)
リストの出力形式は
頻度 '文字' 符号長:符号
になっています。
これは '文字' でソートしたリストですが、(符号,'文字') の優先順位でソートしてみます。
$ python3 huffman.py -T 'huffman compressed data' -SB -F '%F %L %B:%P' -D
3 'a' 3:000
2 'e' 3:001
2 'f' 3:010
2 'm' 3:011
2 's' 3:100
2 ' ' 4:1010
2 'd' 4:1011
1 'c' 5:11000
1 'h' 5:11001
1 'n' 5:11010
1 'o' 5:11011
1 'p' 5:11100
1 'r' 5:11101
1 't' 5:11110
1 'u' 5:11111
入力: 23 バイト (184 ビット)
符号: 12 バイト (89 ビット)
符号の 2 進数を 10 進数にすると
3 'a' 3:000 = 0
2 'e' 3:001 = 1
2 'f' 3:010 = 2
2 'm' 3:011 = 3
2 's' 3:100 = 4
2 ' ' 4:1010 = 5 * 2 + 0 = 10
2 'd' 4:1011 = 5 * 2 + 1 = 11
1 'c' 5:11000 = (5 * 2 + 2) * 2 + 0 = 24
1 'h' 5:11001 = (5 * 2 + 2) * 2 + 1 = 25
1 'n' 5:11010 = (5 * 2 + 2) * 2 + 2 = 26
1 'o' 5:11011 = (5 * 2 + 2) * 2 + 3 = 27
1 'p' 5:11100 = (5 * 2 + 2) * 2 + 4 = 28
1 'r' 5:11101 = (5 * 2 + 2) * 2 + 5 = 29
1 't' 5:11110 = (5 * 2 + 2) * 2 + 6 = 30
1 'u' 5:11111 = (5 * 2 + 2) * 2 + 7 = 31
だから、('文字',符号長) の一覧があれば符号を求められることが分かります。
■ 辞書を圧縮する
code_length の '文字' 順に符号長を配列 (4,3,5,4,3,3,5,3,5,5,5,3,5,5) にすると、符号長 3,4,5 に対応するハフマン符号を割り当てることができます。
$ python3 huffman.py -T '435433535555355' -SL -F '%F %L %B:%P' -D -N
5 '3' 2:10
2 '4' 2:11
8 '5' 1:0
この辞書は、配列 (0,0,0,2,2,1) から作ることができます。(0 は情報として存在しないことを意味する)
これで「符号長の配列=辞書相当」をハフマン符号化して圧縮できることになります。
DEFLATE 形式の利点は
「配列のインデックスが文字(値)」となる「符号長の配列」から「辞書を再現できる」
( length = code_length [ literal ] --> code, length = dictionary [ literal ] )
ことです。
配列から辞書を作るサンプル プログラム
#!/usr/bin/env python3
# 「文字と符号長」の一覧から辞書を作る
def make_dictionary(lengths):
if type(lengths) == dict:
literals = tuple(literal for literal in lengths)
lengths = [lengths[literal] for literal in lengths]
else:
literals = range(len(lengths))
def gen_length_code():
yield 0
code = 0
for length in range(1, max(lengths) + 1):
yield code
code = (code + lengths.count(length)) << 1
def gen_dictionary(length_code):
for index in range(len(lengths)):
literal = literals[index]
length = lengths[index]
code = length_code[length] if length != 0 else None
length_code[length] += 1
yield (literal, code, length)
return (*gen_dictionary([*gen_length_code()]),)
# 辞書の表示
def dump_dictionary(dictionary, fmtstr=None):
if not fmtstr:
fmtstr = '\'{0}\' {1:d}:{2}'
for literal, code, length in dictionary:
if length != 0:
binary = ('{0:0%db}' % length).format(code)
print(fmtstr.format(literal, length, binary))
if __name__ == '__main__':
print('文字列例の辞書')
dump_dictionary(make_dictionary({
' ': 4, 'a': 3, 'c': 5, 'd': 4, 'e': 3,
'f': 3, 'h': 5, 'm': 3, 'n': 5, 'o': 5,
'p': 5, 'r': 5, 's': 3, 't': 5, 'u': 5,
}))
print()
print('符号長の辞書')
dump_dictionary(make_dictionary((
0, 0, 0, 2, 2, 1
)))
文字列例の辞書
' ' 4:1010
'a' 3:000
'c' 5:11000
'd' 4:1011
'e' 3:001
'f' 3:010
'h' 5:11001
'm' 3:011
'n' 5:11010
'o' 5:11011
'p' 5:11100
'r' 5:11101
's' 3:100
't' 5:11110
'u' 5:11111
符号長の辞書
'3' 2:10
'4' 2:11
'5' 1:0
ZLIB/GZIP 圧縮データ形式
ZLIB, GZIP のデータは大まかに、次のようになっています。
項目 | ZLIB | GZIP |
---|---|---|
ヘッダ部 | ZLIB の情報 | GZIP の情報 |
圧縮データ | DEFLATE 形式 | DEFLATE 形式 |
チェック・サム | ADLER32 | CRC32 |
サイズ | なし | ISIZE |
ZLIB 形式では、ネットワーク・バイト・オーダー(ビッグ・エンディアン:上位バイト、下位バイトの順)で格納されます。解凍処理後のデータ検証には Adler-32 が利用できます。
GZIP 形式では、リトル・エンディアン(下位バイト、上位バイトの順)で格納されます。解凍処理後のデータ検証には、CRC-32 と ISIZE(元データのサイズ:32 bit)が利用できます。
※ ZLIB/GZIP の情報は、解凍処理に影響がないので折りたたみます。
ZLIB の情報
ZLIB の情報
ZLIB 圧縮データ形式は先頭から圧縮データの直前までは、次のようになっています。
項目 | バイト数 | 内容 | 備考 |
---|---|---|---|
CMF | 1 | 圧縮形式と圧縮形式の情報 | |
FLG | 1 | フラグなど | |
DICTID | 0 か 4 | 固定辞書の ID | FLG.FDICT が 0 の場合はなし |
CMF : Compression Method and Flags
CMF の内容は、次のとおりです。
ビット位置 | 項目 | 内容 |
---|---|---|
3 〜 0 | CM | 圧縮形式 |
7 〜 4 | CINFO | 圧縮形式に対する情報 |
CM : 圧縮形式
CM が 8 のとき、圧縮データは DEFLATE 形式になります。15 は予約です。
(RFC 1950 : Version 3.3 では CM=8 のみが許可されています)
CINFO : 圧縮形式に対する情報
CM=8 のとき、CINFO は LZ77 のウインドウ・サイズを、0 から 7 で表します。
ウインドウ・サイズ = 2^{CINFO+8}
FLG : FLaGs
FLG の内容は、次のとおりです。
ビット位置 | 項目 | 内容 |
---|---|---|
4 〜 0 | FCHECK | CMF,FLG の検証用 |
5 | FDICT | DICTID あり |
7 〜 6 | FLEVEL | 圧縮アルゴリズムの種類 |
FCHECK
次の計算が真になるように、FCHECK を定めます。
偽ならば、エラーです。
FDICT
後続に DICTID が存在する場合は 1 になります。(CM=8 の場合は 0 です)
FLEVEL
FLEVEL の値と内容は、次のとおりです。
値 | アルゴリズム |
---|---|
0 | 最速 : 処理時間が最短 |
1 | 高速 : 処理時間が短い |
2 | デフォルト |
3 | 最小 : 処理時間が長い |
再圧縮などでの参考値です。
DICTID
辞書のバイト・シーケンスを Adler-32 で計算した値が、辞書 ID になります。
GZIP の情報
GZIP の情報
GZIP 圧縮データ形式は先頭から圧縮データの直前までは、次のようになっています。
項目 | バイト数 | 内容 | 備考 |
---|---|---|---|
ID1 | 1 | 固定値 31 = 0x1f | |
ID2 | 1 | 固定値 139 = 0x8b | |
CM | 1 | 圧縮形式 | |
FLG | 1 | フラグなど | |
MTIME | 4 | 日時 | |
XFL | 1 | 追加フラグ | |
OS | 1 | 圧縮場所 | テキスト形式の参考情報 |
EXTRA | ? | 追加情報 | FLG.FEXTRA が 0 の場合はなし |
NAME | ? | 元のファイル名 | FLG.FNAME が 0 の場合はなし |
COMMENT | ? | コメント | FLG.FCOMMENT が 0 の場合はなし |
CRC16 | 0 か 2 | 上記のチェック・サム | FLG.FHCRC が 0 の場合はなし |
ID1,ID2 : IDentification 1,2
GZIP 圧縮データ形式の始まりを示します。
CM : Compression Method
CM が 0 から 7 まて予約になっています。
CM が 8 のとき、圧縮データは DEFLATE 形式になります。
FLG : FLaGs
FLG の内容は、次のとおりです。
ビット位置 | 項目 | 内容 |
---|---|---|
0 | FTEXT | テキストかも |
1 | FHCRC | CRC16 あり |
2 | FEXTRA | EXTRA あり |
3 | FNAME | NAME あり |
4 | FCOMMENT | COMMENT あり |
5 〜 7 | 予約 |
MTIME : Modification TIME
MTIME は GMT 1970/1/1 00:00:00 からの秒数です。
MTIME が 0 の場合は、日時情報は無効です。
XFL : eXtra FLags
XFL の内容は CM に依存します。
CM=8 の場合は、次のとおりです。
値 | アルゴリズム |
---|---|
2 | 最小 : 処理時間が長い |
4 | 最速 : 処理時間が最短 |
OS : Operating System
テキスト形式(CR/LF など)の参考情報です。
値 | OS |
---|---|
0 | FAT filesystem (MS-DOS, OS/2, NT/Win32) |
1 | Amiga |
2 | VMS (or OpenVMS) |
3 | Unix |
4 | VM/CMS |
5 | Atari TOS |
6 | HPFS filesystem (OS/2, NT) |
7 | Machintosh |
8 | Z-System |
9 | CP/M |
10 | TOPS-20 |
11 | NTFS filesystem (NT) |
12 | QDOS |
13 | Acorn RISCOS |
255 | unknown |
EXTRA : XLEN と Extra field
FLG.FEXTRA が 1 とき、追加情報があります。
項目 | バイト数 | 内容 |
---|---|---|
XLEN | 2 | Extra filed の大きさ |
Extra filed | XLEN | 追加情報 |
XLEN : eXtra LENgth
Extra filed の大きさを示します。
Extra field
追加情報は、次の形式のサブフィールドが並びます。
項目 | バイト数 | 内容 |
---|---|---|
SI1 | 1 | サブフィールド識別子 1 |
SI2 | 1 | サブフィールド識別子 2 |
LEN | 2 | サブフィールドの大きさ |
sub field data | LEN | データ |
NAME
FLG.FNAME が 1 とき、オリジナルのファイル名が NULL(=0x00)文字で終わる文字列として格納されます。
COMMENT
FLG.FCOMMENT が 1 とき、コメントが NULL(=0x00)文字で終わる文字列として格納されます。
CRC16
FLG.FHCRC が 1 とき、先頭から CRC16 の直前までデータの CRC32 の結果から下位 16 ビットが格納されます。
DEFLATE 圧縮データ形式
DEFLATE 圧縮データは、ブロック単位(可変長)のデータが並んでいます。
ブロック
ブロックは大まかに、次のようになっています。
項目 | ビット数 | 内容 |
---|---|---|
BFINAL | 1 | 最後のブロックであることを示す |
BTYPE | 2 | 後続のデータ形式を示す |
データ | 不定 | BTYPE による |
ブロックの並び
DEFLATE 圧縮データの全体は、次の並びになります。
ブロック \ 項目 | BFINAL | BTYPE | データ |
---|---|---|---|
ブロック 1 | 0 | ?? | . . . |
ブロック 2 | 0 | ?? | . . . |
... | 0 | ?? | . . . |
ブロック n | 1 | ?? | . . . |
ビット列の順序
左側を上位ビットとすると、ビット列の順序は次のようになります。
... 上位ビット | BTYPE | BFINAL | 下位ビット ... |
---|---|---|---|
... 後続のデータ | 2 ビット | 1 ビット | 以前のデータ ... |
データのビット列は、下位ビットから順に詰められたもの(リトル・エンディアン)になっています。
項目の順序と、数の表現(上位の桁から下位の桁へ)は逆向きになります。
ハフマン符号のビット列は反転させる
ハフマン木の分岐は左から右に進みますが、格納されるデータのビット列は右から左なので、ハフマン符号のビット列を反転させる必要があります。
BTYPE 別ブロック
BTYPE とブロックの内容は、次のようになっています。
種類 \ 項目 | BFINAL | BTYPE | バイト境界 | データ長 | 動的辞書 | データ |
---|---|---|---|---|---|---|
非圧縮形式 | ? | 00 | あり | あり | なし | 無変換 |
固定辞書形式 | ? | 01 | なし | なし | なし | 圧縮 |
動的辞書形式 | ? | 10 | なし | なし | あり | 圧縮 |
(予約) | ? | 11 | - | - | - | - |
BTYPE が 01 および 10 では、共通の圧縮データ形式(LZ 符号)です。また、LZ 符号の項目でハフマン符号化される「文字(長)」と「前方距離コード」は、別々の辞書を使います。
BTYPE=00 : 非圧縮形式
BTYPE=00 のブロックは、次のようになっています。
情報量 | 項目 | 内容 |
---|---|---|
1 ビット | BFINAL | 最終ブロックならば 1 |
2 ビット | BTYPE | 00 |
0 〜 7 ビット | 余白 | LEN をバイト境界から開始するための穴埋め |
16 ビット | LEN | データのバイト数 |
16 ビット | NLEN | LEN の 1 の補数 (~LEN) |
LEN バイト | データ | 無変換のデータ(最大 65535 バイト) |
解凍処理は LEN == ~NLEN を確認してデータをコピーするだけ。
BTYPE=01 : 固定辞書形式
BTYPE=01 のブロックは、次のようになっています。
情報量 | 項目 | 内容 |
---|---|---|
1 ビット | BFINAL | 最終ブロックならば 1 |
2 ビット | BTYPE | 01 |
??? | データ | 圧縮データ |
BTYPE=10 から動的辞書を省いた(固定辞書を使う)ものになっています。
固定辞書の生成プログラム
make_dictionary の引数(符号長の配列)が固定辞書になります。
#!/usr/bin/env python3
def make_dictionary(lengths):
def gen_length_code():
code = 0
for length in range(max(lengths) + 1):
yield code
code = (code + lengths.count(length)) << 1
def gen_dictionary(length_code):
for length in lengths:
yield (length_code[length], length)
length_code[length] += 1
return (*gen_dictionary([*gen_length_code()]),)
def dump_dictionary(dictionary, fmtstr=None):
if fmtstr is None:
fmtstr = '{0:3} {1:d}:{2}'
for literal in range(len(dictionary)):
code, length = dictionary[literal]
binary = ('{0:0%db}' % length).format(code)
print(fmtstr.format(literal, length, binary))
print()
print('文字(値)の固定辞書')
dump_dictionary(make_dictionary(
[8]*(144-0) + [9]*(256-144) + [7]*(280-256) + [8]*(288-280)
))
print('前方距離コード用固定辞書')
dump_dictionary(make_dictionary([5]*32))
BTYPE=01 の固定辞書
文字(値)の固定辞書
0 8:00110000
1 8:00110001
2 8:00110010
3 8:00110011
4 8:00110100
5 8:00110101
6 8:00110110
7 8:00110111
8 8:00111000
9 8:00111001
10 8:00111010
11 8:00111011
12 8:00111100
13 8:00111101
14 8:00111110
15 8:00111111
16 8:01000000
17 8:01000001
18 8:01000010
19 8:01000011
20 8:01000100
21 8:01000101
22 8:01000110
23 8:01000111
24 8:01001000
25 8:01001001
26 8:01001010
27 8:01001011
28 8:01001100
29 8:01001101
30 8:01001110
31 8:01001111
32 8:01010000
33 8:01010001
34 8:01010010
35 8:01010011
36 8:01010100
37 8:01010101
38 8:01010110
39 8:01010111
40 8:01011000
41 8:01011001
42 8:01011010
43 8:01011011
44 8:01011100
45 8:01011101
46 8:01011110
47 8:01011111
48 8:01100000
49 8:01100001
50 8:01100010
51 8:01100011
52 8:01100100
53 8:01100101
54 8:01100110
55 8:01100111
56 8:01101000
57 8:01101001
58 8:01101010
59 8:01101011
60 8:01101100
61 8:01101101
62 8:01101110
63 8:01101111
64 8:01110000
65 8:01110001
66 8:01110010
67 8:01110011
68 8:01110100
69 8:01110101
70 8:01110110
71 8:01110111
72 8:01111000
73 8:01111001
74 8:01111010
75 8:01111011
76 8:01111100
77 8:01111101
78 8:01111110
79 8:01111111
80 8:10000000
81 8:10000001
82 8:10000010
83 8:10000011
84 8:10000100
85 8:10000101
86 8:10000110
87 8:10000111
88 8:10001000
89 8:10001001
90 8:10001010
91 8:10001011
92 8:10001100
93 8:10001101
94 8:10001110
95 8:10001111
96 8:10010000
97 8:10010001
98 8:10010010
99 8:10010011
100 8:10010100
101 8:10010101
102 8:10010110
103 8:10010111
104 8:10011000
105 8:10011001
106 8:10011010
107 8:10011011
108 8:10011100
109 8:10011101
110 8:10011110
111 8:10011111
112 8:10100000
113 8:10100001
114 8:10100010
115 8:10100011
116 8:10100100
117 8:10100101
118 8:10100110
119 8:10100111
120 8:10101000
121 8:10101001
122 8:10101010
123 8:10101011
124 8:10101100
125 8:10101101
126 8:10101110
127 8:10101111
128 8:10110000
129 8:10110001
130 8:10110010
131 8:10110011
132 8:10110100
133 8:10110101
134 8:10110110
135 8:10110111
136 8:10111000
137 8:10111001
138 8:10111010
139 8:10111011
140 8:10111100
141 8:10111101
142 8:10111110
143 8:10111111
144 9:110010000
145 9:110010001
146 9:110010010
147 9:110010011
148 9:110010100
149 9:110010101
150 9:110010110
151 9:110010111
152 9:110011000
153 9:110011001
154 9:110011010
155 9:110011011
156 9:110011100
157 9:110011101
158 9:110011110
159 9:110011111
160 9:110100000
161 9:110100001
162 9:110100010
163 9:110100011
164 9:110100100
165 9:110100101
166 9:110100110
167 9:110100111
168 9:110101000
169 9:110101001
170 9:110101010
171 9:110101011
172 9:110101100
173 9:110101101
174 9:110101110
175 9:110101111
176 9:110110000
177 9:110110001
178 9:110110010
179 9:110110011
180 9:110110100
181 9:110110101
182 9:110110110
183 9:110110111
184 9:110111000
185 9:110111001
186 9:110111010
187 9:110111011
188 9:110111100
189 9:110111101
190 9:110111110
191 9:110111111
192 9:111000000
193 9:111000001
194 9:111000010
195 9:111000011
196 9:111000100
197 9:111000101
198 9:111000110
199 9:111000111
200 9:111001000
201 9:111001001
202 9:111001010
203 9:111001011
204 9:111001100
205 9:111001101
206 9:111001110
207 9:111001111
208 9:111010000
209 9:111010001
210 9:111010010
211 9:111010011
212 9:111010100
213 9:111010101
214 9:111010110
215 9:111010111
216 9:111011000
217 9:111011001
218 9:111011010
219 9:111011011
220 9:111011100
221 9:111011101
222 9:111011110
223 9:111011111
224 9:111100000
225 9:111100001
226 9:111100010
227 9:111100011
228 9:111100100
229 9:111100101
230 9:111100110
231 9:111100111
232 9:111101000
233 9:111101001
234 9:111101010
235 9:111101011
236 9:111101100
237 9:111101101
238 9:111101110
239 9:111101111
240 9:111110000
241 9:111110001
242 9:111110010
243 9:111110011
244 9:111110100
245 9:111110101
246 9:111110110
247 9:111110111
248 9:111111000
249 9:111111001
250 9:111111010
251 9:111111011
252 9:111111100
253 9:111111101
254 9:111111110
255 9:111111111
256 7:0000000
257 7:0000001
258 7:0000010
259 7:0000011
260 7:0000100
261 7:0000101
262 7:0000110
263 7:0000111
264 7:0001000
265 7:0001001
266 7:0001010
267 7:0001011
268 7:0001100
269 7:0001101
270 7:0001110
271 7:0001111
272 7:0010000
273 7:0010001
274 7:0010010
275 7:0010011
276 7:0010100
277 7:0010101
278 7:0010110
279 7:0010111
280 8:11000000
281 8:11000001
282 8:11000010
283 8:11000011
284 8:11000100
285 8:11000101
286 8:11000110
287 8:11000111
前方距離コード用固定辞書
0 5:00000
1 5:00001
2 5:00010
3 5:00011
4 5:00100
5 5:00101
6 5:00110
7 5:00111
8 5:01000
9 5:01001
10 5:01010
11 5:01011
12 5:01100
13 5:01101
14 5:01110
15 5:01111
16 5:10000
17 5:10001
18 5:10010
19 5:10011
20 5:10100
21 5:10101
22 5:10110
23 5:10111
24 5:11000
25 5:11001
26 5:11010
27 5:11011
28 5:11100
29 5:11101
30 5:11110
31 5:11111
BTYPE=10 : 動的辞書形式
BTYPE=10 ブロックは、次のようになっています。
情報量 | 項目 | 内容 |
---|---|---|
1 ビット | BFINAL | 最終ブロックならば 1 |
2 ビット | BTYPE | 10 |
??? | 動的辞書 | 圧縮されたハフマン符号の辞書 |
??? | データ | 圧縮データ |
辞書も圧縮されているので、辞書を解凍してから、圧縮データを解凍します。
動的辞書
BTYPE=10 ブロックの動的辞書は、次のようになっています。
情報量 | 項目 | 内容 |
---|---|---|
5 ビット | HLIT | 「LZ 符号の文字(長)」符号の数 -257 |
5 ビット | HDIST | 「LZ 符号の前方距離コード」符号の数 -1 |
4 ビット | HCLEN | CL4CL の数 - 4 |
(HCLEN + 4) * 3 ビット | CL4CL | 辞書(符号長配列): CL4LA,CL4DA |
(HLIT + 257 個分) ??? | CL4LA | 辞書(符号長配列を圧縮): LZ 符号の文字(長) |
(HDIST + 1 個分) ??? | CL4DA | 辞書(符号長配列を圧縮): LZ 符号の前方距離コード |
以下は勝手に短縮名にした項目
CL4CL = code lengths for the code length
CL4LA = code lengths for the literal/length alphabet
CL4DA = code lengths for the distance alphabet
CL4CL : code lengths for the code length
CL4CL は、CL4LA および CL4DA の圧縮に使用するハフマン符号の符号長が (HCLEN + 4) 個あります。
符号長(3 ビット:0 〜 7)の配列なので、ハフマン符号の辞書になります。
符号長は、次の配列インデックス順で並んでいます。
16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15
(15 - HCLEN) 個は、符号長をゼロとして扱います。
インデックスは、CL4LA,CL4DA の圧縮コードになります。
CL4LA/CL4DA の圧縮形式
圧縮コードは、次のようになります。
圧縮コード | 内容 | 付加情報 |
---|---|---|
0 〜 15 | CL4LA/CL4DA の符号長 | なし |
16 | 直前の値を複写(3〜6個) | 直後の 2 ビット(個数 - 3) |
17 | ゼロで埋める(3〜10個) | 直後の 3 ビット(個数 - 3) |
18 | ゼロで埋める(11〜138個) | 直後の 7 ビット(個数 - 11) |
これを使って、連続部分を「16,17,18 と付加情報」に変換します。
符号長の配列:
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5, 10, 0, 8, 0, 8, 10, 7, 6, 6, 9, 9, 7, 8, 7, 9,
7, 6, 7, 7, 7, 7, 7, 8, 7, 8, 8, 12, 10, 8, 9, 0,
圧縮形式:
17,(3),
17,(1),
8,
18,(10),
5, 10, 0, 8, 0, 8, 10, 7, 6, 6, 9, 9, 7, 8, 7, 9, 7, 6, 7,
16,(1),
8, 7, 8, 8, 12, 10, 8, 9, 0,
# (n) は文脈からビット数が決まり、(n) 以外はハフマン符号化されます
注意: CL4LA と CL4DA は、1 つの連続したデータ列として圧縮されています。
CL4LA の最後と CL4DA の最初は、圧縮コード 16,17,18 によってまとめられているかもしれません。
CL4LA : code lengths for the literal/length alphabet
CL4LA は、「LZ 符号の文字(長)」のハフマン符号の符号長配列を、CL4LA/CL4DA 圧縮形式にしたものになります。
文字(長)の符号長の配列は、符号長が 288 個ですが、CL4LA では (HLIT + 257) 個が格納されます。残り (31 - HLIT) 個は符号長をゼロとして扱います。
CL4DA : code lengths for the distance alphabet
CL4LA は、「LZ 符号の前方距離コード」のハフマン符号の符号長配列を、CL4LA/CL4DA 圧縮形式にしたものになります。
前方距離コードの符号長の配列は、符号長が 32 個ですが、CL4DA では (HDIST + 1) 個が格納されます。残り (31 - HDIST) 個は符号長をゼロとして扱います。
圧縮データ
「DEFLATE 圧縮データ形式での LZ 符号化」にハフマン符号化が施されたものになっています。
ハフマン符号の辞書情報は CL4LA/CL4DA になります。
ブロックの終わりは文字(長)の値が 256 です。ブロック 1 つにつき最後に 1 回だけ現れます。
解凍プログラム
圧縮データの詳細を表示するために作り始めましたが、中身は解凍プログラムになるので解凍結果を出力するようにしました。
zlib モジュールを使った方が圧倒的に高速なのは言うまでもないでしょう。
Python に限らず、処理を自作できると、既存のモジュールやライブラリの API に縛られる処理構造から脱却できます。
解凍プログラム
#!/usr/bin/env python3
import time
####################
#
# メッセージ処理
#
####################
def noop(*args, **kw): pass # 何もしない
def print_list(data, prefix=None, indent=0, width=16):
if prefix:
msgprint(prefix + ':')
indent = ' ' * indent
fmt = '%{0}d'.format(len('%d' % max(data)))
dlen = len(data)
for s in range(0, dlen, width):
line = [fmt % d for d in data[s:min(s + width, dlen)]]
msgprint('%s%04x: %s' % (indent, s, ' '.join(line)))
def set_verbose_mode():
global msgprint
global msgprint_list
msgprint = print
msgprint_list = print_list
msgprint = noop
msgprint_list = noop
########################
#
# チェック・サム
#
########################
#
# Adler-32 チェック・サム
#
class ADLER32:
@staticmethod
def adler32(bs):
return ADLER32(bs).get_value()
def __init__(self, data=None):
self.value = 1
if data != None:
self.update(data)
def update(self, data):
l = self.value & 0xffff
h = self.value >> 16
for d in data:
l = (l + d) % 65521
h = (h + l) % 65521
self.value = (h << 16) | l
def get_value(self):
return self.value
#
# CRC-32 チェック・サム
#
class CRC32:
@staticmethod
def make_table(poly=0xedb88320):
def f(x): return (x >> 1) ^ (poly if x & 1 else 0)
return [f(f(f(f(f(f(f(f(n)))))))) for n in range(256)]
@staticmethod
def crc32(data):
return CRC32(data).get_value()
MASK = 0xffffffff
TABLE = None
def __init__(self, data=None):
if CRC32.TABLE is None:
CRC32.TABLE = CRC32.make_table()
self.value = CRC32.MASK
if data != None:
self.update(data)
def update(self, data):
crc = self.value
for d in data:
crc = (crc >> 8) ^ CRC32.TABLE[(crc ^ d) & 0xff]
self.value = crc
def get_value(self):
return self.value ^ CRC32.MASK
################################
#
# 入出力
#
################################
#
# ビット単位での読取り:
# 8 ビットより大きい場合は、リトル・エンディアン形式として扱う.
#
class BitStream:
def __init__(self, inp):
self.istream = inp
self.fetch_data = 0
self.fetch_bits = 0
# 指定ビット数を先読み保持から破棄する
def feed(self, bits):
self.fetch_data >>= bits
self.fetch_bits -= bits
# 指定ビット数を先読みして保持する
def fetch(self, bits):
while bits > self.fetch_bits:
self.fetch_data |= self.readbyte() << self.fetch_bits
self.fetch_bits += 8
mask = (1 << bits) - 1
return (self.fetch_data & mask)
# 指定ビット数を取り出して進める
def read(self, bits):
value = self.fetch(bits)
self.feed(bits)
return value
# バイト境界まで読み出し位置を進める
def skip_align(self):
bits = self.fetch_bits & 7
if bits != 0:
self.feed(bits)
# 直接1バイト読み出す
def readbyte(self):
return self.istream.read(1)[0]
#
# 解凍データの出力先:
# 全ての解凍データを保持する(手抜き)
#
class OutputStream:
def __init__(self):
self.output = []
# 文字を1つ追加
def write(self, value):
self.output.append(value)
# distance/length による複写追記
def lzcopy(self, distance, length):
pos = len(self.output) - distance
for n in range(length):
self.output.append(self.output[pos+n])
#
# ハフマン符号の解凍
#
class DecodeHuffman:
# 符号のビット列を反転する
# length が 0 のときは None
@staticmethod
def reverse_code(code, length):
return int(('{0:0%db}' % length).format(code)[::-1], 2) if length != 0 else None
# 文字に対する符号長の配列で初期化する
# 入力: lengths[文字] = 符号長
def __init__(self, lengths):
# 最大符号長
self.max_length = max(lengths)
# 符号長ごとの符号の初期値を作る
# length_code[符号長] = 符号の初期値
def gen_length_code():
yield 0
code = 0
for length in range(1, self.max_length + 1):
yield code
code = (code + lengths.count(length)) << 1
# 文字に対する (反転符号, 符号長) を生成
def gen_dictionary(length_code):
for length in lengths:
yield (DecodeHuffman.reverse_code(length_code[length], length), length)
length_code[length] += 1
# 符号化辞書: dictionary[文字] = (反転符号, 符号長)
dictionary = (*gen_dictionary([*gen_length_code()]),)
# 復号辞書の生成
self.decode_table = [None] * (1 << self.max_length)
for literal in range(len(lengths)):
code, length = dictionary[literal]
if length == 0:
continue
step = 1 << length
value = (literal, length)
while code < len(self.decode_table):
self.decode_table[code] = value
code += step
# ストリームから復号して、文字を1つ取り出す
# 出力:(文字, 符号長, ビット反転符号)
def decode(self, bstream):
code = bstream.fetch(self.max_length)
literal, length = self.decode_table[code]
code &= (1 << length) - 1
if False:
msgprint(('Huffman: {2:#05x} <- {1:2}:{0:0%db}' % length).
format(DecodeHuffman.reverse_code(code, length), length, literal))
bstream.feed(length)
return (literal, length, code)
#
# RFC-1951: DEFLATE 圧縮データ形式の解凍
#
class Inflate:
CL4CL_ORDER = (
16, 17, 18,
0, 8,
7, 9,
6, 10,
5, 11,
4, 12,
3, 13,
2, 14,
1, 15
)
LENGTH_BASE = (
(3, 0), (4, 0), (5, 0), (6, 0),
(7, 0), (8, 0), (9, 0), (10, 0),
(11, 1), (13, 1), (15, 1), (17, 1),
(19, 2), (23, 2), (27, 2), (31, 2),
(35, 3), (43, 3), (51, 3), (59, 3),
(67, 4), (83, 4), (99, 4), (115, 4),
(131, 5), (163, 5), (195, 5), (227, 5),
(258, 0),
)
DISTANCE_BASE = (
(1, 0), (2, 0),
(3, 0), (4, 0),
(5, 1), (7, 1),
(9, 2), (13, 2),
(17, 3), (25, 3),
(33, 4), (49, 4),
(65, 5), (97, 5),
(129, 6), (193, 6),
(257, 7), (385, 7),
(513, 8), (769, 8),
(1025, 9), (1537, 9),
(2049, 10), (3073, 10),
(4097, 11), (6145, 11),
(8193, 12), (12289, 12),
(16385, 13), (24577, 13),
)
BTYPE00_LITERAL_LENGTH = [8]*(144-0) + [9]*(256-144) + [7]*(280-256) + [8]*(288-280)
BTYPE00_DISTANCE_LENGTH = [5]*32
def __init__(self, bitstream, ostream):
self.bstream = bitstream
self.ostream = ostream
self.code_decoder = None
self.literal_decoder = None
self.distance_decoder = None
self.BFINAL = None
self.BTYPE = None
self.HLIT = None
self.HDIST = None
self.HCLEN = None
self.CL4CL = None
self.CL4LA = None
self.CL4DA = None
#
# ブロックの解凍
#
def block(self):
msgprint('======== BLOCK ========')
self.BFINAL = self.bstream.read(1)
self.BTYPE = self.bstream.read(2)
BT0 = (self.BTYPE >> 0) & 1
BT1 = (self.BTYPE >> 1) & 1
msgprint('%8s: %d' % ('BFINAL', self.BFINAL))
msgprint('%8s: %d%d' % ('BTYPE', BT1, BT0))
if self.BTYPE == 0:
self.btype00()
elif self.BTYPE == 1:
self.btype01()
elif self.BTYPE == 2:
self.btype10()
elif self.BTYPE == 3:
raise NotImplemented # reserved
return (self.BFINAL == 0)
#
# ブロック タイプ 00 : 非圧縮なので複写
#
def btype00(self):
self.bstream.skip_align()
LEN = self.bstream.read(16)
NLEN = self.bstream.read(16) ^ 0xffff
if LEN != NLEN:
raise Exception('LEN(%#06x) != NLEN(%#06x)' % (LEN, NLEN))
msgprint('BTYPE00: LEN(%#06x), NLEN(%#06x)' % (LEN, NLEN))
for n in range(LEN):
self.ostream.write(self.bstream.read(8))
#
# ブロック タイプ 01 : 固定辞書による解凍
#
def btype01(self):
self.literal_decoder = DecodeHuffman(Inflate.BTYPE00_LITERAL_LENGTH)
self.distance_decoder = DecodeHuffman(Inflate.BTYPE00_DISTANCE_LENGTH)
self.decode_data()
#
# ブロック タイプ 10 : 動的辞書による解凍
#
def btype10(self):
self.btype10_header()
self.btype10_code_length()
self.btype10_literal_length()
self.btype10_distance_length()
self.decode_data()
def btype10_header(self):
HLIT = self.bstream.read(5)
HDIST = self.bstream.read(5)
HCLEN = self.bstream.read(4)
self.HLIT = HLIT + 257
self.HDIST = HDIST + 1
self.HCLEN = HCLEN + 4
msgprint('%8s: %d + 257 = %d' % ('HLIT', HLIT, self.HLIT))
msgprint('%8s: %d + 1 = %d' % ('HDIST', HDIST, self.HDIST))
msgprint('%8s: %d + 4 = %d' % ('HCLEN', HCLEN, self.HCLEN))
# 圧縮された動的辞書の解凍準備
def btype10_code_length(self):
CL4CL = []
for n in range(self.HCLEN):
CL4CL.append(self.bstream.read(3))
self.CL4CL = [0] * 19
for n in range(len(CL4CL)):
p = Inflate.CL4CL_ORDER[n]
self.CL4CL[p] = CL4CL[n]
msgprint_list(CL4CL, ' code length', 4)
self.code_decoder = DecodeHuffman(self.CL4CL)
self.HDICT = self.decode_dictionary()
# literal/length の解凍準備
def btype10_literal_length(self):
self.CL4LA = self.HDICT[:self.HLIT]
msgprint_list(self.CL4LA, ' literal/length code length', 4)
if self.CL4LA[256] == 0:
raise Exception('CODE[256] == 0')
self.literal_decoder = DecodeHuffman(self.CL4LA)
# distance の解凍準備
def btype10_distance_length(self):
self.CL4DA = self.HDICT[self.HLIT:]
msgprint_list(self.CL4DA, ' distance code length', 4)
self.distance_decoder = DecodeHuffman(self.CL4DA)
# literal/length, distance 符号情報の解凍
def decode_dictionary(self):
count = self.HLIT + self.HDIST
clen = []
while len(clen) < count:
(code, bits, huff) = self.code_decoder.decode(self.bstream)
# 0..15 はコードに対する符号のビット数
if code < 16:
clen.append(code)
continue
c = 0
n = 0
if code == 16:
# 直前の値を複写(3~6)
# 長さは直後の 2 ビットに 3 を加える
n = self.bstream.read(2) + 3
c = clen[-1]
elif code == 17:
# ゼロで埋める(3~10)
# 長さは直後の 3 ビットに 3 を加える
n = self.bstream.read(3) + 3
elif code == 18:
# ゼロで埋める(11~138)
# 長さは直後の 7 ビットに 11 を加える
n = self.bstream.read(7) + 11
else:
raise Exception('unknown code: %d' % code)
clen += [c] * n
return clen
# 圧縮データの解凍
def decode_data(self):
msgprint('COMPRESSED DATA:')
while True:
# literal/length の解凍
(code, bits, huff) = self.literal_decoder.decode(self.bstream)
# 0..255 は 8 ビットの文字データ(literal)
if code < 256:
self.ostream.write(code)
c = chr(code)
if not c.isprintable():
c = ''
msgprint(' LITERAL[%#04x]: %s' % (code, c))
continue
# 256 はブロックの終わり
if code == 256:
msgprint(' END:')
break
# 257..285 はデータの複写数情報(length)
# 以下で、実際の数を求める
length, b = Inflate.LENGTH_BASE[code - 257]
if b != 0:
length += self.bstream.read(b)
# データの複写開始位置情報(distance)の解凍
(code, bits, huff) = self.distance_decoder.decode(self.bstream)
# 以下で、実際の開始位置を求める
distance, b = Inflate.DISTANCE_BASE[code]
if b != 0:
distance += self.bstream.read(b)
# データの複写
msgprint(' LENGTH=%d, DISTANCE=%d' % (length, distance))
self.ostream.lzcopy(distance, length)
# RFC-1950: ZLIB 圧縮データ形式の解凍
class ZLIBInflate:
COMPRESSION_ALGORITHM = (
'fastest',
'fast',
'default',
'slowest',
)
def __init__(self, fp):
self.bstream = BitStream(fp)
self.ostream = OutputStream()
self.header_CMF = None
self.header_FLG = None
def CMF_CM(self):
return (self.header_CMF & 0x0f)
def CMF_CINFO(self):
return ((self.header_CMF >> 4) & 0x0f)
def flags_FCHECK(self):
return (self.header_FLG & 0x1f)
def flags_FDICT(self):
return (self.header_FLG & 0x20)
def flags_FLEVEL(self):
return ((self.header_FLG >> 6) & 3)
def inflate_date(self):
return self.ostream.output
#
# 入力ファイルの解凍処理
#
def inflate(self):
# CMF:FLG を調査
self.header_CMF = self.bstream.read(8)
self.header_FLG = self.bstream.read(8)
msgprint('CMF:FLG = %02x:%02x' % (self.header_CMF, self.header_FLG))
if ((self.header_CMF * 256 + self.header_FLG) % 31) != 0:
raise Exception('invalid FCHECK')
cm = self.CMF_CM()
if cm != 8:
raise Exception('unknown compression method: %d' % cm)
cinfo = self.CMF_CINFO()
if cinfo > 7:
raise Exception('invalid CINFO')
# window_size = 1 << (8 + cinfo)
# FDICT には非対応
if self.flags_FDICT():
self.DICTID = self.bstream.readbyte(4)
raise NotImplemented
# 圧縮データの解凍
self.inflate = Inflate(self.bstream, self.ostream)
while self.inflate.block():
pass
self.bstream.skip_align()
# Adler-32 によるエラー検知
xadler32 = ADLER32.adler32(self.inflate_date())
dadler32 = self.read32be()
msgprint('ADLER32: %#010x (%#010x)' % (dadler32, xadler32))
if xadler32 != dadler32:
raise Exception('ADLER32: %#010x != %#010x' % (dadler32, xadler32))
# ビッグ・エンディアンで 32bit の読み出し
def read32be(self):
b0 = self.bstream.read(8)
b1 = self.bstream.read(8)
b2 = self.bstream.read(8)
b3 = self.bstream.read(8)
return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3
# RFC-1952: GZIP 圧縮データ形式の解凍
class GZIPInflate:
OS_NAME = {
0: 'FAT filesystem (MS-DOS, OS/2, NT/Win32)',
1: 'Amiga',
2: 'VMS (or OpenVMS)',
3: 'Unix',
4: 'VM/CMS',
5: 'Atari TOS',
6: 'HPFS filesystem (OS/2, NT)',
7: 'Machintosh',
8: 'Z-System',
9: 'CP/M',
10: 'TOPS-20',
11: 'NTFS filesystem (NT)',
12: 'QDOS',
13: 'Acorn RISCOS',
255: 'unknown',
}
XFL_NAME = {
0: '(default compression)',
2: 'slowest algorithm (maximum compression)',
4: 'fastest algorithm',
}
def __init__(self, fp):
self.bstream = BitStream(fp)
self.ostream = OutputStream()
self.read_data = []
self.compression_method = None
self.flags = None
self.mtime = None
self.extra_flags = None
self.os = None
self.extra = None
self.name = None
self.comment = None
self.crc16 = None
def flags_FTEXT(self):
return (self.flags & 0x01)
def flags_FHCRC(self):
return (self.flags & 0x02)
def flags_FEXTRA(self):
return (self.flags & 0x04)
def flags_FNAME(self):
return (self.flags & 0x08)
def flags_FCOMMENT(self):
return (self.flags & 0x10)
def inflate_date(self):
return self.ostream.output
# 入力ファイルから1バイトの読取り
# FHCRC 処理に備える
def readbyte(self, n=1):
r = 0
b = 0
for i in range(n):
d = self.bstream.readbyte()
self.read_data.append(d)
r += d << b
b += 8
return r
#
# 入力ファイルの解凍処理
#
def inflate(self):
# ID を調査
self.check_id()
# ヘッダ部の読み込み
self.read_header()
# 圧縮データの解凍
self.inflate = Inflate(self.bstream, self.ostream)
while self.inflate.block():
pass
self.bstream.skip_align()
# CRC32 によるエラー検知
xdata = self.inflate_date()
xcrc32 = CRC32.crc32(xdata)
dcrc32 = self.bstream.read(32)
msgprint('CRC32: %#010x (%#010x)' % (dcrc32, xcrc32))
if dcrc32 != xcrc32:
raise Exception('CRC32: %#010x != %#010x' % (dcrc32, xcrc32))
# 元データとの大きさ比較によるエラー検知
xsize = len(xdata)
isize = self.bstream.read(32)
msgprint('ISIZE: %#010x (%#010x)' % (isize, xsize))
if isize != xsize:
raise Exception('ISIZE: %#010x != %#010x' % (isize, xsize))
# ファイルの先頭が 0x1f, 0x8b か?
def check_id(self):
id1 = self.readbyte()
id2 = self.readbyte()
magic = 'FILE MAGIC: %#04x,%#04x' % (id1, id2)
if id1 != 0x1f or id2 != 0x8b:
raise Exception('Invalid ' + magic)
msgprint(magic)
#
# ヘッダ部の読み込み
#
def read_header(self):
# 必須
self.read_header_CM()
self.read_header_FLG()
self.read_header_MTIME()
self.read_header_XFL()
self.read_header_OS()
# オプション
self.read_header_EXTRA()
self.read_header_NAME()
self.read_header_COMMENT()
self.read_header_CRC16()
def read_header_CM(self):
self.compression_method = self.readbyte()
msgprint('%8s: %#04x' % ('CM', self.compression_method))
if self.compression_method != 8:
raise Exception('unknown compression method: %d' % self.compression_method)
def read_header_FLG(self):
self.flags = self.readbyte()
mflg = ''
if self.flags_FTEXT():
mflg += ', FTEXT'
if self.flags_FHCRC():
mflg += ', FHCRC'
if self.flags_FEXTRA():
mflg += ', FEXTRA'
if self.flags_FNAME():
mflg += ', FNAME'
if self.flags_FCOMMENT():
mflg += ', FCOMMENT'
msgprint('%8s: %#04x%s' % ('FLG', self.flags, mflg))
def read_header_MTIME(self):
self.mtime = self.readbyte(4)
msgprint('%8s: %s (%#010x)' % ('MTIME', time.ctime(self.mtime), self.mtime))
def read_header_XFL(self):
self.extra_flags = self.readbyte()
msgprint('%8s: %#04x, %s' % ('XFL', self.extra_flags, GZIPInflate.XFL_NAME[self.extra_flags]))
def read_header_OS(self):
self.os = self.readbyte()
msgprint('%8s: %#04x, %s' % ('OS', self.os, GZIPInflate.OS_NAME[self.os]))
def read_header_EXTRA(self):
self.extra = None
if self.flags_FEXTRA():
self.read_EXTRA()
def read_header_NAME(self):
self.name = None
if self.flags_FNAME():
self.name = self.read_CString()
msgprint('%8s: \'%s\'' % ('NAME', self.name))
def read_header_COMMENT(self):
self.comment = None
if self.flags_FCOMMENT():
self.comment = self.read_CString()
msgprint('%8s: %s' % ('COMMENT', self.name))
def read_header_CRC16(self):
self.crc16 = None
if self.flags_FHCRC():
crc16 = 0xffff & CRC32.crc32(self.read_data)
self.crc16 = self.readbyte(2)
msgprint('CRC16: %#06x, %#06x' % self.crc16, crc16)
# 拡張データの読み込み
def read_EXTRA(self):
xlen = self.readbyte(2)
self.extra = []
for n in range(xlen):
self.extra.append(self.readbyte())
msgprint('%8s: [%d] ...' % ('EXTRA', len(self.extra)))
# NULL 文字終端の文字列の読み込み
def read_CString(self):
s = ''
while True:
c = self.readbyte()
if c == 0:
break
s += chr(c)
return s
# コマンド ライン起動用の関数
def main():
import argparse
import codecs
import sys
# コマンド ライン引数の解析
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true', help='冗長モード')
parser.add_argument('-o', '--output', metavar='OUTPUT', help='出力ファイル')
parser.add_argument('INPUT', help='入力ファイル')
args = parser.parse_args()
# 冗長モードを設定
if args.verbose or args.output is None:
set_verbose_mode()
# 入力ファイルを開く
if len(args.INPUT) == 0:
return parser.print_help()
if args.INPUT == '-':
inp = sys.stdin.buffer
else:
inp = open(args.INPUT, 'rb')
# 解凍処理
gz = GZIPInflate(inp)
gz.inflate()
# 解凍データを出力
if args.output:
if args.output == '-':
out = sys.stdout
else:
out = open(args.output, 'wb')
out.write(bytearray(gz.inflate_date()))
if __name__ == '__main__':
main()