LoginSignup
6
5

More than 3 years have passed since last update.

Pythonとzlibを使って圧縮ファイルを作ってみる

Last updated at Posted at 2019-12-06

はじめに

LOCAL学生部アドベントカレンダー6日目

11日目の記事を書いている途中に偶然生えたので、空いている枠を埋めようと思います。

zlib って何?

概要

公式: zlib.net
Zip等に使われている圧縮アルゴリズムをライブラリ化したもので、内部ではDeflateを実装しています。
バイナリデータの圧縮が簡単にできるので、通信とかに使うのもありかも?(試したことはない)
ファイルの圧縮とかだと、結構見かけますよね。

license

zlib には zlib Licenseが適応されています。
MITに似たかなり緩いライセンスです。
詳しくは調べてみてください。

使ってみる

詳しい情報はこちらをご覧ください

compress(data: bytes, level: int = -1) -> bytes

dataを圧縮し返します。
levelは圧縮率です。
-1 ~ 9 の値が入り、デフォルト値は-1(2019年12月5日現在は6と同等)です。
0は無圧縮で、9で最も圧縮率が高くなります。
圧縮率が高いほどかかる時間も長くなるため、大抵の場合はデフォルトのままで良いと思います。

compress()
import zlib

data = b'test data\x00' # 任意のバイナリデータ
compressed = zlib.compress(data)
print(compressed) # b'x\x9c+I-.QHI,Id\x00\x00\x159\x03{'

decompress(data: bytes, wbits: int = 15, bufsize: int = 16384) -> bytes

dataを解凍し返します。
他の引数は基本的にデフォルトで良いです。
bufsizeは必要に応じて増加します。

decompress()
import zlib

data = b'test data\x00' # 任意のバイナリデータ
decompressed = zlib.decompress(zlib.compress(data))
print(decompressed) # b'test data\x00'

compressobj(level: int = -1, method: int = 8, wbits: int = 15, memLevel: int = 8, strategy: int = 0, zdict: bytes = ...) -> _Compress

一度にメモリ上に置けないようなデータを圧縮するための圧縮オブジェクトを返します。
levelcompress()と同じです。
methodは圧縮アルゴリズムで、2019年12月5日現在サポートされている値はDEFLATED = 8のみです
zdictは定義済み圧縮辞書で、データ内で繰り返し現れると予想されるバイト列のシーケンスです。

compressobj()
import zlib
import io

data_stream = io.BytesIO(b'test data\x00')
cobj = zlib.compressobj()
compressed = b''
while True:
    tmp = data_stream.read(64)
    if not tmp:
        compressed += cobj.flush()
        break
    compressed += cobj.compress(tmp)

print(compressed) # b'x\x9c+I-.QHI,Id\x00\x00\x159\x03{'

最後のflush()を忘れるとデータが不完全になってしまう可能性があります。

decompressobj(wbits: int = 15, zdict: bytes = ...) -> _Decompress

zdictcompressobj()で使用したものと同じでなければなりません。
また、decompressobj()の呼び出しとdecompress()の最初の呼び出しの間にzdictに渡したオブジェクトを変更してはいけません。

decompressobj()
import zlib
import io

data_stream = io.BytesIO(zlib.compress(b'test data\x00'))
dobj = zlib.decompressobj()
decompressed = b''
while True:
    tmp = data_stream.read(64)
    if not tmp:
        decompressed += dobj.flush()
        break
    while True:
        if not tmp:
            break
        decompressed += dobj.decompress(tmp)
        tmp = dobj.unconsumed_tail

print(decompressed) # b'test data\x00'

バッファに入りきらず、decompress()呼び出しで処理されなかったバイト列がunconsumed_tailに入ります。

圧縮ファイルを作る

構造

header, filename&path, compressed_fileの順番で保存されていて、ファイルの数だけこのブロックが繰り返される。

file_header
| 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |
|---------------------------------------|
| name_len(uint_32) | file_len(uint_32) |
|---------------------------------------|

実装

python mcp.py TARGET [-o OUTPUT]で使用できます。
TARGETにはファイルもしくはディレクトリのパスが入ります。
実際に使うために書いたわけではないので、もし使用する場合は自己責任でお願いします。
解凍は11日のアドベントカレンダーでやります。

mcp.py
import sys
import argparse
import os
import zlib
from ctypes import *
import random
import string
import glob
import io
import shutil

tmp_dir = ''.join(random.choices(
    string.ascii_letters + string.digits, k=64))+'_mcptmp'


def main():
    p = argparse.ArgumentParser(
        description='Compress file and dir', usage='Add target to Command line arguments')
    p.add_argument('target', help='Compression target')
    p.add_argument('--out', '-o', help='Output file path',
                   default='compressed.mcp')
    if len(sys.argv) < 2:
        p.print_help()
    target = p.parse_args().target
    out = p.parse_args().out

    if os.path.isfile(target):
        _compress_file(target, out)
    elif os.path.isdir(target):
        _compress_dir(target, out)
    else:
        raise Exception('Argument error')


def _compress_file(path: str, out: str):
    _create_mtp(os.path.basename(path), path)
    size = os.path.getsize(os.path.join(tmp_dir, os.path.basename(path)))
    with open(os.path.join(tmp_dir, os.path.basename(path)), 'rb') as t:
        with open(out, 'wb') as o:
            o.write(_make_file_header(size, os.path.basename(path)))
            while True:
                tmp = t.read(1024)
                if not tmp:
                    o.flush()
                    break
                o.write(tmp)


def _make_file_header(file_len: int, filename: str) -> bytes:
    filename_len = len(filename)
    return bytes(FileHeaderStructure(filename_len, file_len)) + filename.encode('UTF-8')


def _compress_dir(path: str, out: str):
    files = [p[len(path)-1 + len(os.sep):] for p in glob.glob(
        os.path.join(path, '**'), recursive=True) if os.path.isfile(p)]
    for f in files:
        os.makedirs(os.path.join(tmp_dir, os.path.dirname(f)), exist_ok=True)
        _create_mtp(f, os.path.join(path, f))

    with open(out, 'wb') as o:
        for f in files:
            o.write(_make_file_header(
                os.path.getsize(os.path.join(tmp_dir, f)), f))
            with open(os.path.join(tmp_dir, f), 'rb') as t:
                while True:
                    tmp = t.read(1024)
                    if not tmp:
                        break
                    o.write(tmp)
        o.flush()


def _create_mtp(path: str, source: str):
    c = zlib.compressobj()
    with open(source, mode='rb') as f:
        with open(os.path.join(tmp_dir, path), mode='wb') as o:
            while True:
                t = f.read(1024)
                if not t:
                    o.write(c.flush())
                    break
                ced = c.compress(t)
                if ced:
                    o.write(ced)


def _rem_tmp():
    shutil.rmtree(tmp_dir)


class FileHeaderStructure(Structure):
    _fields_ = (
        ('filename_len', c_uint32),
        ('file_len', c_uint32)
    )


if __name__ == "__main__":
    main()
    _rem_tmp()

圧縮後のサイズを得る方法が思いつかず、一度圧縮したものをファイルに出力してそのファイルのサイズを取得しています。
圧縮したものをメモリ上においておけばlen()で得れますが、それじゃcompressobj()を使った意味がなくなってしまいますよね…。

ファイルのデータについているヘッダーを作るのに一苦労しました。
Pythonはこういうこと苦手だからC++とかでやれって話なんでしょうけど。
Pythonには構造体は存在しませんが、Structureを継承したクラスを用いてそれっぽいものを作ることはできるようです。
from ctypes import *して_fields_に構造を記述します。
struct.pack(format, values...)というのもあるようですが、整数にしか対応していないようです(普通に使える)主要な型はほぼすべて対応しているようです(ドキュメント

6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5