Created
August 6, 2021 15:03
-
-
Save mateon1/2a5803c8b9d0b036af7429c7013561a2 to your computer and use it in GitHub Desktop.
RLE-only zstd compressor (Python3 only, prefer PyPy3)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct
def header():
    """Return the fixed 6-byte Zstandard frame prologue used by this compressor.

    Layout:
      * ``28 b5 2f fd`` -- Zstandard magic number (little-endian 0xFD2FB528).
      * ``00``          -- frame header descriptor: no content checksum, no
                           dictionary ID, no frame content size, not single-segment.
      * ``38``          -- window descriptor: exponent 7, mantissa 0,
                           i.e. a 128 KiB window.
    """
    return b"\x28\xb5\x2f\xfd\x00\x38"
def raw(b, end=False):
    """Encode ``b`` as one or more Zstandard Raw (uncompressed) blocks.

    Each block is a 3-byte little-endian header followed by the literal bytes.
    The header packs ``size << 3 | block_type << 1 | last_block``; Raw blocks
    have block type 0. Input longer than 128 KiB is split into full 128 KiB
    blocks followed by one final block of 1..128 KiB. Empty input produces a
    single zero-length block.

    Args:
        b: bytes-like payload to store verbatim.
        end: when true, set the last-block flag on the final block emitted.

    Returns:
        The concatenated block headers and payloads as ``bytes``.
    """
    block_max = 128 * 1024
    # Header of a full-size, non-final Raw block; identical for every full block.
    full_header = struct.pack("<I", block_max << 3)[:3]
    # Offset where the final block starts. Everything before it is cut into
    # full-size blocks; the final block keeps between 1 and block_max bytes
    # (or 0 bytes when the input is empty).
    tail_start = max(0, (len(b) - 1) // block_max * block_max)

    parts = []
    for offset in range(0, tail_start, block_max):
        parts.append(full_header)
        parts.append(b[offset:offset + block_max])

    tail = b[tail_start:]
    # Only the final block may carry the last-block flag (bit 0).
    tail_header = struct.pack("<I", (len(tail) << 3) | (1 if end else 0))[:3]
    parts.append(tail_header)
    parts.append(tail)
    return b"".join(parts)
def rle(b, l, end=False):
    """Encode a run of ``l`` copies of byte value ``b`` as Zstandard RLE blocks.

    Each RLE block is a 3-byte little-endian header followed by the single
    repeated byte. The header packs ``size << 3 | block_type << 1 | last_block``;
    RLE blocks have block type 1, hence the ``| 2``. Runs longer than 128 KiB
    are emitted as full 128 KiB blocks followed by one final block of
    1..128 KiB.

    Args:
        b: the repeated byte as an integer in ``range(256)``.
        l: run length in bytes; must be at least 4.
        end: when true, set the last-block flag on the final block emitted.

    Returns:
        The concatenated RLE blocks as ``bytes`` (4 bytes per block).

    Raises:
        AssertionError: if ``l`` is smaller than 4.
    """
    assert 4 <= l
    block_max = 128 * 1024
    value = bytes([b])
    # A complete, non-final block covering block_max repetitions.
    full_block = struct.pack("<I", (block_max << 3) | 2)[:3] + value
    # Split on l - 1 and add 1 back so the final block covers 1..block_max
    # bytes: a zero-length final block makes no sense, a full-size one does.
    full_count, tail_len = divmod(l - 1, block_max)
    tail_len += 1
    tail_header = struct.pack("<I", (tail_len << 3) | 2 | (1 if end else 0))[:3]
    return full_block * full_count + tail_header + value
def _rle_spans(data):
    """Return the ``(start, end)`` ranges of ``data`` to emit as RLE blocks.

    ``end`` is exclusive. The ranges are in ascending order and never overlap;
    every range covers a run of one repeated byte value.
    """
    # Start offset of every run of identical bytes; the first run starts at 0.
    starts = [0]
    for i, (cur, prev) in enumerate(zip(data[1:], data)):
        if cur != prev:
            starts.append(i + 1)

    # Candidate runs are at least 5 bytes long: an RLE block costs 4 bytes
    # (3-byte header + the repeated byte), so shorter runs cannot save space.
    bounds = starts + [len(data)]
    spans = [(s, e) for s, e in zip(bounds, bounds[1:]) if e >= s + 5]

    def worthwhile(i, span):
        start, end = span
        # RLE of at least 7 bytes (to account for the next raw header too!),
        # a run at the very start of the file, or two consecutive RLE blocks.
        # Not a perfect heuristic, since two 5-byte RLE blocks in a row between
        # raw blocks are still worse than just raw blocks all the way.
        return (
            end - start >= 7
            or (i == 0 and start == 0)
            or (i > 0 and spans[i - 1][1] == start)
            or (i + 1 < len(spans) and spans[i + 1][0] == end)
        )

    return [span for i, span in enumerate(spans) if worthwhile(i, span)]


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        sys.exit("usage: %s FILE" % sys.argv[0])
    path = sys.argv[1]

    with open(path, "rb") as f:
        data = f.read()

    # Output goes next to the input, as FILE.rle.zst.
    with open(path + ".rle.zst", "wb") as w:
        w.write(header())
        written = 0
        for start, end in _rle_spans(data):
            if start > written:
                # Bytes between the previous block and this run are stored raw.
                w.write(raw(data[written:start]))
            # A run reaching the end of the input is the frame's last block.
            w.write(rle(data[start], end - start, end == len(data)))
            written = end
        # Store whatever is left as the final raw block. ``not data`` covers
        # empty input: a frame must still contain one block carrying the
        # last-block flag, so emit an empty raw block rather than none at all.
        if written < len(data) or not data:
            w.write(raw(data[written:], True))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment