
@caputomarcos
Last active August 30, 2024 02:29

Comprehensive Guide to Data Compression: Huffman Coding, Arithmetic Coding, and Asymmetric Numeral Systems

Introduction

Data compression is a crucial field in computer science, providing the means to store and transmit data efficiently. As digital data continues to grow exponentially, the importance of effective compression techniques cannot be overstated. Compression algorithms reduce the amount of data needed to represent a file or message, enabling faster data transmission and saving storage space. In this guide, we will delve deep into three essential data compression algorithms:

  1. Huffman Coding: A widely used algorithm that assigns variable-length prefix codes to input characters, giving shorter codes to more frequent characters.
  2. Arithmetic Coding: A technique that represents an entire message as a single fraction within a sub-range of [0, 1), which lets it spend a fractional number of bits per symbol.
  3. Asymmetric Numeral Systems (ANS): A more recent approach that comes close to the compression efficiency of arithmetic coding at speeds closer to Huffman coding, by encoding the message into a single natural number.

This guide will provide an in-depth understanding of each algorithm, its mechanics, advantages, and limitations, supported by detailed examples, diagrams, and practical applications.


1. Huffman Coding: The Foundation of Data Compression

1.1 Overview of Huffman Coding

Huffman coding, developed by David A. Huffman in 1952, is one of the simplest and most widely used data compression algorithms. It belongs to the family of lossless data compression algorithms, which means that the original data can be perfectly reconstructed from the compressed data. Huffman coding assigns variable-length codes to input characters, with shorter codes assigned to more frequent characters. This results in a compressed output that minimizes the overall bit-length of the encoded data.

1.2 How Huffman Coding Works

Step 1: Frequency Analysis

  • Analyze the input data to determine the frequency of each character.
  • Example: Consider the text "ABBCCCDDDDEEEEE":
    • A: 1, B: 2, C: 3, D: 4, E: 5.
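
As a minimal sketch of this step, Python's `collections.Counter` can tally the characters of the example text. The variable names are illustrative only.

```python
from collections import Counter

text = "ABBCCCDDDDEEEEE"
freq = Counter(text)  # maps each character to its number of occurrences

# Print each symbol with its count and its share of the message.
for symbol, count in sorted(freq.items()):
    print(f"{symbol}: {count} ({count / len(text):.1%})")
```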

Step 2: Building the Huffman Tree

  • Create a priority queue (min-heap) of leaf nodes, one per character, ordered by frequency.
  • Remove the two nodes with the lowest frequencies and combine them into a new node whose frequency is their sum, then put it back in the queue. Repeat until only one node remains; it is the root of the tree.

Step 3: Generating Huffman Codes

  • Traverse the Huffman tree from the root to each leaf node. Assign a '0' for left branches and a '1' for right branches.
  • The path to each character determines its binary code.
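
The three steps above can be sketched with Python's `heapq` module. This is one possible implementation, not a reference one: the function name `huffman_codes` and the tie-breaking counter are assumptions made for illustration. Different tie-breaking rules give different bit patterns, but the total encoded length is the same for any optimal Huffman code.

```python
import heapq
from collections import Counter
from itertools import count


def huffman_codes(text):
    """Return a dict mapping each character of `text` to its Huffman code."""
    freq = Counter(text)
    tiebreak = count()  # unique sequence numbers so the heap never compares nodes
    heap = [(f, next(tiebreak), sym) for sym, f in sorted(freq.items())]
    heapq.heapify(heap)

    if len(heap) == 1:  # a single distinct symbol still needs a 1-bit code
        return {heap[0][2]: "0"}

    # Repeatedly merge the two least frequent nodes; internal nodes are (left, right) tuples.
    while len(heap) > 1:
        f1, _, left = heapq.heappop(heap)
        f2, _, right = heapq.heappop(heap)
        heapq.heappush(heap, (f1 + f2, next(tiebreak), (left, right)))

    codes = {}

    def walk(node, prefix):
        if isinstance(node, tuple):  # internal node: 0 for left, 1 for right
            walk(node[0], prefix + "0")
            walk(node[1], prefix + "1")
        else:  # leaf: the path from the root is the code
            codes[node] = prefix

    walk(heap[0][2], "")
    return codes


codes = huffman_codes("ABBCCCDDDDEEEEE")
total_bits = sum(len(codes[ch]) for ch in "ABBCCCDDDDEEEEE")
print(codes, total_bits)
```

The counter in each heap entry is a design choice: it makes entries with equal frequencies comparable without ever comparing a string leaf against a tuple node.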

1.3 Example of Huffman Coding

Using the string "ABBCCCDDDDEEEEE" across all examples ensures consistency:

  1. Frequency Table:

Character  Frequency
A          1
B          2
C          3
D          4
E          5

  2. Building the Huffman Tree:

    • Combine A (1) and B (2), the two lowest frequencies, to form a node with frequency 3.
    • Combine this node (3) with C (3) to form a node with frequency 6.
    • Combine D (4) and E (5), which are now the two lowest frequencies, to form a node with frequency 9.
    • Combine the two remaining nodes (6 and 9) to form the root node with frequency 15.

Huffman Tree Diagram:

graph LR
    A[A: 1] -- 0 --> Node3[3]
    B[B: 2] -- 1 --> Node3
    Node3 -- 0 --> Node6[6]
    C[C: 3] -- 1 --> Node6
    D[D: 4] -- 0 --> Node9[9]
    E[E: 5] -- 1 --> Node9
    Node6 -- 0 --> Root[15]
    Node9 -- 1 --> Root

  3. Generating Huffman Codes (read from the root, 0 for a left branch and 1 for a right branch):
  • A: 000
  • B: 001
  • C: 01
  • D: 10
  • E: 11

The encoded message "ABBCCCDDDDEEEEE" using Huffman coding is:

000 001 001 01 01 01 10 10 10 10 11 11 11 11 11

This takes 33 bits, compared with 45 bits for a fixed 3-bit code per character.
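
Because a Huffman code is a prefix code, the bit string can be decoded without separators. The sketch below assumes one optimal code table for the frequencies 1, 2, 3, 4, 5 (other tie-breaking choices give equally short tables); the names `CODES`, `encode`, and `decode` are illustrative.

```python
# Assumed code table: one optimal prefix code for A:1, B:2, C:3, D:4, E:5.
CODES = {"A": "000", "B": "001", "C": "01", "D": "10", "E": "11"}


def encode(message, codes):
    """Concatenate the codeword of every character."""
    return "".join(codes[ch] for ch in message)


def decode(bits, codes):
    """Read bits left to right, emitting a character whenever a codeword completes."""
    by_code = {code: ch for ch, code in codes.items()}
    out, current = [], ""
    for bit in bits:
        current += bit
        if current in by_code:  # unambiguous because no codeword is a prefix of another
            out.append(by_code[current])
            current = ""
    if current:
        raise ValueError("trailing bits do not form a complete codeword")
    return "".join(out)


message = "ABBCCCDDDDEEEEE"
bits = encode(message, CODES)
print(bits, len(bits))
print(decode(bits, CODES))
```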

1.4 Advantages and Limitations of Huffman Coding

Advantages:

  • Simple to understand and implement.
  • Effective for data with a clear variance in character frequency.
  • Guaranteed to produce an optimal prefix code for the given symbol frequencies.

Limitations:

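  • Offers little gain when symbol frequencies are nearly uniform, and wastes bits when one symbol is very likely, because every symbol costs at least one whole bit.
  • The static form requires knowledge of symbol frequencies beforehand (or a first pass over the data), which may not always be available or feasible.
  • For small datasets or real-time applications, the overhead of building the tree and sharing the code table with the decoder can outweigh the savings.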
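
One way to quantify these limitations is to compare the average Huffman code length with the Shannon entropy of the symbol distribution, which is the lower bound for any lossless symbol code. The sketch below is illustrative: the code lengths are those of an optimal Huffman code for the example counts, and the skewed source is a made-up case.

```python
from collections import Counter
from math import log2


def entropy_bits_per_symbol(text):
    """Shannon entropy of the empirical symbol distribution, in bits per symbol."""
    n = len(text)
    return -sum(c / n * log2(c / n) for c in Counter(text).values())


text = "ABBCCCDDDDEEEEE"
# Code lengths of an optimal Huffman code for these counts (A and B get 3 bits, the rest 2).
huffman_lengths = {"A": 3, "B": 3, "C": 2, "D": 2, "E": 2}
huffman_avg = sum(huffman_lengths[ch] for ch in text) / len(text)
print(f"entropy: {entropy_bits_per_symbol(text):.3f} bits/symbol, Huffman: {huffman_avg:.3f}")

# A heavily skewed source: 99 x's and one y. Huffman must still spend 1 bit per symbol.
skewed = "x" * 99 + "y"
print(f"skewed entropy: {entropy_bits_per_symbol(skewed):.3f} bits/symbol, Huffman: 1.000")
```

On the example text the gap is small, while on the skewed source the one-bit-per-symbol floor dominates.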

1.5 Complexity of Huffman Coding

  • Time Complexity: Building the Huffman tree takes O(n log n) time, where n is the number of unique characters, because each of the n - 1 merges performs priority-queue operations that cost O(log n). Counting frequencies and encoding the message add time linear in the message length.
  • Space Complexity: The space complexity is O(n) for storing the tree and the frequency table.

1.6 Practical Applications of Huffman Coding

Huffman coding is widely used in various data compression applications:

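  • Image Compression: JPEG uses Huffman coding as the lossless entropy-coding stage that follows its lossy quantization step, and PNG uses it as part of DEFLATE.
  • Text Compression: File compression utilities like ZIP and gzip use it through DEFLATE, which combines LZ77 with Huffman coding.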
  • Data Transmission: Helps reduce the size of data packets for efficient network transmission.

1.7 Historical Context and Future Directions

Huffman coding has been a foundational algorithm in the field of data compression since its introduction. Its simplicity and effectiveness have made it a staple in many early compression systems. However, as data and computing needs evolve, Huffman coding faces limitations in efficiency, especially with real-time data and larger datasets. Future advancements may focus on hybrid models that combine Huffman coding with more sophisticated techniques to improve efficiency and adaptability.


2. Arithmetic Coding: Precision in Compression

2.1 Overview of Arithmetic Coding

Arithmetic coding is a sophisticated data compression technique that goes beyond the limitations of Huffman coding. Instead of assigning fixed codes to individual symbols, arithmetic coding represents the entire message as a single fractional value between 0 and 1. This allows it to achieve higher compression ratios, especially for data with skewed probability distributions.

2.2 How Arithmetic Coding Works

Step 1: Initial Range Setting

  • Begin with the entire range [0, 1).

Step 2: Subdividing the Range

  • For each symbol, subdivide the current range into smaller segments proportional to the symbol's probability.

Step 3: Narrowing Down the Range

  • As each symbol is processed, the range is narrowed to represent the message uniquely.
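
The narrowing step can be sketched with exact rational arithmetic from Python's `fractions` module. This is an illustration rather than a production coder: real implementations use fixed-width integers and emit bits incrementally. The probabilities are the simplified ones used in this guide's example, and the function names are assumptions.

```python
from fractions import Fraction

# Simplified model: A 0.1, B 0.2, C 0.2, D 0.2, E 0.3.
PROBS = {"A": Fraction(1, 10), "B": Fraction(2, 10), "C": Fraction(2, 10),
         "D": Fraction(2, 10), "E": Fraction(3, 10)}


def cumulative_ranges(probs):
    """Assign each symbol a sub-range of [0, 1) whose width is its probability."""
    ranges, start = {}, Fraction(0)
    for symbol, p in probs.items():
        ranges[symbol] = (start, start + p)
        start += p
    return ranges


def encode_interval(message, probs):
    """Return the final [low, high) interval that identifies `message`."""
    ranges = cumulative_ranges(probs)
    low, high = Fraction(0), Fraction(1)
    for symbol in message:
        width = high - low
        sym_low, sym_high = ranges[symbol]
        # Keep only the slice of the current interval that belongs to this symbol.
        low, high = low + width * sym_low, low + width * sym_high
    return low, high


low, high = encode_interval("ABB", PROBS)
print(float(low), float(high))  # 0.012 0.016
```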

2.3 Example of Arithmetic Coding

Using the same string "ABBCCCDDDDEEEEE" ensures consistency:

Assume simplified symbol probabilities for the demonstration:

Symbol  Probability
A       0.1
B       0.2
C       0.2
D       0.2
E       0.3

Encoding Process:

  1. Start with the range [0, 1).

  2. Encoding A:

    • Range is subdivided into:
      • A: [0, 0.1), B: [0.1, 0.3), C: [0.3, 0.5), D: [0.5, 0.7), E: [0.7, 1).
    • After encoding A, the range is [0, 0.1).
  3. Encoding B within [0, 0.1):

    • Subdivide range into:
      • A: [0, 0.01), B: [0.01, 0.03), C: [0.03, 0.05), D: [0.05, 0.07), E: [0.07, 0.1).
    • After encoding B, the range is [0.01, 0.03).
  4. Encoding another B within [0.01, 0.03):

    • Subdivide range into:
      • A: [0.01, 0.012), B: [0.012, 0.016), C: [0.016, 0.02), D: [0.02, 0.024), E: [0.024, 0.03).
    • After encoding another B, the range is [0.012, 0.016).

The encoded message "ABB" can be represented by any number in the range [0.012, 0.016), such as 0.015.

2.4 Visualization of Arithmetic Coding

Below is a visual representation of the arithmetic coding process, illustrating how the range is subdivided for each symbol.

graph TD
    Start[Initial Range 0 to 1] --> A[Encode A, Range: 0 to 0.1]
    A --> State1[Range: 0 to 0.1]
    State1 --> B[Encode B, Range: 0.01 to 0.03] --> State2[Range: 0.01 to 0.03]
    State2 --> B2[Encode B, Range: 0.012 to 0.016] --> State3[Range: 0.012 to 0.016]
    State3 --> Output[Encoded Message]
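
Decoding runs the same subdivision in reverse: find which symbol's range contains the value, emit that symbol, then rescale the value into [0, 1) and repeat. The sketch below assumes the decoder knows the message length (a real coder would transmit it or use an end-of-message symbol); `RANGES` and `decode_value` are illustrative names.

```python
from fractions import Fraction

# Cumulative ranges for the simplified model A 0.1, B 0.2, C 0.2, D 0.2, E 0.3.
RANGES = {
    "A": (Fraction(0), Fraction(1, 10)),
    "B": (Fraction(1, 10), Fraction(3, 10)),
    "C": (Fraction(3, 10), Fraction(5, 10)),
    "D": (Fraction(5, 10), Fraction(7, 10)),
    "E": (Fraction(7, 10), Fraction(1)),
}


def decode_value(value, length, ranges):
    """Recover `length` symbols from a value in [0, 1)."""
    out = []
    for _ in range(length):
        for symbol, (lo, hi) in ranges.items():
            if lo <= value < hi:
                out.append(symbol)
                # Rescale so the chosen sub-range becomes the new [0, 1).
                value = (value - lo) / (hi - lo)
                break
    return "".join(out)


print(decode_value(Fraction(15, 1000), 3, RANGES))  # ABB
```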

2.5 Pros and Cons of Arithmetic Coding

Advantages:

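  • Can handle any probability distribution and can spend a fractional number of bits per symbol.
  • More efficient than Huffman coding, especially when symbol probabilities are highly skewed or far from powers of 1/2.
  • Produces near-optimal output: the total code length is close to the entropy of the message under the model.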

Disadvantages:

  • Computationally intensive, requiring more processing power than Huffman coding.
  • Historically faced patent-related adoption barriers.
  • More challenging to implement, requiring precision in handling fractional numbers.
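
The precision problem is easy to reproduce. In the sketch below, which is an illustration and not how production coders work, a naive floating-point encoder repeatedly encodes a symbol with range [0.7, 1.0) until the interval collapses to a single value. Exact rational arithmetic keeps the interval non-empty for the same number of symbols, at the cost of ever-growing numbers. Practical coders avoid both problems with fixed-width integers and renormalization.

```python
from fractions import Fraction

E_LOW, E_HIGH = 0.7, 1.0  # range of the repeated symbol in the floating-point model

# Naive floating-point encoder: narrow the interval once per symbol.
low, high = 0.0, 1.0
steps = 0
while low < high and steps < 1000:
    width = high - low
    low, high = low + width * E_LOW, low + width * E_HIGH
    steps += 1
print(f"float interval collapsed after {steps} symbols")

# The same narrowing with exact fractions never collapses.
exact_low, exact_high = Fraction(0), Fraction(1)
for _ in range(steps):
    width = exact_high - exact_low
    exact_low, exact_high = exact_low + width * Fraction(7, 10), exact_low + width
print(f"exact interval width: {float(exact_high - exact_low):.3e}")
```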

2.6 Complexity of Arithmetic Coding

  • Time Complexity: Arithmetic coding has a time complexity of O(n), where n is the length of the input data, provided that looking up a symbol's range takes constant time.
  • Space Complexity: A streaming implementation needs O(k) working memory for the probability model, where k is the number of distinct symbols, plus a fixed-size range. The output itself grows linearly with the input.

2.7 Practical Applications of Arithmetic Coding

Arithmetic coding is used in applications where high compression ratios are essential:

  • Multimedia File Formats: Video codecs like H.264 and H.265 use a binary arithmetic coder (CABAC) for entropy encoding.
  • Data Compression: Commonly used in text compression and data archival systems where maximum efficiency is required.
  • Embedded Systems: Finds use in systems where memory and storage are at a premium.

2.8 Historical Context and Future Directions

Introduced in the 1970s, arithmetic coding has been critical in advancing data compression. Its ability to handle complex distributions made it superior to Huffman coding in many applications. The primary barriers to its adoption were patent restrictions, which have since expired, allowing more widespread use. Future directions may involve optimizing arithmetic coding for faster computation and adapting it to modern real-time processing needs.


3. Asymmetric Numeral Systems (ANS): A New Frontier in Compression

3.1 Introduction to Asymmetric Numeral Systems

Asymmetric Numeral Systems (ANS) is a relatively new family of entropy coding methods that combines the compression efficiency of arithmetic coding with the speed and simplicity of table-based methods. ANS encodes the whole message into a single natural number, called the state, which makes it well suited to fast implementations and real-time applications. It achieves compression close to that of arithmetic coding while being computationally less demanding.

3.2 How ANS Works

Step 1: Labeling Natural Numbers

  • ANS starts by labeling natural numbers based on symbol frequencies. Each number is labeled with a symbol, ensuring that the frequency of each symbol matches its probability.
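
One simple way to build such a labeling is to spread each symbol's occurrences evenly over a block and repeat the block forever. The sketch below is only one of many possible spreading rules; the helper `spread_labeling`, the block size of 10, and the per-block counts (chosen to match probabilities 0.1, 0.2, 0.2, 0.2, 0.3) are assumptions for illustration.

```python
from collections import Counter
from fractions import Fraction


def spread_labeling(counts):
    """Label one block so each symbol appears counts[symbol] times, evenly spaced."""
    slots = []
    for symbol, c in counts.items():
        for k in range(c):
            # Ideal relative position of the k-th occurrence: (k + 1/2) / c.
            slots.append((Fraction(2 * k + 1, 2 * c), symbol))
    # Sorting by ideal position (ties broken by symbol) interleaves the symbols.
    return "".join(symbol for _, symbol in sorted(slots))


COUNTS = {"A": 1, "B": 2, "C": 2, "D": 2, "E": 3}  # per block of 10 numbers
labeling = spread_labeling(COUNTS)
print(labeling)  # EBCDAEBCDE
```

The natural number n then carries the label `labeling[n % len(labeling)]`.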

Step 2: Encoding a Message

  • Each symbol is encoded by transforming the current state into a new state using a predefined function. The transformation depends on the symbol being encoded and the current state.

Step 3: Renormalization

  • When the state grows beyond a chosen bound, it is renormalized: low-order bits are shifted out to the output stream so that the state always fits in a fixed-size integer. The decoder reverses this by shifting bits back in.
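
The three steps can be sketched with the range variant of ANS (rANS), where the state transition is a short arithmetic formula and renormalization moves one byte at a time. This is a simplified illustration under stated assumptions: the frequencies are quantized to sum to 16 (so E gets 6 rather than 5), the state is kept in [2^16, 2^24), and the decoder is told the message length. All names are hypothetical.

```python
SCALE_BITS = 4
TOTAL = 1 << SCALE_BITS   # quantized frequencies must sum to 16
LOWER = 1 << 16           # the state is kept in [LOWER, LOWER * 256)

FREQ = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 6}  # assumed quantized frequencies
START, SLOT_SYMBOL = {}, []
for sym, f in FREQ.items():
    START[sym] = len(SLOT_SYMBOL)   # first slot owned by this symbol
    SLOT_SYMBOL.extend(sym * f)     # each symbol owns f consecutive slots
assert len(SLOT_SYMBOL) == TOTAL


def encode(message):
    state, out = LOWER, bytearray()
    for sym in reversed(message):   # ANS is last-in, first-out
        f = FREQ[sym]
        limit = ((LOWER >> SCALE_BITS) << 8) * f
        while state >= limit:       # renormalize: shift out low bytes
            out.append(state & 0xFF)
            state >>= 8
        state = ((state // f) << SCALE_BITS) + (state % f) + START[sym]
    return state, bytes(out)


def decode(state, data, length):
    pos, out = len(data), []
    for _ in range(length):
        slot = state & (TOTAL - 1)  # the low bits identify the symbol
        sym = SLOT_SYMBOL[slot]
        state = FREQ[sym] * (state >> SCALE_BITS) + slot - START[sym]
        while state < LOWER:        # renormalize: shift bytes back in
            pos -= 1
            state = (state << 8) | data[pos]
        out.append(sym)
    return "".join(out), state


message = "ABBCCCDDDDEEEEE"
state, data = encode(message)
decoded, final_state = decode(state, data, len(message))
print(state, data.hex(), decoded)
```

Encoding walks the message backwards so that the decoder, which undoes the most recent step first, produces the symbols in their original order.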

3.3 Example of ANS Encoding

Using the same string "ABBCCCDDDDEEEEE" ensures consistency:

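Assume simplified symbol probabilities:

Symbol  Probability
A       0.1
B       0.2
C       0.2
D       0.2
E       0.3

For illustration, assume the natural numbers are labeled in repeating blocks of ten, "ABBCCDDEEE", so that each symbol's share of the labels matches its probability. Encoding symbol s from state X moves to the (X + 1)-th number labeled s, counting the natural numbers from 0.

  1. Initial State: Choose an initial state, e.g., X = 10.
  2. Encoding A:
    • A labels one number per block (0, 10, 20, ...), so the 11th number labeled A is 100. The state becomes X = 100.
  3. Encoding B:
    • B labels two numbers per block (1, 2, 11, 12, ...), so the 101st number labeled B is 501. The state becomes X = 501.
  4. Encoding another B:
    • The 502nd number labeled B is 2502. The state becomes X = 2502.

Each step multiplies the state by roughly 1/p, which adds about log2(1/p) bits. Decoding reads the label of the state (2502 is labeled B), steps back to 501, and so on, so the symbols come out in reverse order.

3.4 Visualization of ANS Encoding

Below is a conceptual diagram of the ANS encoding process, showing how natural numbers are labeled and how the state transitions during encoding.

graph TD
    Start[Initial State = 10] --> A1[Encode A]
    A1 --> State1[State = 100]
    State1 --> B1[Encode B] --> State2[State = 501]
    State2 --> B2[Encode B] --> State3[State = 2502]
    State3 --> Output[Encoded Message]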
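
State transitions of this kind can be traced with a few lines of Python. The sketch below assumes a hypothetical labeling in repeating blocks of ten, "ABBCCDDEEE", matching the probabilities 0.1, 0.2, 0.2, 0.2, 0.3, and an arbitrary starting state of 10; `encode_step` and `decode_step` are illustrative names. It omits renormalization, so the state grows without bound.

```python
LABELING = "ABBCCDDEEE"  # assumed labeling of one block of natural numbers
N = len(LABELING)
# POSITIONS[s] lists the offsets within a block that carry the label s.
POSITIONS = {s: [i for i, c in enumerate(LABELING) if c == s] for s in set(LABELING)}


def encode_step(state, symbol):
    """Move to the (state + 1)-th natural number labeled `symbol` (counting from 0)."""
    block, rank = divmod(state, len(POSITIONS[symbol]))
    return block * N + POSITIONS[symbol][rank]


def decode_step(state):
    """Read the label of `state` and count how many smaller numbers share it."""
    block, offset = divmod(state, N)
    symbol = LABELING[offset]
    return symbol, block * len(POSITIONS[symbol]) + POSITIONS[symbol].index(offset)


state = 10
trace = [state]
for symbol in "ABB":
    state = encode_step(state, symbol)
    trace.append(state)
print(trace)  # [10, 100, 501, 2502]

# Decoding undoes the steps in reverse, so the symbols come out last-first.
decoded = []
while state != 10:
    symbol, state = decode_step(state)
    decoded.append(symbol)
print("".join(decoded))  # BBA
```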

3.5 Advantages and Limitations of ANS

Advantages:

  • Combines the efficiency of arithmetic coding with the speed of table-based methods.
  • Suitable for real-time applications due to its fast encoding and decoding processes.
  • No floating-point arithmetic needed, reducing computational complexity.
  • Public domain algorithm, avoiding the patent issues that hindered arithmetic coding.

Limitations:

  • More complex to understand and implement than Huffman coding.
  • Performance depends on the proper design of symbol labeling and state transitions.
  • Limited ability to handle adaptive contexts compared to arithmetic coding.

3.6 Complexity of ANS

  • Time Complexity: The table-driven variant (tANS) takes O(1) time per symbol, since each step is a table lookup plus a few shifts, making it extremely fast.
  • Space Complexity: The tables take space proportional to the number of states (the block size), which is at least the number of distinct symbols.

3.7 Practical Applications of ANS

ANS is rapidly gaining popularity in modern applications:

  • JPEG XL: Uses ANS for efficient and fast image compression, aiming to replace older JPEG formats.
  • Video Encoding: Proposed for video codecs as a faster alternative to arithmetic coding, although mainstream standards such as H.264 and H.265 still use arithmetic coding.
  • Real-Time Data Streaming: Ideal for scenarios requiring low-latency data transmission, such as live video streaming.

3.8 Historical Context and Future Directions

ANS, introduced by Jarek Duda in papers from the late 2000s, with the practical table-based and range variants following in the early 2010s, represents a significant step forward in compression technology. By addressing the speed limitations of arithmetic coding while maintaining efficiency, ANS has become a preferred choice for many real-time and high-performance applications. As research continues, we can expect further optimizations and potential adaptations of ANS to support adaptive contexts and other advancements.


Conclusion

Data compression is a vital tool in the digital age, enabling efficient storage and transmission of information. Understanding the principles and mechanisms behind different compression algorithms can significantly impact how we handle data. Here's a summary of the key points covered:

Summary

  1. Huffman Coding is a foundational technique in data compression, suitable for cases with variable symbol frequencies. It constructs a binary tree and assigns shorter codes to more frequent symbols.
  2. Arithmetic Coding offers higher compression rates by encoding entire messages as fractional numbers, providing near-optimal compression for complex data distributions. It faces challenges in implementation and computational efficiency.
  3. Asymmetric Numeral Systems (ANS) brings together the best of both worlds, achieving high compression efficiency with faster encoding and decoding speeds. It is becoming increasingly popular in applications that demand both high compression ratios and low latency.

Visual Summary

Here is a visual representation to highlight the differences between these three compression methods:

  1. Huffman Coding Tree: Visualizing how characters are represented by variable-length codes based on symbol frequency.
  2. Arithmetic Coding Flowchart: Illustrating the process of subdividing the range for each symbol to encode a message.
  3. ANS Encoding Diagram: Demonstrating the steps involved in encoding a message using ANS.

Further Reading and Resources


By understanding these algorithms, you can select the most appropriate compression method for your specific needs, whether it be for image storage, video streaming, or efficient data transmission. As technology continues to advance, mastering these techniques will become increasingly crucial in managing the vast amounts of digital data generated daily.


@caputomarcos
Author

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"this builds up the ideas of the tabled asymmetric numeral system"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"given probs: {'a': 0.6948839490593655, 'b': 0.007950499291690652, 'c': 0.2971655516489439}\n",
"generated labeling: aacaacaaacaacaacaaacaacaacaaacab\n",
"labeling probs: {'b': 0.03125, 'c': 0.28125, 'a': 0.6875}\n"
]
}
],
"source": [
"from random import choices, random\n",
"from math import log2\n",
"\n",
"# generate a random probability distribution\n",
"def gen_probs(symbols: list | str):\n",
"    return { s: random() for s in symbols }\n",
"PROBS = gen_probs(\"abc\")\n",
"total = sum(PROBS.values())\n",
"for c in PROBS:\n",
"    assert isinstance(c, str) and len(c) == 1, f\"invalid symbol: '{c}'; all symbols must be single characters\"\n",
"    PROBS[c] /= total\n",
"SYMBOLS, WEIGHTS = list(PROBS.keys()), list(PROBS.values())\n",
"\n",
"# there are many ways you can generate labelings\n",
"def spread(freqs: dict, length: int):\n",
"    labeling = \"\"\n",
"    total = sum(freqs.values())\n",
"    probs = { c: total / freq for c, freq in freqs.items() }\n",
"    charset = set(probs.keys())\n",
"    while len(labeling) < length - len(charset):\n",
"        min_c = min(probs, key=probs.get)\n",
"        labeling += min_c\n",
"        if min_c in charset: charset.remove(min_c)\n",
"        probs[min_c] += total / freqs[min_c]\n",
"    for c in charset: labeling += c\n",
"    return labeling\n",
"BLOCK_LABELING = spread(PROBS, 32)\n",
"\n",
"print(\"given probs: \", PROBS)\n",
"print(\"generated labeling: \", BLOCK_LABELING)\n",
"print(\"labeling probs: \", { c: BLOCK_LABELING.count(c) / len(BLOCK_LABELING) for c in set(BLOCK_LABELING) })"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"# just for comparison\n",
"class Huffman:\n",
"    def __init__(self, freq_table):\n",
"        table = { c: \"\" for c in freq_table }\n",
"        sorted_chars = sorted(freq_table.items(), key=lambda x: x[1])\n",
"        while len(sorted_chars) > 1:\n",
"            left, right = sorted_chars.pop(0), sorted_chars.pop(0)\n",
"            for c in left[0]: table[c] = \"0\" + table[c]\n",
"            for c in right[0]: table[c] = \"1\" + table[c]\n",
"            total_freq = left[1] + right[1]\n",
"            new_index = next((i for i, x in enumerate(sorted_chars) if x[1] >= total_freq), len(sorted_chars))\n",
"            sorted_chars.insert(new_index, (left[0] + right[0], total_freq))\n",
"        self.encoding_table = table\n",
"        self.decoding_table = { v: k for k, v in table.items() }\n",
"\n",
"    def encode(self, message): return ''.join(self.encoding_table[c] for c in message)\n",
"    def decode(self, code):\n",
"        message = \"\"\n",
"        code_word = \"\"\n",
"        for c in code:\n",
"            code_word += c\n",
"            if code_word in self.decoding_table:\n",
"                message += self.decoding_table[code_word]\n",
"                code_word = \"\"\n",
"        return message\n",
"\n",
"huffman = Huffman(PROBS)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Base version\n",
"counts symbols by blocks"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [],
"source": [
"class ANS:\n",
"    def __init__(self, labeling: str | list):\n",
"        self.labeling = labeling\n",
"        self.block_size = len(labeling)\n",
"\n",
"        # symbol appears in each block count_per_block[symbol] times\n",
"        # count_before_index[i] is the number of numbers labeled labeling[i] that are less than i\n",
"        # symbol_table[symbol][i] is the index in labeling of the ith number labeled symbol\n",
"        self.count_per_block = {}\n",
"        self.count_before_index = []\n",
"        self.symbol_table = {}\n",
"        for i, c in enumerate(labeling):\n",
"            if c not in self.symbol_table:\n",
"                self.count_per_block[c] = 0\n",
"                self.symbol_table[c] = []\n",
"            self.count_before_index.append(self.count_per_block[c])\n",
"            self.count_per_block[c] += 1\n",
"            self.symbol_table[c].append(i)\n",
"\n",
"        # if the initial value of X is too small, then C(X) might equal X. but we want C(X) to be different from X for everything to be reversible\n",
"        # this is analogous to \"appending a non-zero digit to the left of X\" except in ANS it's \"non-first-symbol symbol\" instead\n",
"        # if you're sure that no message will ever start with the first symbol, you can set initial_state = 0\n",
"        self.initial_state = next((i for i, c in enumerate(labeling) if c != labeling[0]), len(labeling))\n",
"\n",
"    def C(self, state: int, symbol: str):\n",
"        \"\"\"Returns the state + 1th number labeled symbol\"\"\"\n",
"\n",
"        # full_blocks * count_per_block[symbol] + symbols_left = state + 1\n",
"        full_blocks = (state + 1) // self.count_per_block[symbol]\n",
"        symbols_left = (state + 1) % self.count_per_block[symbol] # equivalently, (state + 1) - full_blocks * self.count_per_block[symbol]\n",
"        if symbols_left == 0:\n",
"            full_blocks -= 1\n",
"            symbols_left = self.count_per_block[symbol]\n",
"\n",
"        # Count symbols_left symbols within the block\n",
"        index_within_block = self.symbol_table[symbol][symbols_left - 1]\n",
"\n",
"        # if block_size is a power of 2, this multiplication is a bitshift\n",
"        return full_blocks * self.block_size + index_within_block\n",
"\n",
"    def D(self, state: int):\n",
"        \"\"\"Counts the number of numbers labeled symbol that are less than state\"\"\"\n",
"\n",
"        index_within_block = state % self.block_size # if block_size is a power of 2, this is state & (block_size - 1)\n",
"        symbol = self.labeling[index_within_block]\n",
"\n",
"        num_previous_blocks = state // self.block_size # if block_size is a power of 2, this division is a bitshift\n",
"        count_before_block = self.count_per_block[symbol] * num_previous_blocks\n",
"\n",
"        return symbol, count_before_block + self.count_before_index[index_within_block]\n",
"\n",
"    def encode(self, message: str | list):\n",
"        state = self.initial_state\n",
"        for symbol in message[::-1]:\n",
"            state = self.C(state, symbol)\n",
"        return state\n",
"\n",
"    def decode(self, state: int):\n",
"        message = \"\"\n",
"        while state > self.initial_state:\n",
"            symbol, state = self.D(state)\n",
"            message += symbol\n",
"        return message, state"
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"information content: 37620.468208228645\n",
"code length: 38370 (1.992% longer than information)\n",
"huffman: 52210 (38.781% longer than information)\n"
]
}
],
"source": [
"ans = ANS(BLOCK_LABELING)\n",
"\n",
"message = \"\".join(choices(SYMBOLS, WEIGHTS, k = 40000))\n",
"information_content = -sum(log2(PROBS[symbol]) for symbol in message)\n",
"\n",
"state = ans.encode(message)\n",
"\n",
"ans_length = state.bit_length() - 1 # the first bit is always 1, so it's not counted\n",
"huffman_length = len(huffman.encode(message))\n",
"\n",
"print(f\"information content: {information_content}\")\n",
"print(f\"code length: {ans_length} ({ans_length / information_content - 1:.3%} longer than information)\")\n",
"print(f\"huffman: {huffman_length} ({huffman_length / information_content - 1:.3%} longer than information)\")\n",
"\n",
"decoded, state = ans.decode(state)\n",
"assert decoded == message, message + \", \" + decoded"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Streamed version\n",
"incorporates renormalization\n",
"\n",
"state will now always be in [block_size, block_size * 2 - 1]. This means we have to initialize state to a number in that range, but the decoder doesn't need to know initial_state anymore, so you could store some data in initial_state that the decoder will reproduce at the end\n",
"\n",
"With streaming, we take a hit on compression rate"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"class StreamANS:\n",
"    def __init__(self, labeling: str | list):\n",
"        self.labeling = labeling\n",
"        self.block_size = len(labeling)\n",
"\n",
"        self.count_per_block = {}\n",
"        self.count_before_index = []\n",
"        self.symbol_table = {}\n",
"        for i, c in enumerate(labeling):\n",
"            if c not in self.symbol_table:\n",
"                self.count_per_block[c] = 0\n",
"                self.symbol_table[c] = []\n",
"            self.count_before_index.append(self.count_per_block[c])\n",
"            self.count_per_block[c] += 1\n",
"            self.symbol_table[c].append(i)\n",
"\n",
"    def C(self, state: int, symbol: str):\n",
"        \"\"\"Returns the state + 1th number labeled symbol\"\"\"\n",
"\n",
"        full_blocks = (state + 1) // self.count_per_block[symbol]\n",
"        symbols_left = (state + 1) % self.count_per_block[symbol]\n",
"        if symbols_left == 0:\n",
"            full_blocks -= 1\n",
"            symbols_left = self.count_per_block[symbol]\n",
"\n",
"        # Count symbols_left symbols within the block\n",
"        index_within_block = self.symbol_table[symbol][symbols_left - 1]\n",
"\n",
"        return full_blocks * self.block_size + index_within_block\n",
"\n",
"    def D(self, state: int):\n",
"        \"\"\"Counts the number of numbers labeled symbol that are less than state\"\"\"\n",
"\n",
"        # because of renormalization, state is guaranteed to be in [block_size, 2 * block_size - 1]\n",
"        index_within_block = state - self.block_size\n",
"        symbol = self.labeling[index_within_block]\n",
"\n",
"        return symbol, self.count_per_block[symbol] + self.count_before_index[index_within_block]\n",
"\n",
"    def encode(self, message: str | list, initial_state: int = 0):\n",
"        bitstream = 1\n",
"        state = [i for i in range(self.block_size) if self.C(i, message[-1]) >= self.block_size][initial_state] # the initial_stateth number X such that block_size <= C(X) < 2 * block_size\n",
"\n",
"        for symbol in message[::-1]:\n",
"\n",
"            # shrink X until C(X) < 2N\n",
"            predicted = self.C(state, symbol)\n",
"            normalized_state = state\n",
"            while predicted >= 2 * self.block_size:\n",
"                bitstream <<= 1\n",
"                bitstream |= normalized_state & 1\n",
"                normalized_state >>= 1\n",
"                predicted = self.C(normalized_state, symbol)\n",
"\n",
"            state = predicted\n",
"\n",
"        return state, bitstream\n",
"\n",
"    def decode(self, state: int, bitstream):\n",
"        message = \"\"\n",
"\n",
"        while True:\n",
"            symbol, state = self.D(state)\n",
"            message += symbol\n",
"\n",
"            # expand X until X >= N or we run out of bits\n",
"            while state < self.block_size and bitstream > 1:\n",
"                state <<= 1\n",
"                state |= bitstream & 1\n",
"                bitstream >>= 1\n",
"            if state < self.block_size: break\n",
"\n",
"        return message, state"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"information content: 112715.83728822623\n",
"code length: 114047 (1.181% longer than information)\n",
"huffman: 156605 (38.938% longer than information)\n"
]
}
],
"source": [
"ans = StreamANS(BLOCK_LABELING)\n",
"\n",
"message = \"\".join(choices(SYMBOLS, WEIGHTS, k = 120000))\n",
"information_content = -sum(log2(PROBS[symbol]) for symbol in message)\n",
"\n",
"state, bitstring = ans.encode(message)\n",
"\n",
"ans_length = len(BLOCK_LABELING).bit_length() + bitstring.bit_length() - 1\n",
"huffman_length = len(huffman.encode(message))\n",
"\n",
"print(f\"information content: {information_content}\")\n",
"print(f\"code length: {ans_length} ({ans_length / information_content - 1:.3%} longer than information)\")\n",
"print(f\"huffman: {huffman_length} ({huffman_length / information_content - 1:.3%} longer than information)\")\n",
"\n",
"decoded, state = ans.decode(state, bitstring)\n",
"assert decoded == message, message + \", \" + decoded"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tabled version\n",
"decode and encode don't use C or D anymore; they only reference renormalization_table, encoding_table, and decoding_table, which are generated at initialization. The table generation I'm doing is slow; you should probably look at other implementations to see how they do it\n",
"\n",
"Because we're storing the number of bits to renormalize with in the table, this only works when block_size is a power of 2(?); otherwise, encoding the same symbol into the same state might not pop the same number of bits to the bitstream. Duda mentions this in his paper; I'm not exactly sure how he does it"
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [],
"source": [
"class tANS:\n",
"    def __init__(self, labeling: str | list):\n",
"        self.labeling = labeling\n",
"        self.block_size = len(labeling)\n",
"        self.block_mask = self.block_size - 1 # X & block_mask = X % block_size\n",
"\n",
"        # these are now only used during initialization to generate tables\n",
"        self.count_per_block = {}\n",
"        self.count_before_index = []\n",
"        self.symbol_table = {}\n",
"        for i, c in enumerate(labeling):\n",
"            if c not in self.symbol_table:\n",
"                self.count_per_block[c] = 0\n",
"                self.symbol_table[c] = []\n",
"            self.count_before_index.append(self.count_per_block[c])\n",
"            self.count_per_block[c] += 1\n",
"            self.symbol_table[c].append(i)\n",
"\n",
"        self.generate_tables()\n",
"\n",
"    # C and D are only used during initialization\n",
"    def C(self, state: int, symbol: str):\n",
"        \"\"\"Returns the state + 1th number labeled symbol\"\"\"\n",
"\n",
"        full_blocks = (state + 1) // self.count_per_block[symbol]\n",
"        symbols_left = (state + 1) % self.count_per_block[symbol]\n",
"        if symbols_left == 0:\n",
"            full_blocks -= 1\n",
"            symbols_left = self.count_per_block[symbol]\n",
"\n",
"        index_within_block = self.symbol_table[symbol][symbols_left - 1]\n",
"\n",
"        return full_blocks * self.block_size + index_within_block\n",
"\n",
"    def D(self, state: int):\n",
"        \"\"\"Counts the number of numbers labeled symbol that are less than state\"\"\"\n",
"\n",
"        index_within_block = state & self.block_mask\n",
"        symbol = self.labeling[index_within_block]\n",
"        count_within_block = sum(c == symbol for c in self.labeling[:index_within_block])\n",
"\n",
"        return symbol, self.count_per_block[symbol] + count_within_block\n",
"\n",
"    # the way i'm generating these is bad; i just wrote it like this to make it more obvious what it's doing\n",
"    def generate_tables(self):\n",
"        # i'm not exactly sure about the proof, but encoding any one symbol symbol will always result in either n_bits_chopped or n_bits_chopped + 1 being output into the bitstream during renormalization\n",
"        # it's n_bits_chopped + 1 when the state is over or equal to min_state. except here i use min_state = 0 as a special case where the number of bits output is never n_bits_chopped + 1\n",
"        renormalization_table = {}\n",
"        for symbol in self.symbol_table.keys():\n",
"            n_bits_chopped = -1\n",
"            min_state = 0\n",
"            prev_n_bits = -1\n",
"            for state in range(self.block_size, self.block_size * 2):\n",
"                renormalized = state\n",
"                n_bits = 0\n",
"                while self.C(renormalized, symbol) >= self.block_size * 2:\n",
"                    renormalized >>= 1\n",
"                    n_bits += 1\n",
"                if n_bits != prev_n_bits:\n",
"                    prev_n_bits = n_bits\n",
"                    if n_bits_chopped < 0:\n",
"                        n_bits_chopped = n_bits\n",
"                    else:\n",
"                        assert min_state == 0, \"uh oh\"\n",
"                        min_state = state\n",
"            renormalization_table[symbol] = (n_bits_chopped, min_state, (1 << n_bits_chopped) - 1)\n",
"        self.renormalization_table = renormalization_table\n",
"\n",
"        encoding_table = { c: {} for c in self.symbol_table.keys() }\n",
"        decoding_table = [0 for i in range(self.block_size)]\n",
"        for state in range(self.block_size, self.block_size * 2):\n",
"            symbol, old_state = self.D(state)\n",
"            encoding_table[symbol][old_state] = state\n",
"            n_bits = 0\n",
"            scaled_state = old_state\n",
"            while scaled_state < self.block_size:\n",
"                scaled_state <<= 1\n",
"                n_bits += 1\n",
"            decoding_table[state - self.block_size] = (old_state << n_bits, n_bits, (1 << n_bits) - 1)\n",
"        self.encoding_table = encoding_table\n",
"        self.decoding_table = decoding_table\n",
"\n",
"    def encode(self, message: str | list, initial_state: int = 0):\n",
"        bitstream = 1\n",
"        state = [i for i, c in enumerate(self.labeling) if c == message[-1]][initial_state] # the initial_stateth number labeled message[-1]\n",
"        assert self.decoding_table[state][1] > 0, \"code is ambiguous because initial state is too large\" # we want this symbol to prompt the decoder to scale back up so that it can run out of bits and terminate\n",
"\n",
"        state += self.block_size\n",
"        for symbol in message[-2::-1]:\n",
"            n_bits, min_state, mask = self.renormalization_table[symbol]\n",
"            if min_state > 0 and state >= min_state:\n",
"                n_bits += 1\n",
"                mask = (mask << 1) | 1\n",
"            bitstream = (bitstream << n_bits) | (state & mask)\n",
"            state = self.encoding_table[symbol][state >> n_bits]\n",
"\n",
"        return state, bitstream\n",
"\n",
"    def decode(self, state: int, bitstream):\n",
"        message = \"\"\n",
"\n",
"        while True:\n",
"            state -= self.block_size\n",
"            message += self.labeling[state]\n",
"            (state, num_bits, mask) = self.decoding_table[state]\n",
"            if num_bits > 0 and bitstream <= 1: break\n",
"            state |= bitstream & mask\n",
"            bitstream >>= num_bits\n",
"\n",
"        return message, state"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"information content: 140855.6232644608\n",
"code length: 142481 (1.154% longer than information)\n",
"huffman: 195671 (38.916% longer than information)\n"
]
}
],
"source": [
"ans = tANS(BLOCK_LABELING)\n",
"\n",
"message = \"\".join(choices(SYMBOLS, WEIGHTS, k = 150000))\n",
"information_content = -sum(log2(PROBS[symbol]) for symbol in message)\n",
"\n",
"state, bitstring = ans.encode(message)\n",
"\n",
"ans_length = len(BLOCK_LABELING).bit_length() + bitstring.bit_length() - 1\n",
"huffman_length = len(huffman.encode(message))\n",
"\n",
"print(f\"information content: {information_content}\")\n",
"print(f\"code length: {ans_length} ({ans_length / information_content - 1:.3%} longer than information)\")\n",
"print(f\"huffman: {huffman_length} ({huffman_length / information_content - 1:.3%} longer than information)\")\n",
"\n",
"decoded, state = ans.decode(state, bitstring)\n",
"assert decoded == message, message + \", \" + decoded"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
