Skip to content

Instantly share code, notes, and snippets.

@mateon1
Created August 6, 2021 15:03
Show Gist options
  • Save mateon1/2a5803c8b9d0b036af7429c7013561a2 to your computer and use it in GitHub Desktop.
Save mateon1/2a5803c8b9d0b036af7429c7013561a2 to your computer and use it in GitHub Desktop.
RLE-only zstd compressor (Python 3 only; PyPy3 preferred)
import struct
def header():
    """Return the fixed 6-byte zstd frame prefix emitted before any block.

    The prefix is the 4-byte zstd magic number followed by a frame header
    that declares no checksum, no dictionary and a 128K window size.
    """
    magic = b"\x28\xb5\x2f\xfd"
    frame_header_descriptor = b"\x00"  # no checksum, no dict
    window_descriptor = b"\x38"  # 128K window size
    return magic + frame_header_descriptor + window_descriptor
def raw(b, end=False):
    """Encode *b* as one or more zstd Raw blocks and return the bytes.

    Each block is a 3-byte little-endian header followed by the payload.
    The header packs the block size into bits 3 and up, the block type
    (0 for Raw) into bits 1-2 and the last-block flag into bit 0.

    Payloads longer than 128 KiB are split: every block except the final
    one carries exactly 128 KiB and never has the last-block flag set.
    An empty *b* still yields a single zero-length block.

    Args:
        b: The bytes to store verbatim.
        end: When true, set the last-block flag on the final block.

    Returns:
        The concatenated block headers and payloads as ``bytes``.
    """
    block_max = 128 * 1024
    total = len(b)
    # Offset where the final block starts.  Everything before it is cut
    # into full-size blocks; the final block holds the remaining
    # 1..block_max bytes (or 0 bytes when the input is empty).
    tail_start = max(total - 1, 0) // block_max * block_max

    parts = []
    if tail_start:
        full_header = struct.pack("<I", block_max << 3)[:3]
        # Slice by offset instead of repeatedly re-slicing the remainder,
        # so the work stays linear in the input size.
        for offset in range(0, tail_start, block_max):
            parts.append(full_header)
            parts.append(b[offset:offset + block_max])

    tail_len = total - tail_start
    tail_header = struct.pack("<I", (tail_len << 3) | (1 if end else 0))[:3]
    parts.append(tail_header)
    parts.append(b[tail_start:])
    return b"".join(parts)
def rle(b, l, end=False):
    """Encode a run of *l* copies of byte value *b* as zstd RLE blocks.

    Each RLE block is a 3-byte little-endian header (size in bits 3 and
    up, block type 2 in the low bits, last-block flag in bit 0) followed
    by the single byte to repeat.  Runs longer than 128 KiB are emitted
    as several blocks; only the final block may carry the last-block flag.

    Args:
        b: The repeated byte, as an integer.
        l: The run length; must be at least 4.
        end: When true, set the last-block flag on the final block.

    Returns:
        The concatenated RLE blocks as ``bytes``.
    """
    assert 4 <= l
    block_max = 128 * 1024
    rle_bits = 2
    fill = bytes([b])

    # Split on (l - 1) so the final block covers 1..block_max bytes: a
    # zero-length block makes no sense, a full 128 KiB one does.
    full_blocks, remainder = divmod(l - 1, block_max)
    tail_size = remainder + 1

    full_block = ((block_max << 3) | rle_bits).to_bytes(3, "little") + fill

    tail_value = (tail_size << 3) | rle_bits
    if end:
        tail_value |= 1
    tail_block = tail_value.to_bytes(3, "little") + fill

    return full_block * full_blocks + tail_block
def find_rle_spans(data):
    """Return the ``(start, stop)`` byte ranges of *data* worth RLE-encoding.

    A candidate is a maximal run of one repeated byte that is at least 5
    bytes long.  A candidate is kept when any of these holds:

    * it is at least 7 bytes long (to account for the header of the raw
      block that follows it too),
    * it is the first candidate and starts at offset 0,
    * it directly touches the previous or the next candidate.

    This is not a perfect heuristic: two 5-byte RLE blocks in a row
    between raw blocks are still worse than just raw blocks all the way.

    Args:
        data: The input bytes.

    Returns:
        A list of half-open ``(start, stop)`` index pairs in ascending
        order; empty when nothing qualifies (including empty input).
    """
    # Offsets at which a new run begins, plus a final sentinel at len(data)
    # so the last run is treated like every other one.
    boundaries = [0]
    boundaries.extend(
        pos + 1
        for pos, (nxt, cur) in enumerate(zip(data[1:], data))
        if nxt != cur
    )
    boundaries.append(len(data))

    candidates = [
        (start, stop)
        for start, stop in zip(boundaries, boundaries[1:])
        if stop - start >= 5
    ]

    spans = []
    for idx, (start, stop) in enumerate(candidates):
        long_enough = stop - start >= 7
        at_file_start = idx == 0 and start == 0
        touches_prev = idx > 0 and candidates[idx - 1][1] == start
        touches_next = (
            idx + 1 < len(candidates) and candidates[idx + 1][0] == stop
        )
        if long_enough or at_file_start or touches_prev or touches_next:
            spans.append((start, stop))
    return spans


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        sys.exit("usage: %s FILE" % sys.argv[0])

    with open(sys.argv[1], "rb") as f:
        data = f.read()

    rleind = find_rle_spans(data)

    with open(sys.argv[1] + ".rle.zst", "wb") as w:
        w.write(header())
        written = 0
        for start, stop in rleind:
            # Anything between the previous span and this one is stored raw.
            if start > written:
                w.write(raw(data[written:start]))
            w.write(rle(data[start], stop - start, stop == len(data)))
            written = stop
        # Trailing bytes after the last span go out as the final raw block.
        # An empty input has no spans and no trailing bytes, but the frame
        # still needs one block carrying the last-block flag, so emit an
        # empty raw block in that case.
        if written < len(data) or not data:
            w.write(raw(data[written:], True))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment