はじめに
LOCAL学生部アドベントカレンダー6日目
11日目の記事を書いている途中に偶然生えたので、空いている枠を埋めようと思います。
zlib って何?
概要
公式: zlib.net
Zip等に使われている圧縮アルゴリズムをライブラリ化したもので、内部ではDeflateを実装しています。
バイナリデータの圧縮が簡単にできるので、通信とかに使うのもありかも?(試したことはない)
ファイルの圧縮とかだと、結構見かけますよね。
license
zlib には zlib Licenseが適応されています。
MITに似たかなり緩いライセンスです。
詳しくは調べてみてください。
使ってみる
詳しい情報はこちらをご覧ください
compress(data: bytes, level: int = -1) -> bytes
data
を圧縮し返します。
level
は圧縮率です。
-1 ~ 9
の値が入り、デフォルト値は-1
(2019年12月5日現在は6
と同等)です。
0
は無圧縮で、9
で最も圧縮率が高くなります。
圧縮率が高いほどかかる時間も長くなるため、大抵の場合はデフォルトのままで良いと思います。
import zlib
data = b'test data\x00' # 任意のバイナリデータ
compressed = zlib.compress(data)
print(compressed) # b'x\x9c+I-.QHI,Id\x00\x00\x159\x03{'
decompress(data: bytes, wbits: int = 15, bufsize: int = 16384) -> bytes
data
を解凍し返します。
他の引数は基本的にデフォルトで良いです。
bufsize
は必要に応じて増加します。
import zlib
data = b'test data\x00' # 任意のバイナリデータ
decompressed = zlib.decompress(zlib.compress(data))
print(decompressed) # b'test data\x00'
compressobj(level: int = -1, method: int = 8, wbits: int = 15, memLevel: int = 8, strategy: int = 0, zdict: bytes = ...) -> _Compress
一度にメモリ上に置けないようなデータを圧縮するための圧縮オブジェクトを返します。
level
はcompress()
と同じです。
method
は圧縮アルゴリズムで、2019年12月5日現在サポートされている値はDEFLATED = 8
のみです
zdict
は定義済み圧縮辞書で、データ内で繰り返し現れると予想されるバイト列のシーケンスです。
import zlib
import io
data_stream = io.BytesIO(b'test data\x00')
cobj = zlib.compressobj()
compressed = b''
while True:
tmp = data_stream.read(64)
if not tmp:
compressed += cobj.flush()
break
compressed += cobj.compress(tmp)
print(compressed) # b'x\x9c+I-.QHI,Id\x00\x00\x159\x03{'
最後のflush()
を忘れるとデータが不完全になってしまう可能性があります。
decompressobj(wbits: int = 15, zdict: bytes = ...) -> _Decompress
zdict
はcompressobj()
で使用したものと同じでなければなりません。
また、decompressobj()
の呼び出しとdecompress()
の最初の呼び出しの間にzdictに渡したオブジェクトを変更してはいけません。
import zlib
import io
data_stream = io.BytesIO(zlib.compress(b'test data\x00'))
dobj = zlib.decompressobj()
decompressed = b''
while True:
tmp = data_stream.read(64)
if not tmp:
decompressed += dobj.flush()
break
while True:
if not tmp:
break
decompressed += dobj.decompress(tmp)
tmp = dobj.unconsumed_tail
print(decompressed) # b'test data\x00'
バッファに入りきらず、decompress()
呼び出しで処理されなかったバイト列がunconsumed_tail
に入ります。
圧縮ファイルを作る
構造
header, filename&path, compressed_file
の順番で保存されていて、ファイルの数だけこのブロックが繰り返される。
| 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |
|---------------------------------------|
| name_len(uint_32) | file_len(uint_32) |
|---------------------------------------|
実装
python mcp.py TARGET [-o OUTPUT]
で使用できます。
TARGET
にはファイルもしくはディレクトリのパスが入ります。
実際に使うために書いたわけではないので、もし使用する場合は自己責任でお願いします。
解凍は11日のアドベントカレンダーでやります。
import sys
import argparse
import os
import zlib
from ctypes import *
import random
import string
import glob
import io
import shutil
tmp_dir = ''.join(random.choices(
string.ascii_letters + string.digits, k=64))+'_mcptmp'
def main():
p = argparse.ArgumentParser(
description='Compress file and dir', usage='Add target to Command line arguments')
p.add_argument('target', help='Compression target')
p.add_argument('--out', '-o', help='Output file path',
default='compressed.mcp')
if len(sys.argv) < 2:
p.print_help()
target = p.parse_args().target
out = p.parse_args().out
if os.path.isfile(target):
_compress_file(target, out)
elif os.path.isdir(target):
_compress_dir(target, out)
else:
raise Exception('Argument error')
def _compress_file(path: str, out: str):
_create_mtp(os.path.basename(path), path)
size = os.path.getsize(os.path.join(tmp_dir, os.path.basename(path)))
with open(os.path.join(tmp_dir, os.path.basename(path)), 'rb') as t:
with open(out, 'wb') as o:
o.write(_make_file_header(size, os.path.basename(path)))
while True:
tmp = t.read(1024)
if not tmp:
o.flush()
break
o.write(tmp)
def _make_file_header(file_len: int, filename: str) -> bytes:
filename_len = len(filename)
return bytes(FileHeaderStructure(filename_len, file_len)) + filename.encode('UTF-8')
def _compress_dir(path: str, out: str):
files = [p[len(path)-1 + len(os.sep):] for p in glob.glob(
os.path.join(path, '**'), recursive=True) if os.path.isfile(p)]
for f in files:
os.makedirs(os.path.join(tmp_dir, os.path.dirname(f)), exist_ok=True)
_create_mtp(f, os.path.join(path, f))
with open(out, 'wb') as o:
for f in files:
o.write(_make_file_header(
os.path.getsize(os.path.join(tmp_dir, f)), f))
with open(os.path.join(tmp_dir, f), 'rb') as t:
while True:
tmp = t.read(1024)
if not tmp:
break
o.write(tmp)
o.flush()
def _create_mtp(path: str, source: str):
c = zlib.compressobj()
with open(source, mode='rb') as f:
with open(os.path.join(tmp_dir, path), mode='wb') as o:
while True:
t = f.read(1024)
if not t:
o.write(c.flush())
break
ced = c.compress(t)
if ced:
o.write(ced)
def _rem_tmp():
shutil.rmtree(tmp_dir)
class FileHeaderStructure(Structure):
_fields_ = (
('filename_len', c_uint32),
('file_len', c_uint32)
)
if __name__ == "__main__":
main()
_rem_tmp()
圧縮後のサイズを得る方法が思いつかず、一度圧縮したものをファイルに出力してそのファイルのサイズを取得しています。
圧縮したものをメモリ上においておけばlen()
で得れますが、それじゃcompressobj()
を使った意味がなくなってしまいますよね…。
ファイルのデータについているヘッダーを作るのに一苦労しました。
Pythonはこういうこと苦手だからC++とかでやれって話なんでしょうけど。
Pythonには構造体は存在しませんが、Structureを継承したクラスを用いてそれっぽいものを作ることはできるようです。
from ctypes import *
して_fields_
に構造を記述します。
struct.pack(format, values...)
というのもあるようですが、~~整数にしか対応していないようです(普通に使える)~~主要な型はほぼすべて対応しているようです(ドキュメント。