Comprehensive Guide to Data Compression: Huffman Coding, Arithmetic Coding, and Asymmetric Numeral Systems
Data compression is a crucial field in computer science, providing the means to store and transmit data efficiently. As digital data continues to grow exponentially, the importance of effective compression techniques cannot be overstated. Compression algorithms reduce the amount of data needed to represent a file or message, enabling faster data transmission and saving storage space. In this guide, we will delve deep into three essential data compression algorithms:
- Huffman Coding: A widely used algorithm that assigns variable-length codes to input characters, optimizing the encoding based on character frequency.
- Arithmetic Coding: A more advanced technique that represents entire messages as fractions within a specific range, allowing for more efficient compression.
- Asymmetric Numeral Systems (ANS): A modern approach that achieves compression close to arithmetic coding at much higher speed, by encoding data as natural numbers under a symbol labeling.
This guide will provide an in-depth understanding of each algorithm, its mechanics, advantages, and limitations, supported by detailed examples, diagrams, and practical applications.
Huffman coding, developed by David A. Huffman in 1952, is one of the simplest and most widely used data compression algorithms. It belongs to the family of lossless data compression algorithms, which means that the original data can be perfectly reconstructed from the compressed data. Huffman coding assigns variable-length codes to input characters, with shorter codes assigned to more frequent characters. This results in a compressed output that minimizes the overall bit-length of the encoded data.
The algorithm proceeds in three steps:
- Count frequencies: analyze the input data to determine the frequency of each character. Example: for the text "ABBCCCDDDDEEEEE", the counts are A: 1, B: 2, C: 3, D: 4, E: 5.
- Build the tree: create a priority queue of leaf nodes, one per character, keyed by frequency. Repeatedly combine the two nodes with the lowest frequencies into a new node with their combined frequency, until only one node remains, forming the root of the tree.
- Assign codes: traverse the Huffman tree from the root to each leaf, appending a '0' for left branches and a '1' for right branches. The path to each character determines its binary code.
For consistency, we use the string "ABBCCCDDDDEEEEE" across all examples:
- Frequency Table:

| Character | Frequency |
|---|---|
| A | 1 |
| B | 2 |
| C | 3 |
| D | 4 |
| E | 5 |
Building the Huffman Tree:
- Combine A (1) and B (2) to form a node with frequency 3.
- Combine this node (3) with C (3) to form a node with frequency 6.
- Combine D (4) and E (5), now the two lowest frequencies, to form a node with frequency 9.
- Combine the remaining nodes (6 and 9) to form the root node with frequency 15.
Huffman Tree Diagram:
graph TD
    Root[15] -- 0 --> N6[6]
    Root -- 1 --> N9[9]
    N6 -- 0 --> N3[3]
    N6 -- 1 --> C[C: 3]
    N3 -- 0 --> A[A: 1]
    N3 -- 1 --> B[B: 2]
    N9 -- 0 --> D[D: 4]
    N9 -- 1 --> E[E: 5]
- Generating Huffman Codes:
- A: 000
- B: 001
- C: 01
- D: 10
- E: 11
The encoded message "ABBCCCDDDDEEEEE" using Huffman coding is:
000 001 001 01 01 01 10 10 10 10 11 11 11 11 11
This is 33 bits, versus 120 bits for the same 15 characters in 8-bit ASCII.
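To make the construction concrete, here is a minimal Python sketch of Huffman code generation using heapq. The exact bit patterns may differ from the tree above, since frequency ties can be broken either way, but the code lengths, and therefore the 33-bit total, come out the same.

```python
import heapq
from collections import Counter

def huffman_codes(text: str) -> dict[str, str]:
    # Each heap entry is (frequency, tiebreaker, {symbol: code_so_far}).
    heap = [(f, i, {s: ""}) for i, (s, f) in enumerate(Counter(text).items())]
    heapq.heapify(heap)
    tie = len(heap)
    while len(heap) > 1:
        # Merge the two lowest-frequency subtrees.
        f1, _, left = heapq.heappop(heap)
        f2, _, right = heapq.heappop(heap)
        merged = {s: "0" + code for s, code in left.items()}         # left branch: '0'
        merged.update({s: "1" + code for s, code in right.items()})  # right branch: '1'
        heapq.heappush(heap, (f1 + f2, tie, merged))
        tie += 1
    return heap[0][2]

codes = huffman_codes("ABBCCCDDDDEEEEE")
encoded = "".join(codes[c] for c in "ABBCCCDDDDEEEEE")
print(codes)
print(len(encoded))  # 33 bits, matching the optimal total above
```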
Advantages:
- Simple to understand and implement.
- Effective for data with skewed symbol frequencies.
- Guaranteed to produce an optimal prefix code for the given symbol frequencies.
Limitations:
- Less effective when symbol frequencies are nearly uniform, since every code must be a whole number of bits; this can waste up to nearly one bit per symbol relative to the entropy.
- Requires knowledge of symbol frequencies beforehand, which may not always be available or feasible.
- Inefficient for small datasets or real-time applications due to tree-building overhead.
- Time Complexity: Building the Huffman tree takes O(n log n) time, where n is the number of unique characters, due to the sorting and merging operations involved in constructing the tree.
- Space Complexity: O(n) for storing the tree and the frequency table.
Huffman coding is widely used in various data compression applications:
- Image Compression: JPEG uses Huffman coding in its entropy-coding stage, and PNG's DEFLATE compression includes Huffman coding.
- Text Compression: Frequently used in file compression utilities like ZIP, as part of the DEFLATE algorithm.
- Data Transmission: Helps reduce the size of data packets for efficient network transmission.
Huffman coding has been a foundational algorithm in the field of data compression since its introduction. Its simplicity and effectiveness have made it a staple in many early compression systems. However, as data and computing needs evolve, Huffman coding faces limitations in efficiency, especially with real-time data and larger datasets. Future advancements may focus on hybrid models that combine Huffman coding with more sophisticated techniques to improve efficiency and adaptability.
Arithmetic coding is a sophisticated data compression technique that goes beyond the limitations of Huffman coding. Instead of assigning fixed codes to individual symbols, arithmetic coding represents the entire message as a single fractional value between 0 and 1. This allows it to achieve higher compression ratios, especially for data with skewed probability distributions.
- Begin with the entire range [0, 1).
- For each symbol, subdivide the current range into smaller segments proportional to the symbol's probability.
- As each symbol is processed, the range is narrowed to represent the message uniquely.
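In symbols: if the current range is [low, high) and symbol s occupies the cumulative-probability interval [c_lo(s), c_hi(s)) within [0, 1), then encoding s updates the range to:

low' = low + (high − low) · c_lo(s)
high' = low + (high − low) · c_hi(s)

The width of the range shrinks by a factor of p(s) at each step, which is why the final range for a message has width equal to the product of its symbol probabilities.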
We reuse the same source string for consistency; for brevity, the walkthrough below encodes only its first three symbols, "ABB".
Assume simplified symbol probabilities for the demonstration:
| Symbol | Probability |
|---|---|
| A | 0.1 |
| B | 0.2 |
| C | 0.2 |
| D | 0.2 |
| E | 0.3 |
Encoding Process:
- Start with the range [0, 1).
- Encoding A: the range is subdivided into A: [0, 0.1), B: [0.1, 0.3), C: [0.3, 0.5), D: [0.5, 0.7), E: [0.7, 1). After encoding A, the range is [0, 0.1).
- Encoding B within [0, 0.1): subdivide the range into A: [0, 0.01), B: [0.01, 0.03), C: [0.03, 0.05), D: [0.05, 0.07), E: [0.07, 0.1). After encoding B, the range is [0.01, 0.03).
- Encoding another B within [0.01, 0.03): subdivide the range into A: [0.01, 0.012), B: [0.012, 0.016), C: [0.016, 0.020), D: [0.020, 0.024), E: [0.024, 0.03). After encoding the second B, the range is [0.012, 0.016).
The encoded message "ABB" can be represented by any number in the range [0.012, 0.016), such as 0.015.
Below is a visual representation of the arithmetic coding process, illustrating how the range is subdivided for each symbol.
graph TD
    Start[Initial range 0 to 1] --> A[Encode A]
    A --> State1[Range 0 to 0.1]
    State1 --> B[Encode B]
    B --> State2[Range 0.01 to 0.03]
    State2 --> B2[Encode B]
    B2 --> State3[Range 0.012 to 0.016]
    State3 --> Output[Encoded Message]
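Here is a minimal sketch of the interval narrowing above, using exact fractions to sidestep the floating-point precision issues that a production coder handles with integer renormalization:

```python
from fractions import Fraction

# Cumulative intervals from the probability table above.
PROBS = [("A", Fraction(1, 10)), ("B", Fraction(2, 10)), ("C", Fraction(2, 10)),
         ("D", Fraction(2, 10)), ("E", Fraction(3, 10))]
CUM, lo = {}, Fraction(0)
for symbol, p in PROBS:
    CUM[symbol] = (lo, lo + p)  # the symbol's slice of [0, 1)
    lo += p

def arithmetic_interval(message: str) -> tuple[Fraction, Fraction]:
    low, high = Fraction(0), Fraction(1)
    for symbol in message:
        width = high - low
        c_lo, c_hi = CUM[symbol]
        # Narrow [low, high) to the symbol's proportional sub-interval.
        low, high = low + width * c_lo, low + width * c_hi
    return low, high

low, high = arithmetic_interval("ABB")
print(float(low), float(high))  # 0.012 0.016 -- any number here encodes "ABB"
```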
Advantages:
- Can handle any probability distribution, including highly skewed ones, approaching the entropy limit.
- More efficient than Huffman coding, especially for data with complex symbol distributions.
- Produces near-optimal entropy coding.
Disadvantages:
- Computationally intensive, requiring more processing power than Huffman coding.
- Historically faced patent-related adoption barriers.
- More challenging to implement, requiring precision in handling fractional numbers.
- Time Complexity: Arithmetic coding runs in O(n) time, where n is the length of the input; each symbol takes roughly constant time to process.
- Space Complexity: O(n) overall, dominated by the input and output buffers; the coder itself only tracks the current range bounds.
Arithmetic coding is used in applications where high compression ratios are essential:
- Multimedia File Formats: Video codecs like H.264 and H.265 use context-adaptive binary arithmetic coding (CABAC) for entropy encoding.
- Data Compression: Commonly used in text compression and data archival systems where maximum efficiency is required.
- Embedded Systems: Finds use in systems where memory and storage are at a premium.
Introduced in the 1970s, arithmetic coding has been critical in advancing data compression. Its ability to handle complex distributions made it superior to Huffman coding in many applications. The primary barriers to its adoption were patent restrictions, which have since expired, allowing more widespread use. Future directions may involve optimizing arithmetic coding for faster computation and adapting it to modern real-time processing needs.
Asymmetric Numeral Systems (ANS) is a relatively new compression algorithm that combines the efficiency of arithmetic coding with the speed and simplicity of table-based methods. ANS uses a clever approach by encoding data as natural numbers, making it well-suited for real-time applications. It provides a high compression rate while being computationally less demanding than traditional arithmetic coding.
Step 1: Labeling Natural Numbers
- ANS starts by labeling the natural numbers with symbols so that, within each block, each symbol's share of the labels matches its probability (see the illustration after these steps).
Step 2: Encoding a Message
- Each symbol is encoded by transforming the current state into a new state using a predefined function. The transformation depends on the symbol being encoded and the current state.
Step 3: Renormalization
- When the state becomes too large, it is renormalized to maintain the encoding efficiency. This process involves shifting bits out and maintaining a manageable state size.
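To make Step 1 concrete, here is one hypothetical labeling for the probabilities used in the example below (A: 0.1, B: 0.2, C: 0.2, D: 0.2, E: 0.3), repeating in blocks of 10; many different spreads satisfy the same frequencies:

```
n:     0  1  2  3  4  5  6  7  8  9  10 11 12 ...
label: E  A  D  B  E  C  D  B  E  C  E  A  D  ...
```

Within each block of ten numbers, A labels one number, B, C, and D label two each, and E labels three, matching the probabilities.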
As with arithmetic coding, we encode the prefix "ABB" of the same string, assuming the same simplified symbol probabilities:
| Symbol | Probability |
|---|---|
| A | 0.1 |
| B | 0.2 |
| C | 0.2 |
| D | 0.2 |
| E | 0.3 |
- Initial State: choose an initial state, e.g., X = 10.
- Encoding A: find the next state for A using a table-based approach or a predefined function; the state might become X = 20.
- Encoding B: update the state based on B, e.g., X = 45.
- Encoding another B: update the state again, e.g., X = 91.
These state values are placeholders for illustration; the sketch after the diagram below computes real transitions for a concrete labeling.
Below is a conceptual diagram of the ANS encoding process, showing how natural numbers are labeled and how the state transitions during encoding.
graph TD
    Start[Initial state X = 10] --> A1[Encode A]
    A1 --> State1[State = 20]
    State1 --> B1[Encode B]
    B1 --> State2[State = 45]
    State2 --> B2[Encode B]
    B2 --> State3[State = 91]
    State3 --> Output[Encoded Message]
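Below is a minimal sketch of the real encoding step, using the hypothetical block-of-10 labeling shown after Step 1: C(state, symbol) returns the (state + 1)-th natural number labeled with symbol, and encoding simply folds C over the message from the end.

```python
# Hypothetical labeling from Step 1: A once, B/C/D twice, E three times per block of 10.
LABELING = "EADBECDBEC"

def C(state: int, symbol: str) -> int:
    """Return the (state + 1)-th natural number whose label is `symbol`."""
    occurrences = [i for i, c in enumerate(LABELING) if c == symbol]
    full_blocks, left = divmod(state + 1, len(occurrences))
    if left == 0:  # the target is the last occurrence within its block
        full_blocks, left = full_blocks - 1, len(occurrences)
    return full_blocks * len(LABELING) + occurrences[left - 1]

state = 1  # small nonzero start so the decoder knows where to stop
for symbol in reversed("ABB"):
    state = C(state, symbol)
print(state)  # 371 -- a single natural number encoding the whole message "ABB"
```

Decoding inverts this one symbol at a time: the label of state mod 10 recovers the symbol, and counting the numbers with that label below the state recovers the previous state. The notebook later in this document implements both directions in full.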
Advantages:
- Combines the efficiency of arithmetic coding with the speed of table-based methods.
- Suitable for real-time applications due to its fast encoding and decoding processes.
- No floating-point arithmetic needed, reducing computational complexity.
- Public domain algorithm, avoiding the patent issues that hindered arithmetic coding.
Limitations:
- More complex to understand and implement than Huffman coding.
- Performance depends on the proper design of symbol labeling and state transitions.
- Limited ability to handle adaptive contexts compared to arithmetic coding.
- Time Complexity: ANS offers O(1) work per symbol due to its table-driven nature, making it extremely fast.
- Space Complexity: The encoding and decoding tables scale with the block size (the number of states), which grows with the alphabet size and the precision of the probability approximation.
ANS is rapidly gaining popularity in modern applications:
- JPEG XL: Uses ANS for efficient and fast image compression, aiming to replace older JPEG formats.
- Video Encoding: Incorporated into video codecs to improve compression rates and decoding speed.
- Real-Time Data Streaming: Ideal for scenarios requiring low-latency data transmission, such as live video streaming.
ANS, introduced by Jarek Duda in 2009 and refined through the early 2010s, represents a significant leap forward in compression technology. By addressing the speed limitations of arithmetic coding while maintaining efficiency, ANS has become a preferred choice for many real-time and high-performance applications. As research continues, we can expect further optimizations and potential adaptations of ANS to support adaptive contexts and other advancements.
Data compression is a vital tool in the digital age, enabling efficient storage and transmission of information. Understanding the principles and mechanisms behind different compression algorithms can significantly impact how we handle data. Here's a summary of the key points covered:
- Huffman Coding is a foundational technique in data compression, suitable for cases with variable symbol frequencies. It constructs a binary tree and assigns shorter codes to more frequent symbols.
- Arithmetic Coding offers higher compression rates by encoding entire messages as fractional numbers, providing near-optimal compression for complex data distributions. It faces challenges in implementation and computational efficiency.
- Asymmetric Numeral Systems (ANS) brings together the best of both worlds, achieving high compression efficiency with faster encoding and decoding speeds. It is becoming increasingly popular in applications that demand both high compression ratios and low latency.
The diagrams above highlight the differences between these three compression methods:
- Huffman Coding Tree: Visualizing how characters are represented by variable-length codes based on symbol frequency.
- Arithmetic Coding Flowchart: Illustrating the process of subdividing the range for each symbol to encode a message.
- ANS Encoding Diagram: Demonstrating the steps involved in encoding a message using ANS.
Further Reading:
- Wikipedia: Huffman Coding
- Wikipedia: Arithmetic Coding
- Wikipedia: Asymmetric Numeral Systems
- David Salomon's "Data Compression: The Complete Reference"
- Jarek Duda's Research on ANS
By understanding these algorithms, you can select the most appropriate compression method for your specific needs, whether it be for image storage, video streaming, or efficient data transmission. As technology continues to advance, mastering these techniques will become increasingly crucial in managing the vast amounts of digital data generated daily.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"this builds up the ideas of the tabled asymmetric numeral system"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"given probs: {'a': 0.6948839490593655, 'b': 0.007950499291690652, 'c': 0.2971655516489439}\n",
"generated labeling: aacaacaaacaacaacaaacaacaacaaacab\n",
"labeling probs: {'b': 0.03125, 'c': 0.28125, 'a': 0.6875}\n"
]
}
],
"source": [
"from random import choices, random\n",
"from math import log2\n",
"\n",
"# generate a random probability distribution\n",
"def gen_probs(symbols: list | str):\n",
" return { s: random() for s in symbols }\n",
"PROBS = gen_probs("abc")\n",
"total = sum(PROBS.values())\n",
"for c in PROBS:\n",
" assert isinstance(c, str) and len(c) == 1, f"invalid symbol: '{c}'; all symbols must be single characters"\n",
" PROBS[c] /= total\n",
"SYMBOLS, WEIGHTS = list(PROBS.keys()), list(PROBS.values())\n",
"\n",
"# there are many ways you can generate labelings\n",
"def spread(freqs: dict, length: int):\n",
" labeling = ""\n",
" total = sum(freqs.values())\n",
" probs = { c: total / freq for c, freq in freqs.items() }\n",
" charset = set(probs.keys())\n",
" while len(labeling) < length - len(charset):\n",
" min_c = min(probs, key=probs.get)\n",
" labeling += min_c\n",
" if min_c in charset: charset.remove(min_c)\n",
" probs[min_c] += total / freqs[min_c]\n",
" for c in charset: labeling += c\n",
" return labeling\n",
"BLOCK_LABELING = spread(PROBS, 32)\n",
"\n",
"print("given probs: ", PROBS)\n",
"print("generated labeling: ", BLOCK_LABELING)\n",
"print("labeling probs: ", { c: BLOCK_LABELING.count(c) / len(BLOCK_LABELING) for c in set(BLOCK_LABELING) })"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"# just for comparison\n",
"class Huffman:\n",
" def init(self, freq_table):\n",
" table = { c: "" for c in freq_table }\n",
" sorted_chars = sorted(freq_table.items(), key=lambda x: x[1])\n",
" while len(sorted_chars) > 1:\n",
" left, right = sorted_chars.pop(0), sorted_chars.pop(0)\n",
" for c in left[0]: table[c] = "0" + table[c]\n",
" for c in right[0]: table[c] = "1" + table[c]\n",
" total_freq = left[1] + right[1]\n",
" new_index = next((i for i, x in enumerate(sorted_chars) if x[1] >= total_freq), len(sorted_chars))\n",
" sorted_chars.insert(new_index, (left[0] + right[0], total_freq))\n",
" self.encoding_table = table\n",
" self.decoding_table = { v: k for k, v in table.items() }\n",
" \n",
" def encode(self, message): return ''.join(self.encoding_table[c] for c in message)\n",
" def decode(self, code):\n",
" message = ""\n",
" code_word = ""\n",
" for c in code:\n",
" code_word += c\n",
" if code_word in self.decoding_table:\n",
" message += self.decoding_table[code_word]\n",
" code_word = ""\n",
" return message\n",
"\n",
"huffman = Huffman(PROBS)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Base version\n",
"counts symbols by blocks"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [],
"source": [
"class ANS:\n",
" def init(self, labeling: str | list):\n",
" self.labeling = labeling\n",
" self.block_size = len(labeling)\n",
"\n",
" #
symbol
appears in each blockcount_per_block[symbol]
times\n"," #
count_before_index[i]
is the number of numbers labeledlabeling[i]
that are less thani
\n"," #
symbol_table[symbol][i]
is the index inlabeling
of thei
th number labeledsymbol
\n"," self.count_per_block = {}\n",
" self.count_before_index = []\n",
" self.symbol_table = {}\n",
" for i, c in enumerate(labeling):\n",
" if c not in self.symbol_table:\n",
" self.count_per_block[c] = 0\n",
" self.symbol_table[c] = []\n",
" self.count_before_index.append(self.count_per_block[c])\n",
" self.count_per_block[c] += 1\n",
" self.symbol_table[c].append(i)\n",
"\n",
" # if the initial value of
X
is too small, thenC(X)
might equalX
. but we wantC(X)
to be different fromX
for everything to be reversible\n"," # this is analagous to "appending a non-zero digit to the left of
X
" except in ANS it's "non-first-symbol symbol" instead\n"," # if you're sure that no message will ever start with the first symbol, you can set
initial_state = 0
\n"," self.initial_state = next((i for i, c in enumerate(labeling) if c != labeling[0]), len(labeling))\n",
" \n",
" def C(self, state: int, symbol: str):\n",
" """Returns the
state + 1
th number labeledsymbol
"""\n","\n",
" # full_blocks * count_per_block[symbol] + symbols_left = state + 1\n",
" full_blocks = (state + 1) // self.count_per_block[symbol]\n",
" symbols_left = (state + 1) % self.count_per_block[symbol] # equivalently, (state + 1) - full_blocks * self.count_per_block[symbol]\n",
" if symbols_left == 0:\n",
" full_blocks -= 1\n",
" symbols_left = self.count_per_block[symbol]\n",
"\n",
" # Count
symbols_left
symbols within the block\n"," index_within_block = self.symbol_table[symbol][symbols_left - 1]\n",
" \n",
" # if
block_size
is a power of 2, this multiplication is a bitshift\n"," return full_blocks * self.block_size + index_within_block\n",
" \n",
" def D(self, state: int):\n",
" """Counts the number of numbers labeled
symbol
that are less thanstate
"""\n","\n",
" index_within_block = state % self.block_size # if
block_size
is a power of 2, this isstate & (block_size - 1)
\n"," symbol = self.labeling[index_within_block]\n",
"\n",
" num_previous_blocks = state // self.block_size # if
block_size
is a power of 2, this division is a bitshift\n"," count_before_block = self.count_per_block[symbol] * num_previous_blocks\n",
" \n",
" return symbol, count_before_block + self.count_before_index[index_within_block]\n",
" \n",
" def encode(self, message: str | list):\n",
" state = self.initial_state\n",
" for symbol in message[::-1]:\n",
" state = self.C(state, symbol)\n",
" return state\n",
" \n",
" def decode(self, state: int):\n",
" message = ""\n",
" while state > self.initial_state:\n",
" symbol, state = self.D(state)\n",
" message += symbol\n",
" return message, state"
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"information content: 37620.468208228645\n",
"code length: 38370 (1.992% longer than information)\n",
"huffman: 52210 (38.781% longer than information)\n"
]
}
],
"source": [
"ans = ANS(BLOCK_LABELING)\n",
"\n",
"message = "".join(choices(SYMBOLS, WEIGHTS, k = 40000))\n",
"information_content = -sum(log2(PROBS[symbol]) for symbol in message)\n",
"\n",
"state = ans.encode(message)\n",
"\n",
"ans_length = state.bit_length() - 1 # the first bit is always 1, so it's not counted\n",
"huffman_length = len(huffman.encode(message))\n",
"\n",
"print(f"information content: {information_content}")\n",
"print(f"code length: {ans_length} ({ans_length / information_content - 1:.3%} longer than information)")\n",
"print(f"huffman: {huffman_length} ({huffman_length / information_content - 1:.3%} longer than information)")\n",
"\n",
"decoded, state = ans.decode(state)\n",
"assert decoded == message, message + ", " + decoded"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Streamed version\n",
"incorporates renormalization\n",
"\n",
"
state
will now always be in[block_size, block_size * 2 - 1]
. This means we have to initializestate
to a number in that range, but the decoder doesn't need to knowinitial_state
anymore, so you could store some data ininitial_state
that the decoder will reproduce at the end\n","\n",
"With streaming, we take a hit on compression rate"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"class StreamANS:\n",
" def init(self, labeling: str | list):\n",
" self.labeling = labeling\n",
" self.block_size = len(labeling)\n",
"\n",
" self.count_per_block = {}\n",
" self.count_before_index = []\n",
" self.symbol_table = {}\n",
" for i, c in enumerate(labeling):\n",
" if c not in self.symbol_table:\n",
" self.count_per_block[c] = 0\n",
" self.symbol_table[c] = []\n",
" self.count_before_index.append(self.count_per_block[c])\n",
" self.count_per_block[c] += 1\n",
" self.symbol_table[c].append(i)\n",
" \n",
" def C(self, state: int, symbol: str):\n",
" """Returns the
state + 1
th number labeledsymbol
"""\n","\n",
" full_blocks = (state + 1) // self.count_per_block[symbol]\n",
" symbols_left = (state + 1) % self.count_per_block[symbol]\n",
" if symbols_left == 0:\n",
" full_blocks -= 1\n",
" symbols_left = self.count_per_block[symbol]\n",
"\n",
" # Count
symbols_left
symbols within the block\n"," index_within_block = self.symbol_table[symbol][symbols_left - 1]\n",
" \n",
" return full_blocks * self.block_size + index_within_block\n",
" \n",
" def D(self, state: int):\n",
" """Counts the number of numbers labeled
symbol
that are less thanstate
"""\n","\n",
" # because of renormalization,
state
is guaranteed to be in [block_size, 2 * block_size - 1]\n"," index_within_block = state - self.block_size\n",
" symbol = self.labeling[index_within_block]\n",
" \n",
" return symbol, self.count_per_block[symbol] + self.count_before_index[index_within_block]\n",
" \n",
" def encode(self, message: str | list, initial_state: int = 0):\n",
" bitstream = 1\n",
" state = [i for i in range(self.block_size) if self.C(i, message[-1]) >= self.block_size][initial_state] # the
initial_state
th number X such thatblock_size <= C(X) < 2 * block_size
\n","\n",
" for symbol in message[::-1]:\n",
"\n",
" # shrink
X
untilC(X) < 2N
\n"," predicted = self.C(state, symbol)\n",
" normalized_state = state\n",
" while predicted >= 2 * self.block_size:\n",
" bitstream <<= 1\n",
" bitstream |= normalized_state & 1\n",
" normalized_state >>= 1\n",
" predicted = self.C(normalized_state, symbol)\n",
" \n",
" state = predicted\n",
" \n",
" return state, bitstream\n",
" \n",
" def decode(self, state: int, bitstream):\n",
" message = ""\n",
" \n",
" while True:\n",
" symbol, state = self.D(state)\n",
" message += symbol\n",
"\n",
" # expand
X
untilX >= N
or we run out of bits\n"," while state < self.block_size and bitstream > 1:\n",
" state <<= 1\n",
" state |= bitstream & 1\n",
" bitstream >>= 1\n",
" if state < self.block_size: break\n",
" \n",
" return message, state"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"information content: 112715.83728822623\n",
"code length: 114047 (1.181% longer than information)\n",
"huffman: 156605 (38.938% longer than information)\n"
]
}
],
"source": [
"ans = StreamANS(BLOCK_LABELING)\n",
"\n",
"message = "".join(choices(SYMBOLS, WEIGHTS, k = 120000))\n",
"information_content = -sum(log2(PROBS[symbol]) for symbol in message)\n",
"\n",
"state, bitstring = ans.encode(message)\n",
"\n",
"ans_length = len(BLOCK_LABELING).bit_length() + bitstring.bit_length() - 1\n",
"huffman_length = len(huffman.encode(message))\n",
"\n",
"print(f"information content: {information_content}")\n",
"print(f"code length: {ans_length} ({ans_length / information_content - 1:.3%} longer than information)")\n",
"print(f"huffman: {huffman_length} ({huffman_length / information_content - 1:.3%} longer than information)")\n",
"\n",
"decoded, state = ans.decode(state, bitstring)\n",
"assert decoded == message, message + ", " + decoded"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tabled version\n",
"
decode
andencode
don't useC
orD
anymore; they only referencerenormalization_table
,encoding_table
, anddecoding_table
, which are generated at initialization. The table generation I'm doing is slow; you should probably look at other implementations to see how they do it\n","\n",
"Because we're storing the number of bits to renormalize with in the table, this only works when
block_size
is a power of 2(?); otherwise, encoding the same symbol into the same state might not pop the same number of bits to the bitstream. Duda mentions this in his paper; I'm not exactly sure how he does it"]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [],
"source": [
"class tANS:\n",
" def init(self, labeling: str | list):\n",
" self.labeling = labeling\n",
" self.block_size = len(labeling)\n",
" self.block_mask = self.block_size - 1 # X & block_mask = X % block_size\n",
"\n",
" # these are now only used during initialization to generate tables\n",
" self.count_per_block = {}\n",
" self.count_before_index = []\n",
" self.symbol_table = {}\n",
" for i, c in enumerate(labeling):\n",
" if c not in self.symbol_table:\n",
" self.count_per_block[c] = 0\n",
" self.symbol_table[c] = []\n",
" self.count_before_index.append(self.count_per_block[c])\n",
" self.count_per_block[c] += 1\n",
" self.symbol_table[c].append(i)\n",
"\n",
" self.generate_tables()\n",
" \n",
" #
C
andD
are only used during initialization\n"," def C(self, state: int, symbol: str):\n",
" """Returns the
state + 1
th number labeledsymbol
"""\n","\n",
" full_blocks = (state + 1) // self.count_per_block[symbol]\n",
" symbols_left = (state + 1) % self.count_per_block[symbol]\n",
" if symbols_left == 0:\n",
" full_blocks -= 1\n",
" symbols_left = self.count_per_block[symbol]\n",
"\n",
" index_within_block = self.symbol_table[symbol][symbols_left - 1]\n",
" \n",
" return full_blocks * self.block_size + index_within_block\n",
" \n",
" def D(self, state: int):\n",
" """Counts the number of numbers labeled
symbol
that are less thanstate
"""\n","\n",
" index_within_block = state & self.block_mask\n",
" symbol = self.labeling[index_within_block]\n",
" count_within_block = sum(c == symbol for c in self.labeling[:index_within_block])\n",
" \n",
" return symbol, self.count_per_block[symbol] + count_within_block\n",
" \n",
" # the way i'm generating these is bad; i just wrote it like this to make it more obvious what it's doing\n",
" def generate_tables(self):\n",
" # i'm not exactly sure about the proof, but encoding any one symbol
symbol
will always result in eithern_bits_chopped
orn_bits_chopped + 1
being output into the bitstream during renormalization\n"," # it's
n_bits_chopped + 1
when the state is over or equal tomin_state
. except here i usemin_state = 0
as a special case where the number of bits output is nevern_bits_chopped + 1
\n"," renormalization_table = {}\n",
" for symbol in self.symbol_table.keys():\n",
" n_bits_chopped = -1\n",
" min_state = 0\n",
" prev_n_bits = -1\n",
" for state in range(self.block_size, self.block_size * 2):\n",
" renormalized = state\n",
" n_bits = 0\n",
" while self.C(renormalized, symbol) >= self.block_size * 2:\n",
" renormalized >>= 1\n",
" n_bits += 1\n",
" if n_bits != prev_n_bits:\n",
" prev_n_bits = n_bits\n",
" if n_bits_chopped < 0:\n",
" n_bits_chopped = n_bits\n",
" else:\n",
" assert min_state == 0, "uh oh"\n",
" min_state = state\n",
" renormalization_table[symbol] = (n_bits_chopped, min_state, (1 << n_bits_chopped) - 1)\n",
" self.renormalization_table = renormalization_table\n",
"\n",
" encoding_table = { c: {} for c in self.symbol_table.keys() }\n",
" decoding_table = [0 for i in range(self.block_size)]\n",
" for state in range(self.block_size, self.block_size * 2):\n",
" symbol, old_state = self.D(state)\n",
" encoding_table[symbol][old_state] = state\n",
" n_bits = 0\n",
" scaled_state = old_state\n",
" while scaled_state < self.block_size:\n",
" scaled_state <<= 1\n",
" n_bits += 1\n",
" decoding_table[state - self.block_size] = (old_state << n_bits, n_bits, (1 << n_bits) - 1)\n",
" self.encoding_table = encoding_table\n",
" self.decoding_table = decoding_table\n",
" \n",
" def encode(self, message: str | list, initial_state: int = 0):\n",
" bitstream = 1\n",
" state = [i for i, c in enumerate(self.labeling) if c == message[-1]][initial_state] # the
initial_state
th number labeledmessage[-1]
\n"," assert self.decoding_table[state][1] > 0, "code is ambiguous because initial state is too large" # we want this symbol to prompt the decoder to scale back up so that it can run out of bits and terminate\n",
"\n",
" state += self.block_size\n",
" for symbol in message[-2::-1]:\n",
" n_bits, min_state, mask = self.renormalization_table[symbol]\n",
" if min_state > 0 and state >= min_state:\n",
" n_bits += 1\n",
" mask = (mask << 1) | 1\n",
" bitstream = (bitstream << n_bits) | (state & mask)\n",
" state = self.encoding_table[symbol][state >> n_bits]\n",
" \n",
" return state, bitstream\n",
" \n",
" def decode(self, state: int, bitstream):\n",
" message = ""\n",
" \n",
" while True:\n",
" state -= self.block_size\n",
" message += self.labeling[state]\n",
" (state, num_bits, mask) = self.decoding_table[state]\n",
" if num_bits > 0 and bitstream <= 1: break\n",
" state |= bitstream & mask\n",
" bitstream >>= num_bits\n",
" \n",
" return message, state"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"information content: 140855.6232644608\n",
"code length: 142481 (1.154% longer than information)\n",
"huffman: 195671 (38.916% longer than information)\n"
]
}
],
"source": [
"ans = tANS(BLOCK_LABELING)\n",
"\n",
"message = "".join(choices(SYMBOLS, WEIGHTS, k = 150000))\n",
"information_content = -sum(log2(PROBS[symbol]) for symbol in message)\n",
"\n",
"state, bitstring = ans.encode(message)\n",
"\n",
"ans_length = len(BLOCK_LABELING).bit_length() + bitstring.bit_length() - 1\n",
"huffman_length = len(huffman.encode(message))\n",
"\n",
"print(f"information content: {information_content}")\n",
"print(f"code length: {ans_length} ({ans_length / information_content - 1:.3%} longer than information)")\n",
"print(f"huffman: {huffman_length} ({huffman_length / information_content - 1:.3%} longer than information)")\n",
"\n",
"decoded, state = ans.decode(state, bitstring)\n",
"assert decoded == message, message + ", " + decoded"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}