zlib underlies most zip file decompressors, and DEFLATE is one of the binary formats used to store compressed data in a bitstream.
The goal of this article is to walk through how my Python DEFLATE compressor implementation works. There are many
guides on the internet that describe how to implement each step of DEFLATE, but very few end up producing a bitstream
that can actually be parsed by a library like zlib. This article assumes that you roughly know how each step of the DEFLATE algorithm
is implemented, but are having trouble with some of the finer points that are often glossed over.
The code can be found in "deflate.py".
This is probably the easiest part because there are many examples of "correct" LZ77 compression. Make sure, however,
that your parameters (window size, minimum and maximum match length) align with what zlib expects at the end:
DEFLATE can only encode distances up to 32,768 and match lengths from 3 to 258. A good implementation that you can use to check
yours is here.
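To make those limits concrete, here is a minimal greedy LZ77 sketch. It is not the code from deflate.py: the `Token` fields are guessed from how tokens are used later in this article, and `lz77_tokens` and `lz77_expand` are hypothetical names. The search is deliberately brute force, so it is only suitable for small inputs.

```python
from collections import namedtuple

# Hypothetical token layout; the Token type in deflate.py may differ.
Token = namedtuple("Token", ["offset", "length", "indicator"])

WINDOW_SIZE = 32768  # largest distance DEFLATE can encode
MIN_MATCH = 3        # shortest match length DEFLATE can encode
MAX_MATCH = 258      # longest match length DEFLATE can encode


def lz77_tokens(data: bytes, window: int = WINDOW_SIZE):
    """Greedy LZ77: emit literals or (offset, length) back-references."""
    tokens = []
    i = 0
    while i < len(data):
        best_len, best_off = 0, 0
        for j in range(max(0, i - window), i):
            k = 0
            # A match may run past position i (overlapping copy); DEFLATE allows this.
            while k < MAX_MATCH and i + k < len(data) and data[j + k] == data[i + k]:
                k += 1
            if k > best_len:
                best_len, best_off = k, i - j
        if best_len >= MIN_MATCH:
            tokens.append(Token(best_off, best_len, None))
            i += best_len
        else:
            tokens.append(Token(0, 1, data[i]))
            i += 1
    return tokens


def lz77_expand(tokens) -> bytes:
    """Inverse of lz77_tokens, used only to check the round trip."""
    out = bytearray()
    for tok in tokens:
        if tok.length <= 1:
            out.append(tok.indicator)
        else:
            for _ in range(tok.length):
                out.append(out[-tok.offset])
    return bytes(out)
```

Expanding the tokens back into bytes is a cheap way to confirm the tokenizer is correct before any bit packing is involved.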
After LZ77 compression, you will have to convert those tokens into a bitstream. In my implementation, to keep things simple, I used Fixed Huffman Coding, which means that the trees are defined in RFC 1951.
DEFLATE's Huffman coding uses two different alphabets: one shared by literals and length codes, and one for distance codes. Each literal/length symbol is encoded with a fixed number of bits that depends on its value. For literals, that is it. For lengths, one must also add on extra bits if necessary. This is because each length and distance is split into a Huffman-coded symbol plus a set of extra bits that give the offset from that symbol's base length/distance. With the fixed trees, distance symbols are always 5 bits, while length symbols are 7 bits for symbols 257-279 (lengths 3-114) and 8 bits for symbols 280-287. The number of extra bits is determined by the symbol, using the tables in RFC 1951.
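The table lookups described above can be sketched as follows. The base values and extra-bit counts are copied from RFC 1951 (sections 3.2.5 and 3.2.6), but the function names `lookup`, `length_symbol`, `distance_symbol` and `fixed_litlen_code` are hypothetical, and the helpers in deflate.py may return their results in a different shape.

```python
# Base values and extra-bit counts from RFC 1951, section 3.2.5.
LENGTH_BASE = [3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 17, 19, 23, 27, 31,
               35, 43, 51, 59, 67, 83, 99, 115, 131, 163, 195, 227, 258]
LENGTH_EXTRA = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2,
                3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 0]
DIST_BASE = [1, 2, 3, 4, 5, 7, 9, 13, 17, 25, 33, 49, 65, 97, 129, 193,
             257, 385, 513, 769, 1025, 1537, 2049, 3073, 4097, 6145,
             8193, 12289, 16385, 24577]
DIST_EXTRA = [0, 0, 0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6,
              7, 7, 8, 8, 9, 9, 10, 10, 11, 11, 12, 12, 13, 13]


def lookup(value, bases, extras):
    """Return (index, extra_value, extra_nbits) for the largest base <= value."""
    idx = max(i for i, base in enumerate(bases) if base <= value)
    return idx, value - bases[idx], extras[idx]


def length_symbol(length):
    """Map a match length (3-258) to (symbol, extra_value, extra_nbits)."""
    idx, ebits, nbits = lookup(length, LENGTH_BASE, LENGTH_EXTRA)
    return 257 + idx, ebits, nbits


def distance_symbol(distance):
    """Map a distance (1-32768) to (symbol, extra_value, extra_nbits)."""
    return lookup(distance, DIST_BASE, DIST_EXTRA)


def fixed_litlen_code(symbol):
    """Return (code, nbits) for a literal/length symbol in the fixed tree."""
    if symbol <= 143:
        return 0b00110000 + symbol, 8
    if symbol <= 255:
        return 0b110010000 + (symbol - 144), 9
    if symbol <= 279:
        return symbol - 256, 7
    return 0b11000000 + (symbol - 280), 8
```

For example, a match of length 12 maps to symbol 265 with one extra bit of value 1, and a distance of 6 maps to distance symbol 4 with one extra bit of value 1. In the fixed scheme the distance symbol itself is simply written as a 5-bit number.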
def tokens_to_stream(compressed):
    # Block header: BFINAL=1, then BTYPE=01 (fixed Huffman) written LSB first
    it = "110"
    for tok in compressed:
        if tok.length <= 1:
            # Write a literal; Huffman codes go into the stream MSB first
            code, shift = huff_codes(tok.indicator)
            it += f"{code:0{shift}b}"
        else:
            # Length/distance pair, write `nbits` bits with value `ebits` in reverse order
            code, ebits, nbits = length_code(tok.length)
            # NOTE: 7 bits only covers length symbols 257-279 (lengths 3-114)
            it += f"{code:07b}"[-7:]
            if nbits >= 1:
                it += f"{ebits:0{nbits}b}"[-nbits:][::-1]
            code, ebits, nbits = distance_code(tok.offset)
            it += f"{code:05b}"[-5:]
            if nbits >= 1:
                it += f"{ebits:0{nbits}b}"[-nbits:][::-1]
    # Pack 8 bits per byte, LSB first; a short final chunk is implicitly zero-padded.
    # The terminating zero byte supplies the zero bits of the end-of-block symbol.
    return b"".join(
        [
            int(it[i:i+8][::-1], 2).to_bytes(1, byteorder="big", signed=False)
            for i in range(0, len(it), 8)
        ]
    )+b"\x00"
Some important details that are often glossed over:
- You must always write out nbits bits, even if the value of those bits is 0.
- Any extra bits are written in reverse order (least-significant bit first), and should be truncated to exactly nbits bits.
- Double check that your distance and length codes produce the correct number of bits, etc, for all possible inputs. In my case, I just reversed the tables shown in the RFC, and used that.
- The distance alphabet is a distinct alphabet and has very little relation to the length alphabet except in its construction (base + extra bits).
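- Make sure that the output is byte-aligned and ends with a null byte. The null byte provides the zero bits of the end-of-block code (symbol 256, which is seven 0 bits in the fixed tree).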
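As a way to check the bit-ordering rules in isolation, here is a small sketch that encodes literals only and writes the end-of-block symbol explicitly before padding, instead of relying on a trailing null byte. This is a variation for illustration, not the approach used in deflate.py, and `literals_to_raw_deflate` is a hypothetical name.

```python
import zlib


def literals_to_raw_deflate(data: bytes) -> bytes:
    """Encode bytes as one fixed-Huffman block of literals (no back-references)."""
    bits = "1" + "10"  # BFINAL=1, then BTYPE=01 written least-significant bit first
    for byte in data:
        if byte <= 143:
            bits += f"{0b00110000 + byte:08b}"           # 8-bit code, MSB first
        else:
            bits += f"{0b110010000 + (byte - 144):09b}"  # 9-bit code, MSB first
    bits += "0000000"               # explicit end-of-block symbol (256)
    bits += "0" * (-len(bits) % 8)  # zero-pad to a byte boundary
    # Each output byte is filled starting from its least-significant bit.
    return bytes(int(bits[i:i + 8][::-1], 2) for i in range(0, len(bits), 8))


raw = literals_to_raw_deflate(b"hello \xff")
assert zlib.decompress(raw, wbits=-15) == b"hello \xff"
```

If a literal-only stream like this decodes but your full stream does not, the problem is most likely in the length/distance codes or their extra bits rather than in the byte packing.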
Everything covered so far results in a valid DEFLATE stream, but to make it understandable to zlib in a mode besides raw,
you need to add a header and trailer. The header is two bytes that you can essentially just look up,
but the trailer is an adler32 checksum.
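def adler32(data):
    # Two running sums modulo 65521, the largest prime below 2**16
    a1, a2 = 1, 0
    for b in data:
        a1 = (a1 + b) % 65521
        a2 = (a2 + a1) % 65521
    # Big-endian trailer with the a2 sum in the high 16 bits
    return a2.to_bytes(2, byteorder="big") + a1.to_bytes(2, byteorder="big")

bitstream_bytes = b"\x78\x01" + bitstream_bytes + adler32(strk.encode("ascii"))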
The header is effectively a magic number (0x78 0x01 declares DEFLATE with a 32 KiB window), and the adler32
function is a simple checksum calculation, but make sure that it is calculated based on the uncompressed data.
Furthermore, it must be in big-endian byte-order.
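A quick way to gain confidence in both pieces is to compare a hand-rolled checksum against `zlib.adler32` and to verify the header check bits defined in RFC 1950. The sketch below uses a hypothetical `adler32_bytes` helper that mirrors the function above.

```python
import zlib


def adler32_bytes(data: bytes) -> bytes:
    """Adler-32 of `data`, returned as the 4-byte big-endian zlib trailer."""
    a, b = 1, 0
    for byte in data:
        a = (a + byte) % 65521
        b = (b + a) % 65521
    return ((b << 16) | a).to_bytes(4, byteorder="big")


payload = b"Did you win your sword fight?"
# The hand-rolled checksum should agree with zlib's own.
assert adler32_bytes(payload) == zlib.adler32(payload).to_bytes(4, "big")

# Header sanity checks from RFC 1950.
cmf, flg = 0x78, 0x01
assert (cmf * 256 + flg) % 31 == 0  # CMF/FLG pair must be a multiple of 31
assert cmf & 0x0F == 8              # compression method 8 = DEFLATE
assert (cmf >> 4) + 8 == 15         # window size is 2**15 bytes = 32 KiB
```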
Another source of some confusion is the parameters that must be given to zlib. To test your raw deflate stream,
attempt to decompress with wbits=-15. Once you have added the zlib header and footer, you must switch to wbits=15.
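The difference between the two modes can be seen without any custom compressor at all. The sketch below lets zlib produce a raw stream itself, wraps it by hand, and decodes it both ways; the variable names are only for illustration.

```python
import zlib

payload = b"sword fight " * 4

# wbits=-15 on the compressor yields a raw DEFLATE stream (no header, no checksum).
raw_compressor = zlib.compressobj(wbits=-15)
raw = raw_compressor.compress(payload) + raw_compressor.flush()

# Wrapping the raw stream by hand gives a zlib stream that wbits=15 accepts.
wrapped = b"\x78\x01" + raw + zlib.adler32(payload).to_bytes(4, "big")

assert zlib.decompress(raw, wbits=-15) == payload
assert zlib.decompress(wrapped, wbits=15) == payload

# Mixing the modes up raises an error instead of returning garbage.
try:
    zlib.decompress(raw, wbits=15)
except zlib.error as exc:
    print("raw stream rejected in zlib mode:", exc)
```

If your own stream decodes with wbits=-15 but fails with wbits=15 after wrapping, the header or the checksum is the place to look.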
All together, it forms a valid DEFLATE stream that can be decoded by zlib:
strk = """
"Did you win your sword fight?"
"Of course I won the fucking sword fight," Hiro says. "I'm the greatest sword fighter in the world."
"And you wrote the software."
"Yeah. That, too," Hiro says."
"""
compressed = compress(strk)
bitstream_bytes = tokens_to_stream(compressed)
# Add `zlib` header and footer. Checksum is calculated on the uncompressed original
bitstream_bytes = b"\x78\x01" + bitstream_bytes + adler32(strk.encode("ascii"))
import zlib
decompressed_data = zlib.decompress(bitstream_bytes, wbits=15)
assert decompressed_data == strk.encode("ascii")
print(str(decompressed_data, "ascii"))
"Did you win your sword fight?"
"Of course I won the fucking sword fight," Hiro says. "I'm the greatest sword fighter in the world."
"And you wrote the software."
"Yeah. That, too," Hiro says."
Further reading: