1
2

ZLIB/GZIP DEFLATE 圧縮データの解凍処理は難しくないので作ってみる(Python版)

Last updated at Posted at 2021-04-01

以下の仕様書を参照しています。

  • RFC 1950 : ZLIB Compressed Data Format Specification version 3.3
  • RFC 1951 : DEFLATE Compressed Data Format Specification version 1.3
  • RFC 1952 : GZIP file format specification version 4.3

データ圧縮の概要

主に、二種類の圧縮用符号化が使用されています。

  • LZ 符号化
  • ハフマン符号化

LZ 符号化

LZ 符号化は、次のような変換イメージです。

データ:
    'code length, literal/length alphabet, distance alphabet, abababababa'

LZ符号:
    'code length, literal/' {6,16} ' alphabet, distance' {11,19} 'ab' {9,2}

{文字数, 前方距離} 形式は、「前方距離」分の文字を遡り、「文字数」を複写
    { 6, 16} -> 'length'
    {11, 19} -> ' alphabet, '
    { 9,  2} -> 'ababababa'

DEFLATE 圧縮データ形式での LZ 符号化

DEFLATE 圧縮データ形式での LZ 符号では、文字の範囲を拡張して複写情報などを含みます。

種類 内容
0 〜 255 文字 8 ビットのデータ
256 制御文字 ブロック終端(後述)
257 〜 285 複写 複写の文字数情報も兼ねる(付加情報あり)
286 〜 287 未使用

値は、文字や複写情報などですが、これを**文字(長)**と略記します。

■ 複写の場合

複写の場合は、次のようになります。

文字(長) 文字数用付加情報 前方距離コード 前方距離用付加情報
257 〜 285 0 〜 5 ビット 0 〜 29 0 〜 13 ビット

前半 2 項目が「文字数」で、後半 2 項目が「前方距離」です。これで {文字数, 前方距離} になります。

前半:文字数の詳細

文字(長) 付加ビット数 文字数
257 0 3
258 0 4
259 0 5
260 0 6
261 0 7
262 0 8
263 0 9
264 0 10
265 1 11 〜 12
266 1 13 〜 14
267 1 15 〜 16
268 1 17 〜 18
269 2 19 〜 22
270 2 23 〜 26
271 2 27 〜 30
272 2 31 〜 34
273 3 35 〜 42
274 3 43 〜 50
275 3 51 〜 58
276 3 59 〜 66
277 4 67 〜 82
278 4 83 〜 98
279 4 99 〜 99
280 4 115 〜 130
281 5 131 〜 162
282 5 163 〜 194
283 5 195 〜 226
284 5 227 〜 257
285 0 258

後半:前方距離の詳細

前方距離コード 付加ビット数 前方距離
0 0 1
1 0 2
2 0 3
3 0 4
4 1 5 〜 6
5 1 7 〜 8
6 2 9 〜 12
7 2 13 〜 16
8 3 17 〜 24
9 3 25 〜 32
10 4 33 〜 48
11 4 49 〜 64
12 5 65 〜 96
13 5 97 〜 128
14 6 129 〜 192
15 6 193 〜 256
16 7 257 〜 384
17 7 385 〜 512
18 8 513 〜 768
19 8 769 〜 1024
20 9 1025 〜 1536
21 9 1537 〜 2048
22 10 2049 〜 3072
23 10 3073 〜 4096
24 11 4097 〜 6144
25 11 6145 〜 8192
26 12 8193 〜 12288
27 12 12289 〜 16384
28 13 16385 〜 24576
29 13 24577 〜 32768
30 0 (未使用)
31 0 (未使用)

■ 上記例の場合

LZ符号化例
'code length, literal/' {6,16} ' alphabet, distance' {11,19} 'ab' {9,2}

上記の複写情報 {文字数, 前方距離} は、以下になります。

複写情報 文字(長) 値(ビット数) 前方距離コード 値(ビット数)
{ 6, 16 } 260 7 3 (2 ビット)
{ 11, 19 } 265 0 (1 ビット) 8 2 (3 ビット)
{ 9, 2 } 263 1

「文字(長)」と「前方距離コード」のビット数は、ハフマン符号化されるので不定です。

LZ 符号化例の DEFLATE 形式を疑似的に表現すると、以下のようになります。

DEFLATE形式
'code length, literal/', 260,7,(3), ' alphabet, distance', 265,(0),8,(2), 'ab', 263,1

# (n) は文脈からビット数が決まり、(n) 以外[文字:0〜255を含む]はハフマン符号化されます

ハフマン符号化

例として、次の文字列を下記プログラム(huffman.py)で、ハフマン符号を作ります。

'huffman compressed data'
huffman.py : ハフマン符号生成プログラム
huffman.py
#!/usr/bin/env python3

#
# ハフマン符号の生成
#
class Huffman:

    # 生成される符号
    class Code:
        def __init__(self, code, length, literal, freq):
            self.code = code
            self.length = length
            self.literal = literal
            self.freq = freq
            self.set_binary()
            self.set_char()

        def set_binary(self):
            self.binary = ('{0:0%sb}' % self.length).format(self.code)

        def set_char(self):
            c = chr(self.literal)
            if c.isprintable():
                self.char = '\'%s\'' % c
            else:
                self.char = '%#x' % self.literal

        def key_code(self): return self.code
        def key_length(self): return self.length
        def key_length_literal(self): return (self.length, self.literal)
        def key_literal(self): return self.literal
        def key_freq_literal(self): return (self.freq, self.literal)
        def key_binary(self): return self.binary


    # 葉(Leaf)または節(Node)となる要素
    class Element:
        def __init__(self, freq, literal, branch):
            self.freq = freq
            self.literal = literal
            self.branch = branch

        def key_freq_literal(self):
            return (self.freq, self.literal)


    # 葉 情報
    class Leaf(Element):
        def __init__(self, freq, literal):
            super().__init__(freq, literal, None)


    # 節 情報
    class Node(Element):
        def __init__(self, freq, branch):
            super().__init__(freq, 0, branch)


    # 木の生成
    @staticmethod
    def build_tree(data):
        tree = []

        # 葉の一覧を作る
        for literal in range(min(data), max(data) + 1):
            freq = data.count(literal)
            if freq != 0:
                tree.append(Huffman.Leaf(freq, literal))

        # 木を作る
        while len(tree) > 1:
            # 頻度,文字の優先順位でソート
            tree = sorted(tree, key=Huffman.Element.key_freq_literal)

            # 頻度の下位2つを、1つの節にまとめる
            branch = tree[:2]
            tree = tree[2:]
            freq = branch[0].freq + branch[1].freq
            tree.append(Huffman.Node(freq, branch))

        return tree[0]


    # 辞書の生成
    @staticmethod
    def make_dictionary(top):
        dictionary = []
        nodes = [(top, 0, 0)]
        while len(nodes) != 0:
            elem, code, length = nodes.pop()
            if elem.branch is None:
                dictionary.append(Huffman.Code(code, length, elem.literal, elem.freq))
                continue
            nodes.append((elem.branch[0], code * 2 + 0, length + 1))
            nodes.append((elem.branch[1], code * 2 + 1, length + 1))
        return sorted(dictionary, key=Huffman.Code.key_literal)


    # RFC 1951: DEFLATE 形式に変換
    @staticmethod
    def to_deflate(dictionary):
        dictionary = sorted(dictionary, key=Huffman.Code.key_literal)

        length_min = min(dictionary, key=Huffman.Code.key_length).length
        length_max = max(dictionary, key=Huffman.Code.key_length).length
        length_size = length_max + 1

        length_count = [0] * length_size
        for code in dictionary:
            length_count[code.length] += 1

        code = 0
        length_code = [0] * length_size
        for n in range(length_max+1):
            code <<= 1
            length_code[n] = code
            code += length_count[n]

        for code in dictionary:
            code.code = length_code[code.length]
            code.set_binary()
            length_code[code.length] += 1


    #
    # Huffman クラスの初期化
    #
    def __init__(self, data, deflate=False):
        self.tree = Huffman.build_tree(data)
        self.dictionary = Huffman.make_dictionary(self.tree)
        if deflate:
            Huffman.to_deflate(self.dictionary)


    # 統計情報の取得
    def statistics(huff):
        length = 0
        freq = 0
        max_code = 0
        max_length = 0
        max_literal = 0
        max_char = 0
        max_freq = 0
        max_binary = 0
        for code in huff.dictionary:
            c = code.code
            b = code.length
            l = code.literal
            s = len(code.char)
            w = code.freq
            p = len(code.binary)

            max_code = max(c, max_code)
            max_length = max(b, max_length)
            max_literal = max(l, max_literal)
            max_char = max(s, max_char)
            max_freq = max(w, max_freq)
            max_binary = max(p, max_binary)

            length += (b * w)
            freq += w

        return {
            'total': {
                'length': length,
                'freq': freq,
            },
            'max': {
                'code': max_code,
                'length': max_length,
                'literal': max_literal,
                'char': max_char,
                'freq': max_freq,
                'binary': max_binary,
            },
        }


#
# コマンド ラインから呼び出し
#
if __name__ == '__main__':
    def main():
        import sys
        import codecs
        import argparse

        #
        # コマンド ライン引数の解析
        #
        parser = argparse.ArgumentParser(description='データからハフマン符号を生成します')
        parser.add_argument('-T', '--text', help='入力文字列 (FILE は無視される)')
        parser.add_argument('-SF', '--sort-freq', action='store_true', help='頻度でソート')
        parser.add_argument('-SL', '--sort-literal', action='store_true', help='文字(値)でソート')
        parser.add_argument('-SB', '--sort-length', action='store_true', help='符号長でソート')
        parser.add_argument('-SC', '--sort-code-be', action='store_true', help='符号(左詰)でソート [デフォルト]')
        parser.add_argument('-Sc', '--sort-code-le', action='store_true', help='符号(右詰)でソート')
        parser.add_argument('-p', '--padding', metavar='CHAR', help='符号に CHAR 文字を追加')
        parser.add_argument('-r', '--reverse', action='store_true', help='降順でソート')
        parser.add_argument('-D', '--deflate', action='store_true', help='DEFLATE 形式の辞書にする')
        parser.add_argument('-DL', '--deflate-literal', action='store_true',
                            help='RFC 1951 literal/length 固定辞書モード (FILE は無視される)')
        parser.add_argument('-DD', '--deflate-distance', action='store_true',
                            help='RFC 1951 の distance 固定辞書モード (FILE は無視される)')
        parser.add_argument('-F', '--format', metavar='STR', help='出力形式を指定')
        parser.add_argument('-N', '--no-total', action='store_true', help='合計の出力を抑制する')
        parser.add_argument('FILE', nargs='?')
        args = parser.parse_args()

        # 出力形式:
        #   %B: 符号長
        #   %C: 符号(10進数)
        #   %P: 符号(左詰:2進数)
        #   %p: 符号(右詰:2進数)
        #   %L: 文字
        #   %D: 文字の値(10進数)
        #   %X: 文字の値(16進数大文字)
        #   %x: 文字の値(16進数小文字)
        #   %F: 頻度

        line_format = args.format
        if line_format is None:
            if args.deflate_literal:
                line_format = '%D=0x%X %B:%P'
            elif args.deflate_distance:
                line_format = '%D %B:%P'
            else:
                line_format = '%L (%D=0x%X)[%F] %B:%P'

        #
        # ソート方法の設定
        #
        if args.sort_freq:
            sort_key = Huffman.Code.key_freq_literal
        elif args.sort_literal:
            sort_key = Huffman.Code.key_literal
        elif args.sort_length:
            sort_key = Huffman.Code.key_length_literal
        elif args.sort_code_be:
            sort_key = Huffman.Code.key_code
        else:  # if args.sort_binary: # default
            sort_key = Huffman.Code.key_binary

        #
        # データの読み込みとハフマン符号の作成
        #
        flag_deflate = args.deflate
        flag_total = not args.no_total
        if args.deflate_literal:
            # RFC 1951 literal/length 固定辞書用
            flag_deflate = True
            flag_total = False
            data = []
            for n in range(144):
                data += [(n + 0)] * 2
            for n in range(112):
                data += [(n + 144)] * 1
            for n in range(24):
                data += [(n + 256)] * 4
            for n in range(8):
                data += [(n + 280)] * 2
        elif args.deflate_distance:
            # RFC 1951 distance 固定辞書用
            flag_deflate = True
            flag_total = False
            data = [n for n in range(32)]
        elif args.text:
            data = codecs.encode(args.text)
        else:
            if args.FILE is None:
                parser.print_help()
                return
            if args.FILE == '-':
                data = codecs.encode(sys.stdin.read())
            else:
                data = open(args.FILE, 'rb').read()
        huff = Huffman(data, flag_deflate)

        #
        # 結果の出力
        #
        dinf = huff.statistics()

        clen = len('%d' % dinf['max']['code'])
        dlen = len('%d' % dinf['max']['literal'])
        xlen = len('%x' % dinf['max']['literal'])
        slen = dinf['max']['char']
        flen = len('%d' % dinf['max']['freq'])
        blen = len('%d' % dinf['max']['length'])
        plen = dinf['max']['binary']

        fmtstr_dict = {
            'C': '{0:%dd}' % clen,
            'B': '{1:%dd}' % blen,
            'D': '{2:%dd}' % dlen,
            'X': '{2:0%dX}' % xlen,
            'x': '{2:0%dx}' % xlen,
            'F': '{3:%dd}' % flen,
            'P': '{4:<%ds}' % plen,
            'p': '{4:>%ds}' % plen,
            'L': '{5:<%ds}' % slen,
        }

        # 出力形式の解析
        fmtstr = ''
        prefix = False
        for c in line_format:
            if not prefix:
                if c != '%':
                    fmtstr += c
                    continue
                prefix = True
                continue

            prefix = False
            if c in fmtstr_dict:
                fmtstr += fmtstr_dict[c]
                continue
            fmtstr += c

        # 辞書の出力
        squote = '\''
        pchar = args.padding and args.padding[0] or ' '
        for code in sorted(huff.dictionary, key=sort_key, reverse=args.reverse):
            binary = code.binary
            if args.padding:
                binary += (pchar * (plen - len(binary)))
            pass
            print(fmtstr.format(code.code, code.length, code.literal, code.freq, binary, code.char))

        # 合計の出力
        if flag_total:
            total = dinf['total']
            freq = total['freq']
            print('入力: %d バイト (%d ビット)' % (freq, freq * 8))
            print('符号: %d バイト (%d ビット)' % ((total['length'] + 7) >> 3, total['length']))


    main()
実行結果
$ python3 huffman.py -T 'huffman compressed data' -SL -F '%F %L %P'
2 ' ' 1110 
3 'a' 100  
1 'c' 10100
2 'd' 1111 
2 'e' 000  
2 'f' 001  
1 'h' 10101
2 'm' 010  
1 'n' 10110
1 'o' 10111
1 'p' 11000
1 'r' 11001
2 's' 011  
1 't' 11010
1 'u' 11011
入力: 23 バイト (184 ビット)
符号: 12 バイト (89 ビット)

リストの出力形式は

  頻度 '文字' 符号

になっています。

■ 頻度 : 文字に対する文字列中の個数です。空白は 2 個、'a' は 3 個です。

■ 符号 : ビット列です。頻度が高いほど短くなり、低いほど長くなります。この性質がデータ圧縮の鍵です。ハフマン木の分岐は左から右へ順に行います

■ 入力 : 1 文字を 1 バイトとする文字列として、23 バイトになります。

■ 符号 : 文字を符号に変換して、ビット列にすると 12 バイトにすることができます。ただし、符号化・復号で同じ辞書をプログラム側で用意する必要があります。

※ 辞書 : 文字と符号は 1 対 1 の対応になります。対応の一覧が辞書です。

符号化と復号

文字と符号が 1 対 1 なので、符号化・復号ともに置き換え処理を文字数だけループするプログラムになります。

上記で作った辞書(文字 ' acdefhmnoprst' だけで作られた文字列が対象になる)を使った、ハフマン符号化と復号のサンプル プログラムを作ってみます。

ハフマン符号化と復号
sample1.py
#!/usr/bin/env python3

# 辞書
dictionary = (
    (' ', '1110'),
    ('a', '100'),
    ('c', '10100'),
    ('d', '1111'),
    ('e', '000'),
    ('f', '001'),
    ('h', '10101'),
    ('m', '010'),
    ('n', '10110'),
    ('o', '10111'),
    ('p', '11000'),
    ('r', '11001'),
    ('s', '011'),
    ('t', '11010'),
    ('u', '11011'),
)

# 事前準備
code_length_min = min((len(d[1]) for d in dictionary))
code_length_max = max((len(d[1]) for d in dictionary))
code_length_range = range(code_length_min, code_length_max + 1)
code_literal_dict = {code: literal for literal, code in dictionary}
literal_code_dict = {literal: code for literal, code in dictionary}


# 復号
def decode(data):
    output = ''
    while len(data):
        for length in code_length_range:
            code = data[:length]
            if code in code_literal_dict:
                output += code_literal_dict[code]
                data = data[length:]
                break
        else:
            raise Exception('バグってる')
    return output


# 符号化
def encode(data):
    return ''.join(literal_code_dict[literal] for literal in data)


# 例
data = 'huffman compressed data'

# 符号化と復号
enc_data = encode(data)
dec_data = decode(enc_data)

print('文字列: \'%s\'' % data)
print('符号化 %d ビット: %s' % (len(enc_data), enc_data))
print('復号  %d バイト: \'%s\'' % (len(dec_data), dec_data))
print('検証:', data == dec_data and '正常' or '異常')

(エンディアンの問題を避けるため、符号を文字列のまま処理しています)

実行結果
文字列: 'huffman compressed data'
符号化 89 ビット: 10101110110010010101001011011101010010111010110001100100001101100011111110111110011010100
復号  23 バイト: 'huffman compressed data'
検証: 正常

符号化すると、ごちゃごちゃしたビット列の情報になり、復号すると元に戻ります。

DEFLATE 圧縮データ形式でのハフマン符号

huffman.py に '-D' オプションを付けると DEFLATE 圧縮データ形式で使用する符号を生成します。

上記と同じ例の文字列を、プログラムのオプションを増やして、ハフマン符号を作ります。

'huffman compressed data'
実行結果
$ python3 huffman.py -T 'huffman compressed data' -SL -F '%F %L %B:%P' -D
2 ' ' 4:1010 
3 'a' 3:000  
1 'c' 5:11000
2 'd' 4:1011 
2 'e' 3:001  
2 'f' 3:010  
1 'h' 5:11001
2 'm' 3:011  
1 'n' 5:11010
1 'o' 5:11011
1 'p' 5:11100
1 'r' 5:11101
2 's' 3:100  
1 't' 5:11110
1 'u' 5:11111
入力: 23 バイト (184 ビット)
符号: 12 バイト (89 ビット)

リストの出力形式は

  頻度 '文字' 符号長:符号

になっています。

これは '文字' でソートしたリストですが、(符号,'文字') の優先順位でソートしてみます。

ソート方法を変更
$ python3 huffman.py -T 'huffman compressed data' -SB -F '%F %L %B:%P' -D
3 'a' 3:000  
2 'e' 3:001  
2 'f' 3:010  
2 'm' 3:011  
2 's' 3:100  
2 ' ' 4:1010 
2 'd' 4:1011 
1 'c' 5:11000
1 'h' 5:11001
1 'n' 5:11010
1 'o' 5:11011
1 'p' 5:11100
1 'r' 5:11101
1 't' 5:11110
1 'u' 5:11111
入力: 23 バイト (184 ビット)
符号: 12 バイト (89 ビット)

符号の 2 進数を 10 進数にすると

3 'a' 3:000   =  0
2 'e' 3:001   =  1
2 'f' 3:010   =  2
2 'm' 3:011   =  3
2 's' 3:100   =  4
2 ' ' 4:1010  =  5 * 2 + 0 = 10
2 'd' 4:1011  =  5 * 2 + 1 = 11
1 'c' 5:11000 = (5 * 2 + 2) * 2 + 0 = 24
1 'h' 5:11001 = (5 * 2 + 2) * 2 + 1 = 25
1 'n' 5:11010 = (5 * 2 + 2) * 2 + 2 = 26
1 'o' 5:11011 = (5 * 2 + 2) * 2 + 3 = 27
1 'p' 5:11100 = (5 * 2 + 2) * 2 + 4 = 28
1 'r' 5:11101 = (5 * 2 + 2) * 2 + 5 = 29
1 't' 5:11110 = (5 * 2 + 2) * 2 + 6 = 30
1 'u' 5:11111 = (5 * 2 + 2) * 2 + 7 = 31

だから、('文字',符号長) の一覧があれば符号を求められることが分かります。

■ 辞書を圧縮する

code_length の '文字' 順に符号長を配列 (4,3,5,4,3,3,5,3,5,5,5,3,5,5) にすると、符号長 3,4,5 に対応するハフマン符号を割り当てることができます。

符号長のハフマン符号
$ python3 huffman.py -T '435433535555355' -SL -F '%F %L %B:%P' -D -N
5 '3' 2:10
2 '4' 2:11
8 '5' 1:0 

この辞書は、配列 (0,0,0,2,2,1) から作ることができます。(0 は情報として存在しないことを意味する)

これで「符号長の配列=辞書相当」をハフマン符号化して圧縮できることになります。

DEFLATE 形式の利点は
  「配列のインデックスが文字(値)」となる「符号長の配列」から「辞書を再現できる」 
   ( length = code_length [ literal ]  -->  code, length = dictionary [ literal ] )
ことです。

配列から辞書を作るサンプル プログラム
sample2.py
#!/usr/bin/env python3

# 「文字と符号長」の一覧から辞書を作る
def make_dictionary(lengths):
    if type(lengths) == dict:
        literals = tuple(literal for literal in lengths)
        lengths = [lengths[literal] for literal in lengths]
    else:
        literals = range(len(lengths))

    def gen_length_code():
        yield 0
        code = 0
        for length in range(1, max(lengths) + 1):
            yield code
            code = (code + lengths.count(length)) << 1

    def gen_dictionary(length_code):
        for index in range(len(lengths)):
            literal = literals[index]
            length = lengths[index]
            code = length_code[length] if length != 0 else None
            length_code[length] += 1
            yield (literal, code, length)

    return (*gen_dictionary([*gen_length_code()]),)


# 辞書の表示
def dump_dictionary(dictionary, fmtstr=None):
    if not fmtstr:
        fmtstr = '\'{0}\' {1:d}:{2}'
    for literal, code, length in dictionary:
        if length != 0:
            binary = ('{0:0%db}' % length).format(code)
            print(fmtstr.format(literal, length, binary))


if __name__ == '__main__':

    print('文字列例の辞書')
    dump_dictionary(make_dictionary({
        ' ': 4, 'a': 3, 'c': 5, 'd': 4, 'e': 3,
        'f': 3, 'h': 5, 'm': 3, 'n': 5, 'o': 5,
        'p': 5, 'r': 5, 's': 3, 't': 5, 'u': 5,
    }))
    print()

    print('符号長の辞書')
    dump_dictionary(make_dictionary((
        0, 0, 0, 2, 2, 1
    )))
sample2.pyの実行結果
文字列例の辞書
' ' 4:1010
'a' 3:000
'c' 5:11000
'd' 4:1011
'e' 3:001
'f' 3:010
'h' 5:11001
'm' 3:011
'n' 5:11010
'o' 5:11011
'p' 5:11100
'r' 5:11101
's' 3:100
't' 5:11110
'u' 5:11111

符号長の辞書
'3' 2:10
'4' 2:11
'5' 1:0

ZLIB/GZIP 圧縮データ形式

ZLIB, GZIP のデータは大まかに、次のようになっています。

項目 ZLIB GZIP
ヘッダ部 ZLIB の情報 GZIP の情報
圧縮データ DEFLATE 形式 DEFLATE 形式
チェック・サム ADLER32 CRC32
サイズ なし ISIZE

ZLIB 形式では、ネットワーク・バイト・オーダー(ビッグ・エンディアン:上位バイト、下位バイトの順)で格納されます。解凍処理後のデータ検証には Adler-32 が利用できます。

GZIP 形式では、リトル・エンディアン(下位バイト、上位バイトの順)で格納されます。解凍処理後のデータ検証には、CRC-32 と ISIZE(元データのサイズ:32 bit)が利用できます。

※ ZLIB/GZIP の情報は、解凍処理に影響がないので折りたたみます。


ZLIB の情報

ZLIB の情報

ZLIB 圧縮データ形式は先頭から圧縮データの直前までは、次のようになっています。

項目 バイト数 内容 備考
CMF 1 圧縮形式と圧縮形式の情報
FLG 1 フラグなど
DICTID 0 か 4 固定辞書の ID FLG.FDICT が 0 の場合はなし

CMF : Compression Method and Flags

CMF の内容は、次のとおりです。

ビット位置 項目 内容
3 〜 0 CM 圧縮形式
7 〜 4 CINFO 圧縮形式に対する情報

CM : 圧縮形式

CM が 8 のとき、圧縮データは DEFLATE 形式になります。15 は予約です。

(RFC 1950 : Version 3.3 では CM=8 のみが許可されています)

CINFO : 圧縮形式に対する情報

CM=8 のとき、CINFO は LZ77 のウインドウ・サイズを、0 から 7 で表します。

ウインドウ・サイズ = 2^{CINFO+8}

FLG : FLaGs

FLG の内容は、次のとおりです。

ビット位置 項目 内容
4 〜 0 FCHECK CMF,FLG の検証用
5 FDICT DICTID あり
7 〜 6 FLEVEL 圧縮アルゴリズムの種類

FCHECK

次の計算が真になるように、FCHECK を定めます。

(CMF * 256 + FLG) % 31 == 0

偽ならば、エラーです。

FDICT

後続に DICTID が存在する場合は 1 になります。(CM=8 の場合は 0 です)

FLEVEL

FLEVEL の値と内容は、次のとおりです。

アルゴリズム
0 最速 : 処理時間が最短
1 高速 : 処理時間が短い
2 デフォルト
3 最小 : 処理時間が長い

再圧縮などでの参考値です。

DICTID

辞書のバイト・シーケンスを Adler-32 で計算した値が、辞書 ID になります。


GZIP の情報

GZIP の情報

GZIP 圧縮データ形式は先頭から圧縮データの直前までは、次のようになっています。

項目 バイト数 内容 備考
ID1 1 固定値 31 = 0x1f
ID2 1 固定値 139 = 0x8b
CM 1 圧縮形式
FLG 1 フラグなど
MTIME 4 日時
XFL 1 追加フラグ
OS 1 圧縮場所 テキスト形式の参考情報
EXTRA ? 追加情報 FLG.FEXTRA が 0 の場合はなし
NAME ? 元のファイル名 FLG.FNAME が 0 の場合はなし
COMMENT ? コメント FLG.FCOMMENT が 0 の場合はなし
CRC16 0 か 2 上記のチェック・サム FLG.FHCRC が 0 の場合はなし

ID1,ID2 : IDentification 1,2

GZIP 圧縮データ形式の始まりを示します。

CM : Compression Method

CM が 0 から 7 まて予約になっています。

CM が 8 のとき、圧縮データは DEFLATE 形式になります。

FLG : FLaGs

FLG の内容は、次のとおりです。

ビット位置 項目 内容
0 FTEXT テキストかも
1 FHCRC CRC16 あり
2 FEXTRA EXTRA あり
3 FNAME NAME あり
4 FCOMMENT COMMENT あり
5 〜 7 予約

MTIME : Modification TIME

MTIME は GMT 1970/1/1 00:00:00 からの秒数です。

MTIME が 0 の場合は、日時情報は無効です。

XFL : eXtra FLags

XFL の内容は CM に依存します。

CM=8 の場合は、次のとおりです。

アルゴリズム
2 最小 : 処理時間が長い
4 最速 : 処理時間が最短

OS : Operating System

テキスト形式(CR/LF など)の参考情報です。

OS
0 FAT filesystem (MS-DOS, OS/2, NT/Win32)
1 Amiga
2 VMS (or OpenVMS)
3 Unix
4 VM/CMS
5 Atari TOS
6 HPFS filesystem (OS/2, NT)
7 Machintosh
8 Z-System
9 CP/M
10 TOPS-20
11 NTFS filesystem (NT)
12 QDOS
13 Acorn RISCOS
255 unknown

EXTRA : XLEN と Extra field

FLG.FEXTRA が 1 とき、追加情報があります。

項目 バイト数 内容
XLEN 2 Extra filed の大きさ
Extra filed XLEN 追加情報

XLEN : eXtra LENgth

Extra filed の大きさを示します。

Extra field

追加情報は、次の形式のサブフィールドが並びます。

項目 バイト数 内容
SI1 1 サブフィールド識別子 1
SI2 1 サブフィールド識別子 2
LEN 2 サブフィールドの大きさ
sub field data LEN データ

NAME

FLG.FNAME が 1 とき、オリジナルのファイル名が NULL(=0x00)文字で終わる文字列として格納されます。

COMMENT

FLG.FCOMMENT が 1 とき、コメントが NULL(=0x00)文字で終わる文字列として格納されます。

CRC16

FLG.FHCRC が 1 とき、先頭から CRC16 の直前までデータの CRC32 の結果から下位 16 ビットが格納されます。


DEFLATE 圧縮データ形式

DEFLATE 圧縮データは、ブロック単位(可変長)のデータが並んでいます。

ブロック

ブロックは大まかに、次のようになっています。

項目 ビット数 内容
BFINAL 1 最後のブロックであることを示す
BTYPE 2 後続のデータ形式を示す
データ 不定 BTYPE による

ブロックの並び

DEFLATE 圧縮データの全体は、次の並びになります。

ブロック \ 項目 BFINAL BTYPE データ
ブロック 1 0 ?? . . .
ブロック 2 0 ?? . . .
... 0 ?? . . .
ブロック n 1 ?? . . .

ビット列の順序

左側を上位ビットとすると、ビット列の順序は次のようになります。

... 上位ビット BTYPE BFINAL 下位ビット ...
... 後続のデータ 2 ビット 1 ビット 以前のデータ ...

データのビット列は、下位ビットから順に詰められたもの(リトル・エンディアン)になっています。

項目の順序と、数の表現(上位の桁から下位の桁へ)は逆向きになります。

ハフマン符号のビット列は反転させる

ハフマン木の分岐は左から右に進みますが、格納されるデータのビット列は右から左なので、ハフマン符号のビット列を反転させる必要があります。

BTYPE 別ブロック

BTYPE とブロックの内容は、次のようになっています。

種類 \ 項目 BFINAL BTYPE バイト境界 データ長 動的辞書 データ
非圧縮形式 ? 00 あり あり なし 無変換
固定辞書形式 ? 01 なし なし なし 圧縮
動的辞書形式 ? 10 なし なし あり 圧縮
(予約) ? 11 - - - -

BTYPE が 01 および 10 では、共通の圧縮データ形式(LZ 符号)です。また、LZ 符号の項目でハフマン符号化される「文字(長)」と「前方距離コード」は、別々の辞書を使います。

BTYPE=00 : 非圧縮形式

BTYPE=00 のブロックは、次のようになっています。

情報量 項目 内容
1 ビット BFINAL 最終ブロックならば 1
2 ビット BTYPE 00
0 〜 7 ビット 余白 LEN をバイト境界から開始するための穴埋め
16 ビット LEN データのバイト数
16 ビット NLEN LEN の 1 の補数 (~LEN)
LEN バイト データ 無変換のデータ(最大 65535 バイト)

解凍処理は LEN == ~NLEN を確認してデータをコピーするだけ。

BTYPE=01 : 固定辞書形式

BTYPE=01 のブロックは、次のようになっています。

情報量 項目 内容
1 ビット BFINAL 最終ブロックならば 1
2 ビット BTYPE 01
??? データ 圧縮データ

BTYPE=10 から動的辞書を省いた(固定辞書を使う)ものになっています。

固定辞書の生成プログラム

make_dictionary の引数(符号長の配列)が固定辞書になります。

sample3.py
#!/usr/bin/env python3

def make_dictionary(lengths):
    def gen_length_code():
        code = 0
        for length in range(max(lengths) + 1):
            yield code
            code = (code + lengths.count(length)) << 1

    def gen_dictionary(length_code):
        for length in lengths:
            yield (length_code[length], length)
            length_code[length] += 1

    return (*gen_dictionary([*gen_length_code()]),)


def dump_dictionary(dictionary, fmtstr=None):
    if fmtstr is None:
        fmtstr = '{0:3} {1:d}:{2}'
    for literal in range(len(dictionary)):
        code, length = dictionary[literal]
        binary = ('{0:0%db}' % length).format(code)
        print(fmtstr.format(literal, length, binary))
    print()


print('文字(値)の固定辞書')
dump_dictionary(make_dictionary(
    [8]*(144-0) + [9]*(256-144) + [7]*(280-256) + [8]*(288-280)
))

print('前方距離コード用固定辞書')
dump_dictionary(make_dictionary([5]*32))
BTYPE=01 の固定辞書
sample3.pyの実行結果
文字(値)の固定辞書
  0 8:00110000
  1 8:00110001
  2 8:00110010
  3 8:00110011
  4 8:00110100
  5 8:00110101
  6 8:00110110
  7 8:00110111
  8 8:00111000
  9 8:00111001
 10 8:00111010
 11 8:00111011
 12 8:00111100
 13 8:00111101
 14 8:00111110
 15 8:00111111
 16 8:01000000
 17 8:01000001
 18 8:01000010
 19 8:01000011
 20 8:01000100
 21 8:01000101
 22 8:01000110
 23 8:01000111
 24 8:01001000
 25 8:01001001
 26 8:01001010
 27 8:01001011
 28 8:01001100
 29 8:01001101
 30 8:01001110
 31 8:01001111
 32 8:01010000
 33 8:01010001
 34 8:01010010
 35 8:01010011
 36 8:01010100
 37 8:01010101
 38 8:01010110
 39 8:01010111
 40 8:01011000
 41 8:01011001
 42 8:01011010
 43 8:01011011
 44 8:01011100
 45 8:01011101
 46 8:01011110
 47 8:01011111
 48 8:01100000
 49 8:01100001
 50 8:01100010
 51 8:01100011
 52 8:01100100
 53 8:01100101
 54 8:01100110
 55 8:01100111
 56 8:01101000
 57 8:01101001
 58 8:01101010
 59 8:01101011
 60 8:01101100
 61 8:01101101
 62 8:01101110
 63 8:01101111
 64 8:01110000
 65 8:01110001
 66 8:01110010
 67 8:01110011
 68 8:01110100
 69 8:01110101
 70 8:01110110
 71 8:01110111
 72 8:01111000
 73 8:01111001
 74 8:01111010
 75 8:01111011
 76 8:01111100
 77 8:01111101
 78 8:01111110
 79 8:01111111
 80 8:10000000
 81 8:10000001
 82 8:10000010
 83 8:10000011
 84 8:10000100
 85 8:10000101
 86 8:10000110
 87 8:10000111
 88 8:10001000
 89 8:10001001
 90 8:10001010
 91 8:10001011
 92 8:10001100
 93 8:10001101
 94 8:10001110
 95 8:10001111
 96 8:10010000
 97 8:10010001
 98 8:10010010
 99 8:10010011
100 8:10010100
101 8:10010101
102 8:10010110
103 8:10010111
104 8:10011000
105 8:10011001
106 8:10011010
107 8:10011011
108 8:10011100
109 8:10011101
110 8:10011110
111 8:10011111
112 8:10100000
113 8:10100001
114 8:10100010
115 8:10100011
116 8:10100100
117 8:10100101
118 8:10100110
119 8:10100111
120 8:10101000
121 8:10101001
122 8:10101010
123 8:10101011
124 8:10101100
125 8:10101101
126 8:10101110
127 8:10101111
128 8:10110000
129 8:10110001
130 8:10110010
131 8:10110011
132 8:10110100
133 8:10110101
134 8:10110110
135 8:10110111
136 8:10111000
137 8:10111001
138 8:10111010
139 8:10111011
140 8:10111100
141 8:10111101
142 8:10111110
143 8:10111111
144 9:110010000
145 9:110010001
146 9:110010010
147 9:110010011
148 9:110010100
149 9:110010101
150 9:110010110
151 9:110010111
152 9:110011000
153 9:110011001
154 9:110011010
155 9:110011011
156 9:110011100
157 9:110011101
158 9:110011110
159 9:110011111
160 9:110100000
161 9:110100001
162 9:110100010
163 9:110100011
164 9:110100100
165 9:110100101
166 9:110100110
167 9:110100111
168 9:110101000
169 9:110101001
170 9:110101010
171 9:110101011
172 9:110101100
173 9:110101101
174 9:110101110
175 9:110101111
176 9:110110000
177 9:110110001
178 9:110110010
179 9:110110011
180 9:110110100
181 9:110110101
182 9:110110110
183 9:110110111
184 9:110111000
185 9:110111001
186 9:110111010
187 9:110111011
188 9:110111100
189 9:110111101
190 9:110111110
191 9:110111111
192 9:111000000
193 9:111000001
194 9:111000010
195 9:111000011
196 9:111000100
197 9:111000101
198 9:111000110
199 9:111000111
200 9:111001000
201 9:111001001
202 9:111001010
203 9:111001011
204 9:111001100
205 9:111001101
206 9:111001110
207 9:111001111
208 9:111010000
209 9:111010001
210 9:111010010
211 9:111010011
212 9:111010100
213 9:111010101
214 9:111010110
215 9:111010111
216 9:111011000
217 9:111011001
218 9:111011010
219 9:111011011
220 9:111011100
221 9:111011101
222 9:111011110
223 9:111011111
224 9:111100000
225 9:111100001
226 9:111100010
227 9:111100011
228 9:111100100
229 9:111100101
230 9:111100110
231 9:111100111
232 9:111101000
233 9:111101001
234 9:111101010
235 9:111101011
236 9:111101100
237 9:111101101
238 9:111101110
239 9:111101111
240 9:111110000
241 9:111110001
242 9:111110010
243 9:111110011
244 9:111110100
245 9:111110101
246 9:111110110
247 9:111110111
248 9:111111000
249 9:111111001
250 9:111111010
251 9:111111011
252 9:111111100
253 9:111111101
254 9:111111110
255 9:111111111
256 7:0000000
257 7:0000001
258 7:0000010
259 7:0000011
260 7:0000100
261 7:0000101
262 7:0000110
263 7:0000111
264 7:0001000
265 7:0001001
266 7:0001010
267 7:0001011
268 7:0001100
269 7:0001101
270 7:0001110
271 7:0001111
272 7:0010000
273 7:0010001
274 7:0010010
275 7:0010011
276 7:0010100
277 7:0010101
278 7:0010110
279 7:0010111
280 8:11000000
281 8:11000001
282 8:11000010
283 8:11000011
284 8:11000100
285 8:11000101
286 8:11000110
287 8:11000111

前方距離コード用固定辞書
  0 5:00000
  1 5:00001
  2 5:00010
  3 5:00011
  4 5:00100
  5 5:00101
  6 5:00110
  7 5:00111
  8 5:01000
  9 5:01001
 10 5:01010
 11 5:01011
 12 5:01100
 13 5:01101
 14 5:01110
 15 5:01111
 16 5:10000
 17 5:10001
 18 5:10010
 19 5:10011
 20 5:10100
 21 5:10101
 22 5:10110
 23 5:10111
 24 5:11000
 25 5:11001
 26 5:11010
 27 5:11011
 28 5:11100
 29 5:11101
 30 5:11110
 31 5:11111

BTYPE=10 : 動的辞書形式

BTYPE=10 ブロックは、次のようになっています。

情報量 項目 内容
1 ビット BFINAL 最終ブロックならば 1
2 ビット BTYPE 10
??? 動的辞書 圧縮されたハフマン符号の辞書
??? データ 圧縮データ

辞書も圧縮されているので、辞書を解凍してから、圧縮データを解凍します。

動的辞書

BTYPE=10 ブロックの動的辞書は、次のようになっています。

情報量 項目 内容
5 ビット HLIT 「LZ 符号の文字(長)」符号の数 -257
5 ビット HDIST 「LZ 符号の前方距離コード」符号の数 -1
4 ビット HCLEN CL4CL の数 - 4
(HCLEN + 4) * 3 ビット CL4CL 辞書(符号長配列): CL4LA,CL4DA
(HLIT + 257 個分) ??? CL4LA 辞書(符号長配列を圧縮): LZ 符号の文字(長)
(HDIST + 1 個分) ??? CL4DA 辞書(符号長配列を圧縮): LZ 符号の前方距離コード

以下は勝手に短縮名にした項目
CL4CL = code lengths for the code length
CL4LA = code lengths for the literal/length alphabet
CL4DA = code lengths for the distance alphabet

CL4CL : code lengths for the code length

CL4CL は、CL4LA および CL4DA の圧縮に使用するハフマン符号の符号長が (HCLEN + 4) 個あります。

符号長(3 ビット:0 〜 7)の配列なので、ハフマン符号の辞書になります。

符号長は、次の配列インデックス順で並んでいます。

16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15

(15 - HCLEN) 個は、符号長をゼロとして扱います。

インデックスは、CL4LA,CL4DA の圧縮コードになります。

CL4LA/CL4DA の圧縮形式

圧縮コードは、次のようになります。

圧縮コード 内容 付加情報
0 〜 15 CL4LA/CL4DA の符号長 なし
16 直前の値を複写(3〜6個) 直後の 2 ビット(個数 - 3)
17 ゼロで埋める(3〜10個) 直後の 3 ビット(個数 - 3)
18 ゼロで埋める(11〜138個) 直後の 7 ビット(個数 - 11)

これを使って、連続部分を「16,17,18 と付加情報」に変換します。

符号長の配列:
    0,  0, 0, 0, 0, 0,  0, 0, 0, 0, 8,  0,  0, 0, 0, 0,
    0,  0, 0, 0, 0, 0,  0, 0, 0, 0, 0,  0,  0, 0, 0, 0,
    5, 10, 0, 8, 0, 8, 10, 7, 6, 6, 9,  9,  7, 8, 7, 9,
    7,  6, 7, 7, 7, 7,  7, 8, 7, 8, 8, 12, 10, 8, 9, 0,

圧縮形式:
    17,(3),
    17,(1),
    8,
    18,(10),
    5, 10, 0, 8, 0, 8, 10, 7, 6, 6, 9, 9, 7, 8, 7, 9, 7, 6, 7,
    16,(1),
    8, 7, 8, 8, 12, 10, 8, 9, 0,

# (n) は文脈からビット数が決まり、(n) 以外はハフマン符号化されます

注意: CL4LA と CL4DA は、1 つの連続したデータ列として圧縮されています。
CL4LA の最後と CL4DA の最初は、圧縮コード 16,17,18 によってまとめられているかもしれません。

CL4LA : code lengths for the literal/length alphabet

CL4LA は、「LZ 符号の文字(長)」のハフマン符号の符号長配列を、CL4LA/CL4DA 圧縮形式にしたものになります。

文字(長)の符号長の配列は、符号長が 288 個ですが、CL4LA では (HLIT + 257) 個が格納されます。残り (31 - HLIT) 個は符号長をゼロとして扱います。

CL4DA : code lengths for the distance alphabet

CL4LA は、「LZ 符号の前方距離コード」のハフマン符号の符号長配列を、CL4LA/CL4DA 圧縮形式にしたものになります。

前方距離コードの符号長の配列は、符号長が 32 個ですが、CL4DA では (HDIST + 1) 個が格納されます。残り (31 - HDIST) 個は符号長をゼロとして扱います。

圧縮データ

「DEFLATE 圧縮データ形式での LZ 符号化」にハフマン符号化が施されたものになっています。

ハフマン符号の辞書情報は CL4LA/CL4DA になります。

ブロックの終わりは文字(長)の値が 256 です。ブロック 1 つにつき最後に 1 回だけ現れます。


解凍プログラム

圧縮データの詳細を表示するために作り始めましたが、中身は解凍プログラムになるので解凍結果を出力するようにしました。

zlib モジュールを使った方が圧倒的に高速なのは言うまでもないでしょう。

Python に限らず、処理を自作できると、既存のモジュールやライブラリの API に縛られる処理構造から脱却できます。

解凍プログラム
inflate.py
#!/usr/bin/env python3

import time

####################
#
#   メッセージ処理
#
####################


def noop(*args, **kw): pass  # 何もしない


def print_list(data, prefix=None, indent=0, width=16):
    if prefix:
        msgprint(prefix + ':')
    indent = ' ' * indent
    fmt = '%{0}d'.format(len('%d' % max(data)))
    dlen = len(data)
    for s in range(0, dlen, width):
        line = [fmt % d for d in data[s:min(s + width, dlen)]]
        msgprint('%s%04x: %s' % (indent, s, '  '.join(line)))


def set_verbose_mode():
    global msgprint
    global msgprint_list

    msgprint = print
    msgprint_list = print_list


msgprint = noop
msgprint_list = noop

########################
#
#   チェック・サム
#
########################

#
# Adler-32 チェック・サム
#


class ADLER32:

    @staticmethod
    def adler32(bs):
        return ADLER32(bs).get_value()

    def __init__(self, data=None):
        self.value = 1
        if data != None:
            self.update(data)

    def update(self, data):
        l = self.value & 0xffff
        h = self.value >> 16
        for d in data:
            l = (l + d) % 65521
            h = (h + l) % 65521
        self.value = (h << 16) | l

    def get_value(self):
        return self.value


#
# CRC-32 チェック・サム
#
class CRC32:
    @staticmethod
    def make_table(poly=0xedb88320):
        def f(x): return (x >> 1) ^ (poly if x & 1 else 0)
        return [f(f(f(f(f(f(f(f(n)))))))) for n in range(256)]

    @staticmethod
    def crc32(data):
        return CRC32(data).get_value()

    MASK = 0xffffffff
    TABLE = None

    def __init__(self, data=None):
        if CRC32.TABLE is None:
            CRC32.TABLE = CRC32.make_table()
        self.value = CRC32.MASK
        if data != None:
            self.update(data)

    def update(self, data):
        crc = self.value
        for d in data:
            crc = (crc >> 8) ^ CRC32.TABLE[(crc ^ d) & 0xff]
        self.value = crc

    def get_value(self):
        return self.value ^ CRC32.MASK


################################
#
#   入出力
#
################################

#
# ビット単位での読取り:
#     8 ビットより大きい場合は、リトル・エンディアン形式として扱う.
#
class BitStream:
    def __init__(self, inp):
        self.istream = inp
        self.fetch_data = 0
        self.fetch_bits = 0

    # 指定ビット数を先読み保持から破棄する
    def feed(self, bits):
        self.fetch_data >>= bits
        self.fetch_bits -= bits

    # 指定ビット数を先読みして保持する
    def fetch(self, bits):
        while bits > self.fetch_bits:
            self.fetch_data |= self.readbyte() << self.fetch_bits
            self.fetch_bits += 8
        mask = (1 << bits) - 1
        return (self.fetch_data & mask)

    # 指定ビット数を取り出して進める
    def read(self, bits):
        value = self.fetch(bits)
        self.feed(bits)
        return value

    # バイト境界まで読み出し位置を進める
    def skip_align(self):
        bits = self.fetch_bits & 7
        if bits != 0:
            self.feed(bits)

    # 直接1バイト読み出す
    def readbyte(self):
        return self.istream.read(1)[0]


#
# 解凍データの出力先:
#     全ての解凍データを保持する(手抜き)
#
class OutputStream:
    def __init__(self):
        self.output = []

    # 文字を1つ追加
    def write(self, value):
        self.output.append(value)

    # distance/length による複写追記
    def lzcopy(self, distance, length):
        pos = len(self.output) - distance
        for n in range(length):
            self.output.append(self.output[pos+n])


#
# ハフマン符号の解凍
#
class DecodeHuffman:

    # 符号のビット列を反転する
    #   length が 0 のときは None
    @staticmethod
    def reverse_code(code, length):
        return int(('{0:0%db}' % length).format(code)[::-1], 2) if length != 0 else None

    # 文字に対する符号長の配列で初期化する
    #   入力: lengths[文字] = 符号長
    def __init__(self, lengths):
        # 最大符号長
        self.max_length = max(lengths)

        # 符号長ごとの符号の初期値を作る
        #    length_code[符号長] = 符号の初期値
        def gen_length_code():
            yield 0
            code = 0
            for length in range(1, self.max_length + 1):
                yield code
                code = (code + lengths.count(length)) << 1

        # 文字に対する (反転符号, 符号長) を生成
        def gen_dictionary(length_code):
            for length in lengths:
                yield (DecodeHuffman.reverse_code(length_code[length], length), length)
                length_code[length] += 1

        # 符号化辞書: dictionary[文字] = (反転符号, 符号長)
        dictionary = (*gen_dictionary([*gen_length_code()]),)

        # 復号辞書の生成
        self.decode_table = [None] * (1 << self.max_length)
        for literal in range(len(lengths)):
            code, length = dictionary[literal]
            if length == 0:
                continue
            step = 1 << length
            value = (literal, length)
            while code < len(self.decode_table):
                self.decode_table[code] = value
                code += step

    # ストリームから復号して、文字を1つ取り出す
    #   出力:(文字, 符号長, ビット反転符号)

    def decode(self, bstream):
        code = bstream.fetch(self.max_length)
        literal, length = self.decode_table[code]
        code &= (1 << length) - 1
        if False:
            msgprint(('Huffman: {2:#05x} <- {1:2}:{0:0%db}' % length).
                     format(DecodeHuffman.reverse_code(code, length), length, literal))
        bstream.feed(length)
        return (literal, length, code)


#
# RFC-1951: DEFLATE 圧縮データ形式の解凍
#
class Inflate:
    CL4CL_ORDER = (
        16, 17, 18,
        0, 8,
        7, 9,
        6, 10,
        5, 11,
        4, 12,
        3, 13,
        2, 14,
        1, 15
    )

    LENGTH_BASE = (
        (3, 0), (4, 0), (5, 0), (6, 0),
        (7, 0), (8, 0), (9, 0), (10, 0),
        (11, 1), (13, 1), (15, 1), (17, 1),
        (19, 2), (23, 2), (27, 2), (31, 2),
        (35, 3), (43, 3), (51, 3), (59, 3),
        (67, 4), (83, 4), (99, 4), (115, 4),
        (131, 5), (163, 5), (195, 5), (227, 5),
        (258, 0),
    )

    DISTANCE_BASE = (
        (1,  0), (2,  0),
        (3,  0), (4,  0),
        (5,  1), (7,  1),
        (9,  2), (13,  2),
        (17,  3), (25,  3),
        (33,  4), (49,  4),
        (65,  5), (97,  5),
        (129,  6), (193,  6),
        (257,  7), (385,  7),
        (513,  8), (769,  8),
        (1025,  9), (1537,  9),
        (2049, 10), (3073, 10),
        (4097, 11), (6145, 11),
        (8193, 12), (12289, 12),
        (16385, 13), (24577, 13),
    )

    BTYPE00_LITERAL_LENGTH = [8]*(144-0) + [9]*(256-144) + [7]*(280-256) + [8]*(288-280)
    BTYPE00_DISTANCE_LENGTH = [5]*32

    def __init__(self, bitstream, ostream):
        self.bstream = bitstream
        self.ostream = ostream
        self.code_decoder = None
        self.literal_decoder = None
        self.distance_decoder = None
        self.BFINAL = None
        self.BTYPE = None
        self.HLIT = None
        self.HDIST = None
        self.HCLEN = None
        self.CL4CL = None
        self.CL4LA = None
        self.CL4DA = None

    #
    # ブロックの解凍
    #
    def block(self):
        msgprint('======== BLOCK ========')
        self.BFINAL = self.bstream.read(1)
        self.BTYPE = self.bstream.read(2)

        BT0 = (self.BTYPE >> 0) & 1
        BT1 = (self.BTYPE >> 1) & 1
        msgprint('%8s: %d' % ('BFINAL', self.BFINAL))
        msgprint('%8s: %d%d' % ('BTYPE', BT1, BT0))

        if self.BTYPE == 0:
            self.btype00()
        elif self.BTYPE == 1:
            self.btype01()
        elif self.BTYPE == 2:
            self.btype10()
        elif self.BTYPE == 3:
            raise NotImplemented  # reserved
        return (self.BFINAL == 0)

    #
    # ブロック タイプ 00 : 非圧縮なので複写
    #
    def btype00(self):
        self.bstream.skip_align()
        LEN = self.bstream.read(16)
        NLEN = self.bstream.read(16) ^ 0xffff
        if LEN != NLEN:
            raise Exception('LEN(%#06x) != NLEN(%#06x)' % (LEN, NLEN))
        msgprint('BTYPE00: LEN(%#06x), NLEN(%#06x)' % (LEN, NLEN))

        for n in range(LEN):
            self.ostream.write(self.bstream.read(8))

    #
    # ブロック タイプ 01 : 固定辞書による解凍
    #
    def btype01(self):
        self.literal_decoder = DecodeHuffman(Inflate.BTYPE00_LITERAL_LENGTH)
        self.distance_decoder = DecodeHuffman(Inflate.BTYPE00_DISTANCE_LENGTH)
        self.decode_data()

    #
    # ブロック タイプ 10 : 動的辞書による解凍
    #
    def btype10(self):
        self.btype10_header()
        self.btype10_code_length()
        self.btype10_literal_length()
        self.btype10_distance_length()
        self.decode_data()

    def btype10_header(self):
        HLIT = self.bstream.read(5)
        HDIST = self.bstream.read(5)
        HCLEN = self.bstream.read(4)
        self.HLIT = HLIT + 257
        self.HDIST = HDIST + 1
        self.HCLEN = HCLEN + 4
        msgprint('%8s: %d + 257 = %d' % ('HLIT', HLIT, self.HLIT))
        msgprint('%8s: %d + 1 = %d' % ('HDIST', HDIST, self.HDIST))
        msgprint('%8s: %d + 4 = %d' % ('HCLEN', HCLEN, self.HCLEN))

    # 圧縮された動的辞書の解凍準備
    def btype10_code_length(self):
        CL4CL = []
        for n in range(self.HCLEN):
            CL4CL.append(self.bstream.read(3))

        self.CL4CL = [0] * 19
        for n in range(len(CL4CL)):
            p = Inflate.CL4CL_ORDER[n]
            self.CL4CL[p] = CL4CL[n]
        msgprint_list(CL4CL, '  code length', 4)

        self.code_decoder = DecodeHuffman(self.CL4CL)
        self.HDICT = self.decode_dictionary()

    # literal/length の解凍準備
    def btype10_literal_length(self):
        self.CL4LA = self.HDICT[:self.HLIT]
        msgprint_list(self.CL4LA, '  literal/length code length', 4)
        if self.CL4LA[256] == 0:
            raise Exception('CODE[256] == 0')
        self.literal_decoder = DecodeHuffman(self.CL4LA)

    # distance の解凍準備
    def btype10_distance_length(self):
        self.CL4DA = self.HDICT[self.HLIT:]
        msgprint_list(self.CL4DA, '  distance code length', 4)
        self.distance_decoder = DecodeHuffman(self.CL4DA)

    # literal/length, distance 符号情報の解凍
    def decode_dictionary(self):
        count = self.HLIT + self.HDIST
        clen = []
        while len(clen) < count:
            (code, bits, huff) = self.code_decoder.decode(self.bstream)

            # 0..15 はコードに対する符号のビット数
            if code < 16:
                clen.append(code)
                continue

            c = 0
            n = 0
            if code == 16:
                # 直前の値を複写(3~6)
                #   長さは直後の 2 ビットに 3 を加える
                n = self.bstream.read(2) + 3
                c = clen[-1]
            elif code == 17:
                # ゼロで埋める(3~10)
                #    長さは直後の 3 ビットに 3 を加える
                n = self.bstream.read(3) + 3
            elif code == 18:
                # ゼロで埋める(11~138)
                #    長さは直後の 7 ビットに 11 を加える
                n = self.bstream.read(7) + 11
            else:
                raise Exception('unknown code: %d' % code)
            clen += [c] * n
        return clen

    # 圧縮データの解凍
    def decode_data(self):
        msgprint('COMPRESSED DATA:')
        while True:
            # literal/length の解凍
            (code, bits, huff) = self.literal_decoder.decode(self.bstream)

            # 0..255 は 8 ビットの文字データ(literal)
            if code < 256:
                self.ostream.write(code)
                c = chr(code)
                if not c.isprintable():
                    c = ''
                msgprint('  LITERAL[%#04x]: %s' % (code, c))
                continue

            # 256 はブロックの終わり
            if code == 256:
                msgprint('  END:')
                break

            # 257..285 はデータの複写数情報(length)
            # 以下で、実際の数を求める
            length, b = Inflate.LENGTH_BASE[code - 257]
            if b != 0:
                length += self.bstream.read(b)

            # データの複写開始位置情報(distance)の解凍
            (code, bits, huff) = self.distance_decoder.decode(self.bstream)

            # 以下で、実際の開始位置を求める
            distance, b = Inflate.DISTANCE_BASE[code]
            if b != 0:
                distance += self.bstream.read(b)

            # データの複写
            msgprint('  LENGTH=%d, DISTANCE=%d' % (length, distance))
            self.ostream.lzcopy(distance, length)


# RFC-1950: ZLIB 圧縮データ形式の解凍
class ZLIBInflate:
    COMPRESSION_ALGORITHM = (
        'fastest',
        'fast',
        'default',
        'slowest',
    )

    def __init__(self, fp):
        self.bstream = BitStream(fp)
        self.ostream = OutputStream()
        self.header_CMF = None
        self.header_FLG = None

    def CMF_CM(self):
        return (self.header_CMF & 0x0f)

    def CMF_CINFO(self):
        return ((self.header_CMF >> 4) & 0x0f)

    def flags_FCHECK(self):
        return (self.header_FLG & 0x1f)

    def flags_FDICT(self):
        return (self.header_FLG & 0x20)

    def flags_FLEVEL(self):
        return ((self.header_FLG >> 6) & 3)

    def inflate_date(self):
        return self.ostream.output

    #
    # 入力ファイルの解凍処理
    #
    def inflate(self):
        # CMF:FLG を調査
        self.header_CMF = self.bstream.read(8)
        self.header_FLG = self.bstream.read(8)
        msgprint('CMF:FLG = %02x:%02x' % (self.header_CMF, self.header_FLG))

        if ((self.header_CMF * 256 + self.header_FLG) % 31) != 0:
            raise Exception('invalid FCHECK')

        cm = self.CMF_CM()
        if cm != 8:
            raise Exception('unknown compression method: %d' % cm)

        cinfo = self.CMF_CINFO()
        if cinfo > 7:
            raise Exception('invalid CINFO')
        # window_size = 1 << (8 + cinfo)

        # FDICT には非対応
        if self.flags_FDICT():
            self.DICTID = self.bstream.readbyte(4)
            raise NotImplemented

        # 圧縮データの解凍
        self.inflate = Inflate(self.bstream, self.ostream)
        while self.inflate.block():
            pass
        self.bstream.skip_align()

        # Adler-32 によるエラー検知
        xadler32 = ADLER32.adler32(self.inflate_date())
        dadler32 = self.read32be()
        msgprint('ADLER32: %#010x (%#010x)' % (dadler32, xadler32))
        if xadler32 != dadler32:
            raise Exception('ADLER32: %#010x != %#010x' % (dadler32, xadler32))

    # ビッグ・エンディアンで 32bit の読み出し
    def read32be(self):
        b0 = self.bstream.read(8)
        b1 = self.bstream.read(8)
        b2 = self.bstream.read(8)
        b3 = self.bstream.read(8)
        return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3


# RFC-1952: GZIP 圧縮データ形式の解凍
class GZIPInflate:
    OS_NAME = {
        0: 'FAT filesystem (MS-DOS, OS/2, NT/Win32)',
        1: 'Amiga',
        2: 'VMS (or OpenVMS)',
        3: 'Unix',
        4: 'VM/CMS',
        5: 'Atari TOS',
        6: 'HPFS filesystem (OS/2, NT)',
        7: 'Machintosh',
        8: 'Z-System',
        9: 'CP/M',
        10: 'TOPS-20',
        11: 'NTFS filesystem (NT)',
        12: 'QDOS',
        13: 'Acorn RISCOS',
        255: 'unknown',
    }

    XFL_NAME = {
        0: '(default compression)',
        2: 'slowest algorithm (maximum compression)',
        4: 'fastest algorithm',
    }

    def __init__(self, fp):
        self.bstream = BitStream(fp)
        self.ostream = OutputStream()
        self.read_data = []
        self.compression_method = None
        self.flags = None
        self.mtime = None
        self.extra_flags = None
        self.os = None
        self.extra = None
        self.name = None
        self.comment = None
        self.crc16 = None

    def flags_FTEXT(self):
        return (self.flags & 0x01)

    def flags_FHCRC(self):
        return (self.flags & 0x02)

    def flags_FEXTRA(self):
        return (self.flags & 0x04)

    def flags_FNAME(self):
        return (self.flags & 0x08)

    def flags_FCOMMENT(self):
        return (self.flags & 0x10)

    def inflate_date(self):
        return self.ostream.output

    # 入力ファイルから1バイトの読取り
    #   FHCRC 処理に備える
    def readbyte(self, n=1):
        r = 0
        b = 0
        for i in range(n):
            d = self.bstream.readbyte()
            self.read_data.append(d)
            r += d << b
            b += 8
        return r

    #
    # 入力ファイルの解凍処理
    #
    def inflate(self):
        # ID を調査
        self.check_id()

        # ヘッダ部の読み込み
        self.read_header()

        # 圧縮データの解凍
        self.inflate = Inflate(self.bstream, self.ostream)
        while self.inflate.block():
            pass
        self.bstream.skip_align()

        # CRC32 によるエラー検知
        xdata = self.inflate_date()
        xcrc32 = CRC32.crc32(xdata)
        dcrc32 = self.bstream.read(32)
        msgprint('CRC32: %#010x (%#010x)' % (dcrc32, xcrc32))
        if dcrc32 != xcrc32:
            raise Exception('CRC32: %#010x != %#010x' % (dcrc32, xcrc32))

        # 元データとの大きさ比較によるエラー検知
        xsize = len(xdata)
        isize = self.bstream.read(32)
        msgprint('ISIZE: %#010x (%#010x)' % (isize, xsize))
        if isize != xsize:
            raise Exception('ISIZE: %#010x != %#010x' % (isize, xsize))

    # ファイルの先頭が 0x1f, 0x8b か?
    def check_id(self):
        id1 = self.readbyte()
        id2 = self.readbyte()
        magic = 'FILE MAGIC: %#04x,%#04x' % (id1, id2)
        if id1 != 0x1f or id2 != 0x8b:
            raise Exception('Invalid ' + magic)
        msgprint(magic)

    #
    # ヘッダ部の読み込み
    #
    def read_header(self):
        # 必須
        self.read_header_CM()
        self.read_header_FLG()
        self.read_header_MTIME()
        self.read_header_XFL()
        self.read_header_OS()
        # オプション
        self.read_header_EXTRA()
        self.read_header_NAME()
        self.read_header_COMMENT()
        self.read_header_CRC16()

    def read_header_CM(self):
        self.compression_method = self.readbyte()
        msgprint('%8s: %#04x' % ('CM', self.compression_method))
        if self.compression_method != 8:
            raise Exception('unknown compression method: %d' % self.compression_method)

    def read_header_FLG(self):
        self.flags = self.readbyte()
        mflg = ''
        if self.flags_FTEXT():
            mflg += ', FTEXT'
        if self.flags_FHCRC():
            mflg += ', FHCRC'
        if self.flags_FEXTRA():
            mflg += ', FEXTRA'
        if self.flags_FNAME():
            mflg += ', FNAME'
        if self.flags_FCOMMENT():
            mflg += ', FCOMMENT'
        msgprint('%8s: %#04x%s' % ('FLG', self.flags, mflg))

    def read_header_MTIME(self):
        self.mtime = self.readbyte(4)
        msgprint('%8s: %s (%#010x)' % ('MTIME', time.ctime(self.mtime), self.mtime))

    def read_header_XFL(self):
        self.extra_flags = self.readbyte()
        msgprint('%8s: %#04x, %s' % ('XFL', self.extra_flags, GZIPInflate.XFL_NAME[self.extra_flags]))

    def read_header_OS(self):
        self.os = self.readbyte()
        msgprint('%8s: %#04x, %s' % ('OS', self.os, GZIPInflate.OS_NAME[self.os]))

    def read_header_EXTRA(self):
        self.extra = None
        if self.flags_FEXTRA():
            self.read_EXTRA()

    def read_header_NAME(self):
        self.name = None
        if self.flags_FNAME():
            self.name = self.read_CString()
            msgprint('%8s: \'%s\'' % ('NAME', self.name))

    def read_header_COMMENT(self):
        self.comment = None
        if self.flags_FCOMMENT():
            self.comment = self.read_CString()
            msgprint('%8s: %s' % ('COMMENT', self.name))

    def read_header_CRC16(self):
        self.crc16 = None
        if self.flags_FHCRC():
            crc16 = 0xffff & CRC32.crc32(self.read_data)
            self.crc16 = self.readbyte(2)
            msgprint('CRC16: %#06x, %#06x' % self.crc16, crc16)

    # 拡張データの読み込み
    def read_EXTRA(self):
        xlen = self.readbyte(2)
        self.extra = []
        for n in range(xlen):
            self.extra.append(self.readbyte())
        msgprint('%8s: [%d] ...' % ('EXTRA', len(self.extra)))

    # NULL 文字終端の文字列の読み込み
    def read_CString(self):
        s = ''
        while True:
            c = self.readbyte()
            if c == 0:
                break
            s += chr(c)
        return s


# コマンド ライン起動用の関数
def main():
    import argparse
    import codecs
    import sys

    # コマンド ライン引数の解析
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true', help='冗長モード')
    parser.add_argument('-o', '--output', metavar='OUTPUT', help='出力ファイル')
    parser.add_argument('INPUT', help='入力ファイル')
    args = parser.parse_args()

    # 冗長モードを設定
    if args.verbose or args.output is None:
        set_verbose_mode()

    # 入力ファイルを開く
    if len(args.INPUT) == 0:
        return parser.print_help()
    if args.INPUT == '-':
        inp = sys.stdin.buffer
    else:
        inp = open(args.INPUT, 'rb')

    # 解凍処理
    gz = GZIPInflate(inp)
    gz.inflate()

    # 解凍データを出力
    if args.output:
        if args.output == '-':
            out = sys.stdout
        else:
            out = open(args.output, 'wb')
        out.write(bytearray(gz.inflate_date()))


if __name__ == '__main__':
    main()
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2